Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit breaker backport #2425

Merged
merged 10 commits into from
Sep 14, 2023
3 changes: 3 additions & 0 deletions extensions/lifecycle/LifecycleConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const joiSchema = {
}),
}),
probeServer: probeServerJoi.default(),
circuitBreaker: joi.object().optional(),
},
bucketProcessor: {
groupId: joi.string().required(),
Expand All @@ -34,12 +35,14 @@ const joiSchema = {
// overloading the system
concurrency: joi.number().greater(0).default(1),
probeServer: probeServerJoi.default(),
circuitBreaker: joi.object().optional(),
},
objectProcessor: {
groupId: joi.string().required(),
retry: retryParamsJoi,
concurrency: joi.number().greater(0).default(10),
probeServer: probeServerJoi.default(),
circuitBreaker: joi.object().optional(),
},
};

Expand Down
10 changes: 10 additions & 0 deletions extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const { createBackbeatClient, createS3Client } = require('../../../lib/clients/u
const LifecycleTaskV2 = require('../tasks/LifecycleTaskV2');
const safeJsonParse = require('../util/safeJsonParse');
const { authTypeAssumeRole } = require('../../../lib/constants');
const {
updateCircuitBreakerConfigForImplicitOutputQueue,
} = require('../../../lib/CircuitBreaker');

const PROCESS_OBJECTS_ACTION = 'processObjects';

Expand Down Expand Up @@ -153,6 +156,12 @@ class LifecycleBucketProcessor {
});
}
});

this._circuitBreakerConfig = updateCircuitBreakerConfigForImplicitOutputQueue(
this._lcConfig.bucketProcessor.circuitBreaker,
this._lcConfig.objectProcessor.groupId,
this._lcConfig.objectTasksTopic,
);
}

