Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 177 additions & 19 deletions lib/routes/routeBackbeat.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { constants: { HTTP_STATUS_CONFLICT } } = require('http2');
const url = require('url');
const async = require('async');
const httpProxy = require('http-proxy');
Expand All @@ -7,7 +8,13 @@ const joi = require('@hapi/joi');
const backbeatProxy = httpProxy.createProxyServer({
ignorePath: true,
});
const { auth, errors, errorInstances, s3middleware, s3routes, models, storage } = require('arsenal');
const { auth, errors, errorInstances, s3middleware, s3routes, models, storage, versioning } = require('arsenal');
const { decode, encode } = versioning.VersionID;
const {
VersionIdCollisionException,
StaleMicroVersionIdException,
MicroVersionIdAlreadyStoredException,
} = require('@scality/cloudserverclient');

const { responseJSONBody } = s3routes.routesUtils;
const { getSubPartIds } = s3middleware.azureHelper.mpuUtils;
Expand All @@ -21,6 +28,7 @@ const locationStorageCheck = require('../api/apiUtils/object/locationStorageChec
const { dataStore } = require('../api/apiUtils/object/storeObject');
const prepareRequestContexts = require('../api/apiUtils/authorization/prepareRequestContexts');
const { decodeVersionId } = require('../api/apiUtils/object/versioning');
const getReplicationInfo = require('../api/apiUtils/object/getReplicationInfo');
const locationKeysHaveChanged = require('../api/apiUtils/object/locationKeysHaveChanged');
const { standardMetadataValidateBucketAndObj, metadataGetObject } = require('../metadata/metadataUtils');
const { config } = require('../Config');
Expand All @@ -32,6 +40,7 @@ const {
} = require('../api/apiUtils/integrity/validateChecksums');
const { BackendInfo } = models;
const { pushReplicationMetric } = require('./utilities/pushReplicationMetric');
const writeContinue = require('../utilities/writeContinue');
const kms = require('../kms/wrapper');
const { listLifecycleCurrents } = require('../api/backbeat/listLifecycleCurrents');
const { listLifecycleNonCurrents } = require('../api/backbeat/listLifecycleNonCurrents');
Expand Down Expand Up @@ -93,7 +102,7 @@ function _isObjectRequest(req) {
return ['data', 'metadata', 'multiplebackenddata', 'multiplebackendmetadata'].includes(req.resourceType);
}

function _respondWithHeaders(response, payload, extraHeaders, log, callback) {
function _respondWithHeaders(response, payload, extraHeaders, log, callback, statusCode = 200) {
let body = '';
if (typeof payload === 'string') {
body = payload;
Expand All @@ -115,10 +124,10 @@ function _respondWithHeaders(response, payload, extraHeaders, log, callback) {
// eslint-disable-next-line no-param-reassign
response.serverAccessLog.endTurnAroundTime = process.hrtime.bigint();
}
response.writeHead(200, httpHeaders);
response.writeHead(statusCode, httpHeaders);
response.end(body, 'utf8', () => {
log.end().info('responded with payload', {
httpCode: 200,
httpCode: statusCode,
contentLength: Buffer.byteLength(body),
});
callback();
Expand All @@ -129,6 +138,15 @@ function _respond(response, payload, log, callback) {
_respondWithHeaders(response, payload, {}, log, callback);
}

/**
 * Send a CRR conflict response through _respondWithHeaders.
 *
 * The response body is built from `{ code, message }`, the status code is
 * HTTP_STATUS_CONFLICT, and the `x-scal-micro-version-id` header is always
 * set: to the encoded form of `mvId` when one is given, or to an empty
 * string otherwise.
 *
 * @param {object} response - HTTP response object forwarded to _respondWithHeaders
 * @param {object} log - request logger forwarded to _respondWithHeaders
 * @param {function} callback - forwarded to _respondWithHeaders
 * @param {string} code - error code placed in the response payload
 * @param {string} message - description placed in the response payload
 * @param {string} [mvId] - micro version id to report; when falsy the header
 *   value is the empty string
 * @return {*} whatever _respondWithHeaders returns
 */
function _respondWithHeaderCrrConflict(response, log, callback, code, message, mvId) {
    const conflictPayload = { code, message };

    // The header is emitted even without a micro version id, with an empty value.
    let microVersionIdHeader = '';
    if (mvId) {
        microVersionIdHeader = encode(mvId);
    }
    const conflictHeaders = { 'x-scal-micro-version-id': microVersionIdHeader };

    return _respondWithHeaders(
        response,
        conflictPayload,
        conflictHeaders,
        log,
        callback,
        HTTP_STATUS_CONFLICT,
    );
}

function _getRequestPayload(req, cb) {
const payload = [];
let payloadLen = 0;
Expand Down Expand Up @@ -414,6 +432,38 @@ function putData(request, response, bucketInfo, objMd, log, callback) {
log.error(errMessage);
return callback(errorInstances.BadRequest.customizeDescription(errMessage));
}

const incomingVersionIdEncoded = request.headers['x-scal-version-id'];
if (incomingVersionIdEncoded) {
const incomingVersionIdDecoded = decode(incomingVersionIdEncoded);
if (incomingVersionIdDecoded instanceof Error) {
log.error('crr putData: failed to decode x-scal-version-id header', {
method: 'putData',
error: incomingVersionIdDecoded.message,
});
return callback(errorInstances.BadRequest.customizeDescription(
'bad request: invalid x-scal-version-id header'));
}
if (objMd && objMd.versionId === incomingVersionIdDecoded) {
// Data already at destination for this version; return 409 with the existing
// microVersionId so backbeat can decide if putMetadata is still needed.
log.debug('crr putData: version already at destination', {
method: 'putData',
bucketName: request.bucketName,
objectKey: request.objectKey,
hasMicroVersionId: !!objMd.microVersionId,
});
request.resume();
return _respondWithHeaderCrrConflict(
response, log, callback,
VersionIdCollisionException.name,
'version id already at destination',
objMd.microVersionId,
);
}
}

writeContinue(request, response);
const context = {
bucketName: request.bucketName,
owner: canonicalID,
Expand Down Expand Up @@ -539,6 +589,65 @@ function getCanonicalIdsByAccountId(accountId, log, cb) {
}

function putMetadata(request, response, bucketInfo, objMd, log, callback) {
const { bucketName, objectKey } = request;

const encodedMicroVersionId = request.headers['x-scal-micro-version-id'];

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably need to differentiate between

  • the absence of microVersionId (ie. x-scal-micro-version-id is not present in the request) → no extra check
  • microVersionId is empty (something like x-scal-micro-version-id: "", when referring to the first metadata revision when object was created) → need the extra check, and check if the object already has newer micro version id (i.e. either it does not have one → "equal", or it does → "newer")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the code a bit to do what you mention in the first point; it now also returns an error when the header is present but fails to decode, as in putData.

For the other point,
The current implementation is: new source will always have a microVersionId. This is discussed in another comment at the top of this review, but I don't see how we can properly deal with empty microVersionIds on new objects written after this feature is added without having issues later. It means we would have situations like:

A->B->C
I put on A, A has no microVersionID, but replicates to B and C which now have a microVersionID.
But A still doesn't have a microVersionID. What happens if there is a loop configuration and C goes back to A
What happens on the second write on A when a first write has already replicated on B and C ? B and C will have a microVersionID but A won't ?

It seems to me that deciding to not assign a microVersionID on all new putObject writes is gonna get real tricky in the long run

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new source will always have a microVersionId

I really don't understand why we need this.

This was discussed in the design, and is a critical point to mitigate the impact on MongoDB sizing.

  • It really just means "no micro versionID" is also a valid micro version id, and is older than any other valid microVersionID value
  • There is no additional risk due to concurrent putObject: replication is possible only in versioned buckets, so objects will be versioned and will not have the same VersionID → so no conflict is possible (conflicts are only ever checked on objects with THE SAME versionId)

I put on A, A has no microVersionID, but replicates to B and C which now have a microVersionID.

when replicating, we do putMetadata : so B and C should have the same "empty" microVersionID.
the replication itself MUST NOT change the microVersionID (or metadata, for that matter): it just performs an (almost) verbatim copy of the metadata.

const isCascadeWrite = encodedMicroVersionId !== undefined;
let incomingMicroVersionId = null;
if (isCascadeWrite) {
// '' means source has no microVersionId, treated as older revision
incomingMicroVersionId = encodedMicroVersionId === '' ? null : decode(encodedMicroVersionId);
if (incomingMicroVersionId instanceof Error) {
log.error('crr putMetadata: failed to decode x-scal-micro-version-id header', {
method: 'putMetadata',
error: incomingMicroVersionId.message,
});
return callback(errorInstances.BadRequest.customizeDescription(
'bad request: invalid x-scal-micro-version-id header'));
}
// Loop/stale detection only applies when the object already exists at destination
if (objMd) {
// null = oldest revision (original putObject, before any metadata update).
// VersionID strings are in reverse chronological order: a lexicographically
// larger string is an older revision.
const isSourceOlderThanDestination = (sourceMicroVersionId, destinationMicroVersionId) =>
destinationMicroVersionId !== null &&
(sourceMicroVersionId === null || sourceMicroVersionId > destinationMicroVersionId);

// Treat '' and undefined as null (no microVersionId set).
const sourceMicroVersionId = incomingMicroVersionId || null;
const destinationMicroVersionId = objMd.microVersionId || null;
if (sourceMicroVersionId === destinationMicroVersionId) {
log.debug('crr cascade putMetadata: loop detected, skipping write', {
method: 'putMetadata',
bucketName,
objectKey,
});
request.resume();
return _respondWithHeaderCrrConflict(
response, log, callback,
MicroVersionIdAlreadyStoredException.name,
'incoming microVersionId already at destination',
);
}
if (isSourceOlderThanDestination(sourceMicroVersionId, destinationMicroVersionId)) {
log.debug('crr cascade putMetadata: stale event, rejecting', {
method: 'putMetadata',
bucketName,
objectKey,
});
request.resume();
return _respondWithHeaderCrrConflict(
response, log, callback,
StaleMicroVersionIdException.name,
'incoming revision is older than destination',
objMd?.microVersionId,
);
}
}
}

Comment thread
SylvainSenechal marked this conversation as resolved.

return _getRequestPayload(request, (err, payload) => {
if (err) {
return callback(err);
Expand All @@ -552,15 +661,15 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
return callback(errors.MalformedPOSTRequest);
}

const { headers, bucketName, objectKey } = request;
const { headers } = request;

// Destination-side delete-marker replication.
// We need the REPLICA status to distinguish from
// source-side replication status updates that also carry isDeleteMarker=true.
if (
omVal.isDeleteMarker &&
omVal.replicationInfo &&
omVal.replicationInfo.status === 'REPLICA' &&
(omVal.replicationInfo.isReplica === true || omVal.replicationInfo.status === 'REPLICA') &&
request.serverAccessLog
) {
// eslint-disable-next-line no-param-reassign
Expand All @@ -576,7 +685,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// The REPLICA status excludes source-side replication-status updates.
if (
omVal.replicationInfo &&
omVal.replicationInfo.status === 'REPLICA' &&
(omVal.replicationInfo.isReplica === true || omVal.replicationInfo.status === 'REPLICA') &&
(omVal.originOp === 's3:ObjectTagging:Put' || omVal.originOp === 's3:ObjectTagging:Delete') &&
request.serverAccessLog
) {
Expand All @@ -593,7 +702,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// The REPLICA status excludes source-side replication-status updates.
if (
omVal.replicationInfo &&
omVal.replicationInfo.status === 'REPLICA' &&
(omVal.replicationInfo.isReplica === true || omVal.replicationInfo.status === 'REPLICA') &&
omVal.originOp === 's3:ObjectAcl:Put' &&
request.serverAccessLog
) {
Expand All @@ -607,17 +716,23 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {

if (headers['x-scal-replication-content'] === 'METADATA') {
if (!objMd) {
return callback(errors.ObjNotFound);
// For metadata-only writes, the destination's data location must be
// preserved from the existing object. If there is no existing object
// and the source has data, there is nothing to copy the location from.
// Zero-byte objects have no data location, so this is safe to skip.
if (omVal['content-length'] > 0) {
return callback(errors.ObjNotFound);
}
} else {
[
'location',
'x-amz-server-side-encryption',
'x-amz-server-side-encryption-aws-kms-key-id',
'x-amz-server-side-encryption-customer-algorithm',
].forEach(headerName => {
omVal[headerName] = objMd[headerName];
});
}

[
'location',
'x-amz-server-side-encryption',
'x-amz-server-side-encryption-aws-kms-key-id',
'x-amz-server-side-encryption-customer-algorithm',
].forEach(headerName => {
omVal[headerName] = objMd[headerName];
});
}

let versionId = decodeVersionId(request.query);
Expand Down Expand Up @@ -672,7 +787,8 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// then we want to create a version for the replica object even though
// none was provided in the object metadata value.
if (omVal.replicationInfo.isNFS) {
const { isReplica } = omVal.replicationInfo;
const isReplica = omVal.replicationInfo.isReplica === true
|| omVal.replicationInfo.status === 'REPLICA';
versioning = isReplica;
omVal.replicationInfo.isNFS = !isReplica;
}
Expand Down Expand Up @@ -724,6 +840,48 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
options.isNull = isNull;
}

// Cascade triggering
// If the bucket receiving this replica has its own CRR rules, set
// status to PENDING so the queue populator here picks it up for the
// next hop. If not, clear the source-side replicationInfo fields
// Always mark isReplica=true.
if (isCascadeWrite) {
const isMDOnly = headers['x-scal-replication-content'] === 'METADATA';
const objSize = omVal['content-length'] || 0;

// These S3-compatible Scality locations are excluded
// as cascade targets because they use the MultiBackend S3 path which
// bypasses the putData/putMetadata routes, so loop detection cannot fire
// on those destinations.
Comment on lines +852 to +855

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this part of the design? I may be wrong, but I thought we wanted (or maybe needed, to be checked with the product team) to trigger multi-backend replication after CRR?
→ please double check

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's here https://github.com/scality/citadel/pull/349/changes#diff-f4fbff9b43d54385a8778f028f6950744393bb5235c5860afa7eec1b89072f83R266

but yeah i need to triple check it because yes i don't really like this part with these hardcoded locations

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We rediscussed with Maël, I think this is ok to keep, we don't want cascade replication to continue on these locations

const BLOCKED_LOCATION_TYPES = ['location-scality-ring-s3-v1', 'location-scality-artesca-s3-v1'];

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it be tested on a real lab ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be honestly, i think i need to double check this together with the design and the code


const nextReplInfo = getReplicationInfo(config, objectKey, bucketInfo, isMDOnly, objSize, null, null, null);

if (nextReplInfo) {
nextReplInfo.backends = nextReplInfo.backends.filter(b => {
const loc = config.locationConstraints[b.site];
return !loc || !BLOCKED_LOCATION_TYPES.includes(loc.type);
});
Comment on lines +861 to +864

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic should be in getReplicationInfo: this is the function responsible for building the list of backends/destinations. Doing it there will also avoid creating a backend entry only to remove it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure you wanna do this ? It means I have to add some kind of flag to getReplicationInfo to do the current logic in getReplicationInfo 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see, maybe it can be done cleanly if, in getReplicationInfo.js I add a helper function, leave the existing function almost as it is and just call that extra helper at the end of getReplicationInfo if the flag is on ?

}

if (nextReplInfo && nextReplInfo.backends.length > 0) {
omVal.replicationInfo = nextReplInfo;
} else {
omVal.replicationInfo = {
Comment thread
SylvainSenechal marked this conversation as resolved.
status: '',
backends: [],
content: [],
destination: '',
storageClass: '',
role: '',
storageType: '',
dataStoreVersionId: '',
};
}

omVal.replicationInfo.isReplica = true;
}

return async.series(
[
// Zenko's CRR delegates replacing the account
Expand Down
12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
},
"homepage": "https://github.com/scality/S3#readme",
"dependencies": {
"@aws-crypto/crc32": "^5.2.0",
"@aws-crypto/crc32c": "^5.2.0",
"@aws-sdk/client-iam": "^3.930.0",
"@aws-sdk/client-s3": "^3.1013.0",
"@aws-sdk/client-sts": "^3.930.0",
"@aws-sdk/crc64-nvme-crt": "^3.989.0",
Comment thread
SylvainSenechal marked this conversation as resolved.
"@aws-sdk/credential-providers": "^3.864.0",
"@aws-sdk/middleware-retry": "^3.374.0",
"@aws-sdk/protocol-http": "^3.374.0",
"@aws-sdk/s3-request-presigner": "^3.901.0",
"@aws-sdk/signature-v4": "^3.374.0",
"@aws-crypto/crc32": "^5.2.0",
"@aws-crypto/crc32c": "^5.2.0",
"@aws-sdk/crc64-nvme-crt": "^3.989.0",
"@azure/storage-blob": "^12.28.0",
"@hapi/joi": "^17.1.1",
"@opentelemetry/api": "^1.9.0",
Expand Down Expand Up @@ -65,11 +65,11 @@
"vaultclient": "scality/vaultclient#8.5.3",
"werelogs": "scality/werelogs#8.2.2",
"ws": "^8.18.0",
"@scality/cloudserverclient": "1.0.9",
"xml2js": "^0.6.2"
Comment thread
SylvainSenechal marked this conversation as resolved.
},
"devDependencies": {
"@eslint/compat": "^1.2.2",
"@scality/cloudserverclient": "1.0.7",
"@scality/eslint-config-scality": "scality/Guidelines#8.3.1",
"eslint": "^9.14.0",
"eslint-plugin-import": "^2.31.0",
Expand All @@ -88,10 +88,10 @@
"nodemon": "^3.1.10",
"nyc": "^15.1.0",
"pino-pretty": "^13.1.3",
"prettier": "^3.4.2",
"sinon": "^13.0.1",
"ts-morph": "^28.0.0",
"tv4": "^1.3.0",
"prettier": "^3.4.2"
"tv4": "^1.3.0"
},
"resolutions": {
"jsonwebtoken": "^9.0.0",
Expand Down
Loading
Loading