Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/improvement/BB-434/newerNoncurrentVersions' into w/8.5/improvement/BB-434/newerNoncurrentVersions
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas2bert committed Sep 1, 2023
2 parents bd8e92f + 6f0963e commit 7076f80
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 17 deletions.
9 changes: 6 additions & 3 deletions extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,14 @@ class LifecycleTask extends BackbeatTask {
const ncvHeapObject = this.ncvHeap.get(bucketName).get(version.Key);

const nncvSize = parseInt(rule[ncve][nncv], 10);
if (!ncvHeapObject.get(rule.Id)) {
ncvHeapObject.set(rule.Id, new MinHeap(nncvSize, noncurrentVersionCompare));

const ruleId = rule[ncve].ID;

if (!ncvHeapObject.get(ruleId)) {
ncvHeapObject.set(ruleId, new MinHeap(nncvSize, noncurrentVersionCompare));
}

const heap = ncvHeapObject.get(rule.Id);
const heap = ncvHeapObject.get(ruleId);

if (heap.size < nncvSize) {
heap.add(version);
Expand Down
32 changes: 31 additions & 1 deletion extensions/lifecycle/tasks/LifecycleTaskV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ class LifecycleTaskV2 extends LifecycleTask {
return done(err);
}

// create Set of unique keys not matching the next marker to
// indicate the object level entries to be cleared at the end
// of the processing step
const uniqueObjectKeysNotNextMarker = new Set();
if (markerInfo.keyMarker) {
contents.forEach(v => {
if (v.Key !== markerInfo.keyMarker) {
uniqueObjectKeysNotNextMarker.add(v.Key);
}
});
}

// re-queue truncated listing only once.
if (isTruncated && nbRetries === 0) {
const entry = Object.assign({}, bucketData, {
Expand All @@ -198,7 +210,25 @@ class LifecycleTaskV2 extends LifecycleTask {
});
}
return this._compareRulesToList(bucketData, bucketLCRules,
contents, log, done);
contents, log, err => {
if (err) {
return done(err);
}

if (!isTruncated) {
// end of bucket listing
// clear bucket level entry and all object entries
this._ncvHeapBucketClear(bucketData.target.bucket);
} else {
// clear object level entries that have been processed
this._ncvHeapObjectsClear(
bucketData.target.bucket,
uniqueObjectKeysNotNextMarker
);
}

return done();
});
});
}

Expand Down
141 changes: 141 additions & 0 deletions tests/functional/lifecycle/LifecycleTaskV2-versioned.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ describe('LifecycleTaskV2 with bucket versioned', () => {
kafkaBacklogMetrics: { snapshotTopicOffsets: () => {} },
pausedLocations: new Set(),
log,
ncvHeap: new Map(),
}),
};
lifecycleTask = new LifecycleTaskV2(lp);
Expand Down Expand Up @@ -747,4 +748,144 @@ describe('LifecycleTaskV2 with bucket versioned', () => {
return done();
});
});

// Verifies that a noncurrent version produces no expiration entry when it
// falls within the rule's NewerNoncurrentVersions retention count.
it('should not publish any entry because version is retained by NewerNoncurrentVersions', done => {
    const nonCurrentExpirationRule = [{
        NoncurrentVersionExpiration: {
            NoncurrentDays: 2,
            NewerNoncurrentVersions: 1,
        },
        ID: '123',
        Prefix: '',
        Status: 'Enabled',
    }];

    const keyName = 'key1';
    const versionId = 'versionid1';
    const key = keyMock.nonCurrent({ keyName, versionId, daysEarlier: 1 });
    const contents = [key];
    backbeatMetadataProxy.listLifecycleResponse = { contents, isTruncated: false, markerInfo: {} };

    const nbRetries = 0;
    return lifecycleTask.processBucketEntry(nonCurrentExpirationRule, bucketData, s3,
        backbeatMetadataProxy, nbRetries, err => {
            assert.ifError(err);
            // test that the non-current listing is triggered
            assert.strictEqual(backbeatMetadataProxy.listLifecycleType, 'noncurrent');

            // test parameters used to list lifecycle keys
            const { listLifecycleParams } = backbeatMetadataProxy;
            expectNominalListingParams(bucketName, listLifecycleParams);

            // the lone noncurrent version is retained by the rule, so no
            // entry should be pushed to the kafka topic
            assert.strictEqual(kafkaEntries.length, 0);
            return done();
        });
});

