Skip to content

Commit

Permalink
BB-413: backport conductor checkpoint feature
Browse files Browse the repository at this point in the history
  • Loading branch information
rachedbenmustapha committed Aug 25, 2023
1 parent bcfcaa1 commit ae56f08
Showing 1 changed file with 99 additions and 16 deletions.
115 changes: 99 additions & 16 deletions extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ const safeJsonParse = require('../util/safeJsonParse');
const { AccountIdCache } = require('../util/AccountIdCache');
const { BreakerState, CircuitBreaker } = require('breakbeat').CircuitBreaker;
const {
updateCircuitBreakerConfigForImplicitOutputQueue
updateCircuitBreakerConfigForImplicitOutputQueue,
} = require('../../../lib/CircuitBreaker');

const DEFAULT_CRON_RULE = '* * * * *';
const DEFAULT_CONCURRENCY = 10;
const ACCOUNT_SPLITTER = ':';
const BUCKET_CHECKPOINT_PUSH_NUMBER = 50;
const BUCKET_CHECKPOINT_PUSH_NUMBER_BUCKETD = 50;

const LIFEYCLE_CONDUCTOR_CLIENT_ID = 'lifecycle:conductor';

Expand Down Expand Up @@ -315,7 +315,13 @@ class LifecycleConductor {
return this.listZookeeperBuckets(queue, log, cb);
}

return this.listBucketdBuckets(queue, log, cb);
return this.restoreBucketCheckpoint((err, marker) => {
if (err) {
return cb(err);
}

return this.listBucketdBuckets(queue, marker || null, log, cb);
});
}

listZookeeperBuckets(queue, log, cb) {
Expand Down Expand Up @@ -361,24 +367,73 @@ class LifecycleConductor {
});
}

listBucketdBuckets(queue, log, cb) {
let isTruncated = false;
let marker = null;
checkpointBucket(bucketEntry, cb) {
if (bucketEntry === null) {
return process.nextTick(cb);
}

return this._zkClient.setData(
this.getBucketProgressZkPath(),
Buffer.from(bucketEntry),
this.lastSentVersion,
(err, stat) => {
if (err) {
return cb(err);
}

if (stat) {
this.lastSentVersion = stat.version;
}

this.lastSentId = null;

return cb();
},
);
}

restoreBucketCheckpoint(cb) {
this._zkClient.getData(this.getBucketProgressZkPath(), (err, data, stat) => {
if (err) {
return cb(err);
}

const entry = data ? data.toString('ascii') : null;
if (stat) {
this.lastSentVersion = stat.version;
}

return cb(null, entry);
});
}

listBucketdBuckets(queue, initMarker, log, cb) {
let isTruncated = true;
let marker = initMarker;
let nEnqueued = 0;
const start = new Date();
const retryWrapper = new BackbeatTask();

this.lastSentId = null;
this.lastSentVersion = -1;

async.doWhilst(
next => {
if (queue.length() > this._maxInFlightBatchSize) {
log.info('delaying bucket pull', {
nEnqueuedToDownstream: nEnqueued,
inFlight: queue.length(),
maxInFlight: this._maxInFlightBatchSize,
bucketListingPushRateHz: Math.round(nEnqueued * 1000 / (new Date() - start)),
const breakerState = this._circuitBreaker.state;
const queueInfo = {
nEnqueuedToDownstream: nEnqueued,
inFlight: queue.length(),
maxInFlight: this._maxInFlightBatchSize,
bucketListingPushRateHz: Math.round(nEnqueued * 1000 / (new Date() - start)),
breakerState,
};

if (queue.length() > this._maxInFlightBatchSize ||
breakerState !== BreakerState.Nominal) {
log.info('delaying bucket pull', queueInfo);
return this.checkpointBucket(this.lastSentId, () => {
setTimeout(next, 10000);
});

return setTimeout(next, 10000);
}

return retryWrapper.retry({
Expand All @@ -400,6 +455,7 @@ class LifecycleConductor {
}

isTruncated = result.IsTruncated;
let needCheckpoint = false;

result.Contents.forEach(o => {
marker = o.key;
Expand All @@ -408,6 +464,11 @@ class LifecycleConductor {
if (!this._isBlacklisted(canonicalId, bucketName)) {
nEnqueued += 1;
queue.push({ canonicalId, bucketName });
this.lastSentId = o.key;
if (nEnqueued % BUCKET_CHECKPOINT_PUSH_NUMBER_BUCKETD === 0) {
needCheckpoint = true;
}

// Optimization:
// If we only blacklist by accounts, and the last bucket is blacklisted
// we can skip listing buckets until the next account.
Expand All @@ -419,6 +480,10 @@ class LifecycleConductor {
}
});

if (needCheckpoint) {
return this.checkpointBucket(marker, done);
}

return done();
}
),
Expand All @@ -427,7 +492,19 @@ class LifecycleConductor {
}, next);
},
() => isTruncated,
err => cb(err, nEnqueued));
err => {
if (!err) {
// clear last seen bucket from zk
this.checkpointBucket('', err => {
if (err) {
return cb(err);
}

return cb(null, nEnqueued);
});
}
}
);
}

_controlBacklog(done) {
Expand Down Expand Up @@ -521,7 +598,7 @@ class LifecycleConductor {
}

_setupZookeeperClient(cb) {
if (this._bucketSource !== 'zookeeper') {
if (!this.needsZookeeper()) {
process.nextTick(cb);
return;
}
Expand All @@ -542,6 +619,12 @@ class LifecycleConductor {
});
}

needsZookeeper() {
return this._bucketSource === 'zookeeper' || // bucket list stored in zk
this._bucketSource === 'mongodb' || // bucket stream checkpoints in zk
this._bucketSource === 'bucketd'; // bucket stream checkpoints in zk
}

/**
* Initialize kafka producer and clients
*
Expand Down

0 comments on commit ae56f08

Please sign in to comment.