/**
Expand Down Expand Up @@ -436,6 +445,7 @@ class LifecycleBucketProcessor {
groupId: this._lcConfig.bucketProcessor.groupId,
concurrency: this._lcConfig.bucketProcessor.concurrency,
queueProcessor: this._processBucketEntry.bind(this),
circuitBreaker: this._circuitBreakerConfig,
});
this._consumer.on('error', err => {
if (!this._consumerReady) {
Expand Down
155 changes: 137 additions & 18 deletions extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ const { authTypeAssumeRole } = require('../../../lib/constants');
const VaultClientCache = require('../../../lib/clients/VaultClientCache');
const safeJsonParse = require('../util/safeJsonParse');
const { AccountIdCache } = require('../util/AccountIdCache');
const { BreakerState, CircuitBreaker } = require('breakbeat').CircuitBreaker;
const {
updateCircuitBreakerConfigForImplicitOutputQueue,
} = require('../../../lib/CircuitBreaker');

const DEFAULT_CRON_RULE = '* * * * *';
const DEFAULT_CONCURRENCY = 10;
const ACCOUNT_SPLITTER = ':';
const BUCKET_CHECKPOINT_PUSH_NUMBER_BUCKETD = 50;

const LIFEYCLE_CONDUCTOR_CLIENT_ID = 'lifecycle:conductor';

Expand Down Expand Up @@ -103,6 +108,23 @@ class LifecycleConductor {
this.onlyBlacklistAccounts = this.bucketsBlacklisted.size === 0 && this.accountsBlacklisted.size > 0;

this.logger = new Logger('Backbeat:Lifecycle:Conductor');

const circuitBreakerConfig = updateCircuitBreakerConfigForImplicitOutputQueue(
lcConfig.conductor.circuitBreaker,
lcConfig.bucketProcessor.groupId,
lcConfig.bucketTasksTopic,
);

this._circuitBreaker = this.buildCircuitBreaker(circuitBreakerConfig);
}

buildCircuitBreaker(conf) {
try {
return new CircuitBreaker(conf);
} catch (e) {
this.logger.error('invalid circuit breaker configuration');
throw e;
}
}

/**
Expand All @@ -129,9 +151,19 @@ class LifecycleConductor {
return `${this.lcConfig.zookeeperPath}/data/buckets`;
}

getBucketProgressZkPath() {
return `${this.lcConfig.zookeeperPath}/data/bucket-send-progress`;
}

initZkPaths(cb) {
async.each([this.getBucketsZkPath()],
(path, done) => this._zkClient.mkdirp(path, done), cb);
async.each(
[
this.getBucketsZkPath(),
this.getBucketProgressZkPath(),
],
(path, done) => this._zkClient.mkdirp(path, done),
cb,
);
}

_isBlacklisted(canonicalId, bucketName) {
Expand Down Expand Up @@ -283,7 +315,13 @@ class LifecycleConductor {
return this.listZookeeperBuckets(queue, log, cb);
}

return this.listBucketdBuckets(queue, log, cb);
return this.restoreBucketCheckpoint((err, marker) => {
if (err) {
return cb(err);
}

return this.listBucketdBuckets(queue, marker || null, log, cb);
});
}

listZookeeperBuckets(queue, log, cb) {
Expand Down Expand Up @@ -329,24 +367,73 @@ class LifecycleConductor {
});
}

listBucketdBuckets(queue, log, cb) {
let isTruncated = false;
let marker = null;
checkpointBucket(bucketEntry, cb) {
if (bucketEntry === null) {
return process.nextTick(cb);
}

return this._zkClient.setData(
this.getBucketProgressZkPath(),
Buffer.from(bucketEntry),
this.lastSentVersion,
(err, stat) => {
if (err) {
return cb(err);
}

if (stat) {
this.lastSentVersion = stat.version;
}

this.lastSentId = null;

return cb();
},
);
}

restoreBucketCheckpoint(cb) {
this._zkClient.getData(this.getBucketProgressZkPath(), (err, data, stat) => {
if (err) {
return cb(err);
}

const entry = data ? data.toString('ascii') : null;
if (stat) {
this.lastSentVersion = stat.version;
}

return cb(null, entry);
});
}

listBucketdBuckets(queue, initMarker, log, cb) {
let isTruncated = true;
let marker = initMarker;
let nEnqueued = 0;
const start = new Date();
const retryWrapper = new BackbeatTask();

this.lastSentId = null;
this.lastSentVersion = -1;

async.doWhilst(
next => {
if (queue.length() > this._maxInFlightBatchSize) {
log.info('delaying bucket pull', {
nEnqueuedToDownstream: nEnqueued,
inFlight: queue.length(),
maxInFlight: this._maxInFlightBatchSize,
bucketListingPushRateHz: Math.round(nEnqueued * 1000 / (new Date() - start)),
const breakerState = this._circuitBreaker.state;
const queueInfo = {
nEnqueuedToDownstream: nEnqueued,
inFlight: queue.length(),
maxInFlight: this._maxInFlightBatchSize,
bucketListingPushRateHz: Math.round(nEnqueued * 1000 / (new Date() - start)),
breakerState,
};

if (queue.length() > this._maxInFlightBatchSize ||
breakerState !== BreakerState.Nominal) {
log.info('delaying bucket pull', queueInfo);
return this.checkpointBucket(this.lastSentId, () => {
setTimeout(next, 10000);
});

return setTimeout(next, 10000);
}

return retryWrapper.retry({
Expand All @@ -368,6 +455,7 @@ class LifecycleConductor {
}

isTruncated = result.IsTruncated;
let needCheckpoint = false;

result.Contents.forEach(o => {
marker = o.key;
Expand All @@ -376,6 +464,11 @@ class LifecycleConductor {
if (!this._isBlacklisted(canonicalId, bucketName)) {
nEnqueued += 1;
queue.push({ canonicalId, bucketName });
this.lastSentId = o.key;
if (nEnqueued % BUCKET_CHECKPOINT_PUSH_NUMBER_BUCKETD === 0) {
needCheckpoint = true;
}

// Optimization:
// If we only blacklist by accounts, and the last bucket is blacklisted
// we can skip listing buckets until the next account.
Expand All @@ -387,6 +480,10 @@ class LifecycleConductor {
}
});

if (needCheckpoint) {
return this.checkpointBucket(marker, done);
}

return done();
}
),
Expand All @@ -395,7 +492,21 @@ class LifecycleConductor {
}, next);
},
() => isTruncated,
err => cb(err, nEnqueued));
err => {
if (err) {
return cb(err, nEnqueued);
}

// clear last seen bucket from zk
return this.checkpointBucket('', err => {
if (err) {
return cb(err);
}

return cb(null, nEnqueued);
});
}
);
}

_controlBacklog(done) {
Expand Down Expand Up @@ -489,12 +600,12 @@ class LifecycleConductor {
}

_setupZookeeperClient(cb) {
if (this._bucketSource !== 'zookeeper') {
if (!this.needsZookeeper()) {
process.nextTick(cb);
return;
}
this._zkClient = zookeeperHelper.createClient(
this.zkConfig.connectionString);
this.zkConfig.connectionString, this.zkConfig);
this._zkClient.connect();
this._zkClient.once('error', cb);
this._zkClient.once('ready', () => {
Expand All @@ -506,10 +617,16 @@ class LifecycleConductor {
'error from lifecycle conductor zookeeper client',
{ error: err });
});
cb();
this.initZkPaths(cb);
});
}

needsZookeeper() {
return this._bucketSource === 'zookeeper' || // bucket list stored in zk
this._bucketSource === 'mongodb' || // bucket stream checkpoints in zk
this._bucketSource === 'bucketd'; // bucket stream checkpoints in zk
Comment on lines +625 to +627
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Little more compact (feel free to ignore):

Suggested change
return this._bucketSource === 'zookeeper' || // bucket list stored in zk
this._bucketSource === 'mongodb' || // bucket stream checkpoints in zk
this._bucketSource === 'bucketd'; // bucket stream checkpoints in zk
return ['zookeeper', 'mongodb', 'bucketd'].includes(this._bucketSource);

}

/**
* Initialize kafka producer and clients
*
Expand All @@ -522,6 +639,7 @@ class LifecycleConductor {
return process.nextTick(done);
}

this._circuitBreaker.start();
this._setupVaultClientCache();
return async.series([
next => this._setupProducer(next),
Expand Down Expand Up @@ -622,6 +740,7 @@ class LifecycleConductor {
});
},
], err => {
this._circuitBreaker.stop();
this._started = false;
return done(err);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class LifecycleObjectProcessor extends EventEmitter {
groupId: this._lcConfig.objectProcessor.groupId,
concurrency: this._lcConfig.objectProcessor.concurrency,
queueProcessor: this.processKafkaEntry.bind(this),
circuitBreaker: this._lcConfig.objectProcessor.circuitBreaker,
});
this._consumer.on('error', err => {
if (!consumerReady) {
Expand Down
1 change: 1 addition & 0 deletions extensions/replication/ReplicationConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const joiSchema = {
})
),
sourceCheckIfSizeGreaterThanMB: joi.number().positive().default(100),
circuitBreaker: joi.object().optional(),
}).required(),
replicationStatusProcessor: {
groupId: joi.string().required(),
Expand Down
5 changes: 4 additions & 1 deletion extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ class QueueProcessor extends EventEmitter {
* replication
* @param {String} site - site name
* @param {MetricsProducer} mProducer - instance of metrics producer
* @param {Object} circuitBreakerConfig - breakbeat configuration
*/
constructor(topic, kafkaConfig, sourceConfig, destConfig, repConfig,
httpsConfig, internalHttpsConfig, site, mProducer) {
httpsConfig, internalHttpsConfig, site, mProducer, circuitBreakerConfig) {
super();
this.topic = topic;
this.kafkaConfig = kafkaConfig;
Expand All @@ -185,6 +186,7 @@ class QueueProcessor extends EventEmitter {
this.site = site;
this._mProducer = mProducer;
this.serviceName = constants.services.replicationQueueProcessor;
this.circuitBreakerConfig = circuitBreakerConfig;

this.echoMode = false;

Expand Down Expand Up @@ -419,6 +421,7 @@ class QueueProcessor extends EventEmitter {
concurrency: this.repConfig.queueProcessor.concurrency,
queueProcessor: this.processKafkaEntry.bind(this),
logConsumerMetricsIntervalS: this.repConfig.queueProcessor.logConsumerMetricsIntervalS,
circuitBreaker: this.circuitBreakerConfig,
});
this._consumer.on('error', () => { });
this._consumer.on('ready', () => {
Expand Down
3 changes: 2 additions & 1 deletion extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ werelogs.configure({ level: config.log.logLevel,
const metricsProducer = new MetricsProducer(kafkaConfig, mConfig);
const queueProcessor = new QueueProcessor(
repConfig.topic, kafkaConfig, sourceConfig, destConfig, repConfig,
httpsConfig, internalHttpsConfig, site, metricsProducer
httpsConfig, internalHttpsConfig, site, metricsProducer,
repConfig.queueProcessor.circuitBreaker,
);

/**
Expand Down
Loading
Loading