Skip to content

Commit

Permalink
Merge branch 'improvement/BB-566-avoid-infinite-replication' into q/8.7
Browse files Browse the repository at this point in the history
  • Loading branch information
bert-e committed Oct 29, 2024
2 parents 63c62a5 + f74d084 commit 6694ca8
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 1 deletion.
1 change: 1 addition & 0 deletions extensions/replication/ReplicationConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const joiSchema = joi.object({
probeServerPerSite,
).default({ bindAddress: 'localhost', port: 4042 }),
circuitBreaker: joi.object().optional(),
sourceCheckIfSizeGreaterThanMB: joi.number().positive().default(100),
}).required(),
replicationStatusProcessor: {
groupId: joi.string().required(),
Expand Down
61 changes: 61 additions & 0 deletions extensions/replication/tasks/ReplicateObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ const { getAccountCredentials } = require('../../../lib/credentials/AccountCrede
const RoleCredentials = require('../../../lib/credentials/RoleCredentials');
const { metricsExtension, metricsTypeQueued, metricsTypeCompleted, replicationStages } = require('../constants');

const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');

const errorAlreadyCompleted = {};

function _extractAccountIdFromRole(role) {
return role.split(':')[4];
}
Expand Down Expand Up @@ -331,6 +335,49 @@ class ReplicateObject extends BackbeatTask {
});
}

_refreshSourceEntry(sourceEntry, log, cb) {
const params = {
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
VersionId: sourceEntry.getEncodedVersionId(),
};
return this.backbeatSource.getMetadata(params, (err, blob) => {
if (err) {
err.origin = 'source';

Check warning on line 346 in extensions/replication/tasks/ReplicateObject.js

View workflow job for this annotation

GitHub Actions / tests

Assignment to property of function parameter 'err'
log.error('error getting metadata blob from S3', {
method: 'ReplicateObject._refreshSourceEntry',
error: err,
});
return cb(err);
}
const parsedEntry = ObjectQueueEntry.createFromBlob(blob.Body);
if (parsedEntry.error) {
log.error('error parsing metadata blob', {
error: parsedEntry.error,
method: 'ReplicateObject._refreshSourceEntry',
});
return cb(errors.InternalError.
customizeDescription('error parsing metadata blob'));
}
const refreshedEntry = new ObjectQueueEntry(sourceEntry.getBucket(),
sourceEntry.getObjectVersionedKey(), parsedEntry.result);
return cb(null, refreshedEntry);
});
}

_checkSourceReplication(sourceEntry, log, cb) {
this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => {
if (err) {
return cb(err);
}
const status = refreshedEntry.getReplicationSiteStatus(this.site);
if (status === 'COMPLETED') {
return cb(errorAlreadyCompleted);
}
return cb();
});
}