// Two noncurrent versions, NewerNoncurrentVersions=1: the newest noncurrent
// version is retained and only the older one is published for expiration.
it('should retain the first non current version but publish the second one', done => {
    const lifecycleRules = [{
        NoncurrentVersionExpiration: {
            NoncurrentDays: 2,
            NewerNoncurrentVersions: 1,
        },
        ID: '123',
        Prefix: '',
        Status: 'Enabled',
    }];

    const objectKey = 'key1';
    const olderVersionId = 'versionid1';
    const newerVersionId = 'versionid2';
    // the older version exceeds NoncurrentDays; the newer one does not
    const olderVersion = keyMock.nonCurrent({ keyName: objectKey, versionId: olderVersionId, daysEarlier: 2 });
    const newerVersion = keyMock.nonCurrent({ keyName: objectKey, versionId: newerVersionId, daysEarlier: 1 });
    backbeatMetadataProxy.listLifecycleResponse = {
        contents: [olderVersion, newerVersion],
        isTruncated: false,
        markerInfo: {},
    };

    const nbRetries = 0;
    return lifecycleTask.processBucketEntry(lifecycleRules, bucketData, s3,
        backbeatMetadataProxy, nbRetries, err => {
            assert.ifError(err);
            // the non-current listing must have been triggered
            assert.strictEqual(backbeatMetadataProxy.listLifecycleType, 'noncurrent');

            // check the parameters used to list lifecycle keys
            expectNominalListingParams(bucketName, backbeatMetadataProxy.listLifecycleParams);

            // exactly one expiration entry: the non-retained older version
            assert.strictEqual(kafkaEntries.length, 1);
            testKafkaEntry.expectObjectExpirationEntry(kafkaEntries[0],
                { keyName: objectKey, versionId: olderVersionId });
            return done();
        });
});

// Runs two passes over the same truncated listing: first with a rule that
// keeps 2 newer noncurrent versions, then with a rule keeping only 1, and
// checks that the second pass expires the version no longer retained.
it('should retain the correct number of versions when the rule changes', done => {
    const firstRules = [{
        NoncurrentVersionExpiration: {
            NoncurrentDays: 2,
            NewerNoncurrentVersions: 2,
        },
        ID: '123',
        Prefix: '',
        Status: 'Enabled',
    }];

    const keyName = 'key1';
    const versionId = 'versionid1';
    const versionId2 = 'versionid2';
    const version1 = keyMock.nonCurrent({ keyName, versionId, daysEarlier: 2 });
    const version2 = keyMock.nonCurrent({ keyName, versionId: versionId2, daysEarlier: 1 });
    backbeatMetadataProxy.listLifecycleResponse = {
        contents: [version1, version2],
        isTruncated: true,
        markerInfo: { keyMarker: 'key1', versionIdMarker: 'versionid2' },
    };

    const nbRetries = 0;
    return lifecycleTask.processBucketEntry(firstRules, bucketData, s3,
        backbeatMetadataProxy, nbRetries, err => {
            assert.ifError(err);

            // second pass: same listing, but the rule now retains only the
            // single newest noncurrent version
            kafkaEntries = [];
            const secondRules = [{
                NoncurrentVersionExpiration: {
                    NoncurrentDays: 2,
                    NewerNoncurrentVersions: 1,
                },
                ID: '456',
                Prefix: '',
                Status: 'Enabled',
            }];

            return lifecycleTask.processBucketEntry(secondRules, bucketData, s3,
                backbeatMetadataProxy, nbRetries, err2 => {
                    assert.ifError(err2);
                    // the non-current listing must have been triggered
                    assert.strictEqual(backbeatMetadataProxy.listLifecycleType, 'noncurrent');

                    // check the parameters used to list lifecycle keys
                    expectNominalListingParams(bucketName, backbeatMetadataProxy.listLifecycleParams);

                    // truncated listing => one re-queued bucket entry, plus
                    // one expiration entry for the dropped version
                    assert.strictEqual(kafkaEntries.length, 2);

                    testKafkaEntry.expectBucketEntry(kafkaEntries[0], {
                        hasBeforeDate: true,
                        keyMarker: keyName,
                        versionIdMarker: versionId2,
                        prefix: '',
                        listType: 'noncurrent',
                    });

                    testKafkaEntry.expectObjectExpirationEntry(kafkaEntries[1], { keyName, versionId });
                    return done();
                });
        });
});
});
27 changes: 14 additions & 13 deletions tests/unit/lifecycle/LifecycleTask.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1730,9 +1730,10 @@ describe('lifecycle task helper methods', () => {
});