_getAndPutData(sourceEntry, destEntry, log, cb) {
log.debug('replicating data', { entry: sourceEntry.getLogInfo() });
if (sourceEntry.getLocation().some(part => {
Expand Down Expand Up @@ -700,6 +747,14 @@ class ReplicateObject extends BackbeatTask {
(sourceRole, targetRole, next) => {
this._setTargetAccountMd(destEntry, targetRole, log, next);
},
next => {
if (!mdOnly &&
sourceEntry.getContentLength() / 1000000 >=
this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) {
return this._checkSourceReplication(sourceEntry, log, next);
}
return next();
},
// Get data from source bucket and put it on the target bucket
next => {
if (!mdOnly) {
Expand Down Expand Up @@ -768,6 +823,12 @@ class ReplicateObject extends BackbeatTask {
});
return done();
}
if (err === errorAlreadyCompleted) {
log.warn('replication skipped: ' +
'source object version already COMPLETED',
{ entry: sourceEntry.getLogInfo() });
return done();
}
if (err.ObjNotFound || err.code === 'ObjNotFound') {
if (err.origin === 'source') {
log.info('replication skipped: ' +
Expand Down
77 changes: 76 additions & 1 deletion tests/functional/replication/queueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,6 @@ class S3Mock extends TestConfigurator {
}

_getMetadataSource(req, url, query, res) {
assert(query.versionId);
res.writeHead(200);
res.end(JSON.stringify({
Body: JSON.parse(this.getParam('kafkaEntry.value')).value,
Expand Down Expand Up @@ -797,6 +796,7 @@ describe('queue processor functional tests with mocking', () => {
},
groupId: 'backbeat-func-test-group-id',
mpuPartsConcurrency: 10,
sourceCheckIfSizeGreaterThanMB: 10,
},
},
{ host: '127.0.0.1',
Expand Down Expand Up @@ -984,6 +984,62 @@ describe('queue processor functional tests with mocking', () => {
}),
], done);
});

// With contentLength set to 100000000 (100 decimal MB), which is above the
// configured sourceCheckIfSizeGreaterThanMB threshold of 10, the processor
// must fetch fresh source metadata (getMetadata) before copying data.
it('should check object MD if size is bigger than sourceCheckIfSizeGreaterThanMB', done => {
s3mock.setParam('contentLength', 100000000);
let checkMdCalled = false;
// Intercept the source getMetadata route once: record the call, restore
// the default handler, then delegate to the stock mock implementation.
s3mock.setParam('routes.source.s3.getMetadata.handler',
(req, url, query, res) => {
checkMdCalled = true;
s3mock.resetParam('routes.source.s3.getMetadata.handler');
s3mock._getMetadataSource(req, url, query, res);
}, { _static: true });

async.parallel([
// resolves when the processor writes replication status back to source
done => {
s3mock.onPutSourceMd = done;
},
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
assert.strictEqual(s3mock.hasPutTargetData, true);
assert(s3mock.hasPutTargetMd);
// the guard must have re-read source metadata
assert(checkMdCalled);
done();
}),
// restore the default content length before finishing the test
], () => {
s3mock.resetParam('contentLength');
done();
});
});

// With contentLength set to 1 byte, far below the configured
// sourceCheckIfSizeGreaterThanMB threshold of 10, the processor must NOT
// issue a source getMetadata call and should replicate directly.
it('should not check object MD if size is smaller than sourceCheckIfSizeGreaterThanMB', done => {
s3mock.setParam('contentLength', 1);
let checkMdCalled = false;
// Intercept the source getMetadata route: if it fires at all, record it
// (the assertion below expects it to stay false), then delegate to the
// stock mock implementation.
s3mock.setParam('routes.source.s3.getMetadata.handler',
(req, url, query, res) => {
checkMdCalled = true;
s3mock.resetParam('routes.source.s3.getMetadata.handler');
s3mock._getMetadataSource(req, url, query, res);
}, { _static: true });

async.parallel([
// resolves when the processor writes replication status back to source
done => {
s3mock.onPutSourceMd = done;
},
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
assert.strictEqual(s3mock.hasPutTargetData, true);
assert(s3mock.hasPutTargetMd);
// small object: the metadata pre-check must have been skipped
assert.strictEqual(checkMdCalled, false);
done();
}),
// restore the default content length before finishing the test
], () => {
s3mock.resetParam('contentLength');
done();
});
});
});

describe('error paths', function errorPaths() {
Expand Down Expand Up @@ -1451,6 +1507,25 @@ describe('queue processor functional tests with mocking', () => {
], done);
});
});

// Large object (above the size threshold) whose source getMetadata call
// returns ObjNotFound: no data or metadata must reach the target.
// NOTE(review): despite the title, the entry callback asserts no error —
// presumably because a source-side ObjNotFound is treated as a skip
// (logged and acknowledged) rather than a retryable failure; confirm
// against the ObjNotFound handling in ReplicateObject.
it('should fail a replication if unable to get metadata', done => {
s3mock.setParam('contentLength', 100000000);
s3mock.installBackbeatErrorResponder('source.s3.getMetadata',
errors.ObjNotFound,
{ once: true });
async.parallel([
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
// the failed pre-check must abort before any target writes
assert(!s3mock.hasPutTargetData);
assert(!s3mock.hasPutTargetMd);
done();
}),
// restore the default content length before finishing the test
], () => {
s3mock.resetParam('contentLength');
done();
});
});
});
});

Expand Down

0 comments on commit 6694ca8

Please sign in to comment.