it('should clear the ncvHeap object of the listed bucket/keys', () => {
const ruleId = 'rule_name';
const rules = {
Id: 'rule_name',
NoncurrentVersionExpiration: {
ID: ruleId,
NoncurrentDays: 1,
NewerNoncurrentVersions: 10,
},
Expand Down Expand Up @@ -1773,16 +1774,16 @@ describe('lifecycle task helper methods', () => {
assert(lct2.ncvHeap.has(b1));
assert(!lct2.ncvHeap.get(b1).has(version1.Key));
assert(!lct2.ncvHeap.get(b1).has(version2.Key));
assert(lct2.ncvHeap.get(b1).get(version3.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b1).get(version3.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b1).get(version3.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b1).get(version3.Key).get(ruleId).size, 1);

assert(lct2.ncvHeap.has(b2));
assert(lct2.ncvHeap.get(b2).has(version1.Key));
assert(lct2.ncvHeap.get(b2).get(version1.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b2).get(version1.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(ruleId).size, 1);
assert(lct2.ncvHeap.get(b2).has(version2.Key));
assert(lct2.ncvHeap.get(b2).get(version2.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b2).get(version2.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(ruleId).size, 1);
});
});

Expand Down Expand Up @@ -1813,9 +1814,10 @@ describe('lifecycle task helper methods', () => {
});

it('should clear the ncvHeap object of the listed bucket/keys', () => {
const ruleId = 'rule_name';
const rules = {
Id: 'rule_name',
NoncurrentVersionExpiration: {
ID: ruleId,
NoncurrentDays: 1,
NewerNoncurrentVersions: 10,
},
Expand Down Expand Up @@ -1854,11 +1856,11 @@ describe('lifecycle task helper methods', () => {
assert(!lct2.ncvHeap.has(b1));
assert(lct2.ncvHeap.has(b2));
assert(lct2.ncvHeap.get(b2).has(version1.Key));
assert(lct2.ncvHeap.get(b2).get(version1.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b2).get(version1.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version1.Key).get(ruleId).size, 1);
assert(lct2.ncvHeap.get(b2).has(version2.Key));
assert(lct2.ncvHeap.get(b2).get(version2.Key).has(rules.Id));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(rules.Id).size, 1);
assert(lct2.ncvHeap.get(b2).get(version2.Key).has(ruleId));
assert.strictEqual(lct2.ncvHeap.get(b2).get(version2.Key).get(ruleId).size, 1);
});
});

Expand Down Expand Up @@ -2049,4 +2051,3 @@ describe('lifecycle task helper methods', () => {
});
});
});

0 comments on commit 7076f80

Please sign in to comment.