Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BB-432: remove useless kafka connectors #2441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions extensions/oplogPopulator/OplogPopulatorMetrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class OplogPopulatorMetrics {
this.connectors = ZenkoMetrics.createGauge({
name: 's3_oplog_populator_connectors',
help: 'Total number of configured connectors',
labelNames: ['isOld'],
});
this.reconfigurationLag = ZenkoMetrics.createHistogram({
name: 's3_oplog_populator_reconfiguration_lag_seconds',
Expand Down Expand Up @@ -121,9 +120,7 @@ class OplogPopulatorMetrics {
*/
onConnectorsInstantiated(isOld, count = 1) {
try {
this.connectors.inc({
isOld,
}, count);
this.connectors.inc(count);
} catch (error) {
this._logger.error('An error occured while pushing metrics', {
method: 'OplogPopulatorMetrics.onConnectorsInstantiated',
Expand All @@ -132,6 +129,22 @@ class OplogPopulatorMetrics {
}
}

/**
* updates s3_oplog_populator_connectors metric
* when a connector is destroyed
* @returns {undefined}
*/
onConnectorDestroyed() {
try {
this.connectors.dec(1);
} catch (error) {
this._logger.error('An error occurred while pushing metrics', {
method: 'OplogPopulatorMetrics.onConnectorDestroyed',
error: error.message,
});
}
}

/**
* updates s3_oplog_populator_reconfiguration_lag_seconds metric
* @param {Connector} connector connector instance
Expand Down
11 changes: 11 additions & 0 deletions extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const connectorParams = joi.object({
name: joi.string().required(),
config: joi.object().required(),
buckets: joi.array().required(),
isRunning: joi.boolean().required(),
francoisferrand marked this conversation as resolved.
Show resolved Hide resolved
logger: joi.object().required(),
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
Expand All @@ -25,6 +26,7 @@ class Connector {
* @constructor
* @param {Object} params connector config
* @param {string} params.name connector name
* @param {Boolean} params.isRunning true if connector is running
* @param {Object} params.config Kafka-connect MongoDB source
* connector config
* @param {string[]} params.buckets buckets assigned to this connector
Expand All @@ -37,6 +39,7 @@ class Connector {
this._name = params.name;
this._config = params.config;
this._buckets = new Set(params.buckets);
this._isRunning = params.isRunning;
this._state = {
// Used to check if buckets assigned to this connector
// got modified from the last connector update
Expand Down Expand Up @@ -82,6 +85,12 @@ class Connector {
*/
get config() { return this._config; }

/**
* Getter for connector running state
* @returns {Boolean} connector running state
*/
get isRunning() { return this._isRunning; }

/**
* Calculate config size in bytes
* @returns {number} config size
Expand Down Expand Up @@ -111,6 +120,7 @@ class Connector {
name: this._name,
config: this._config,
});
this._isRunning = true;
} catch (err) {
this._logger.error('Error while spawning connector', {
method: 'Connector.spawn',
Expand All @@ -129,6 +139,7 @@ class Connector {
async destroy() {
try {
await this._kafkaConnect.deleteConnector(this._name);
this._isRunning = false;
} catch (err) {
this._logger.error('Error while destroying connector', {
method: 'Connector.destroy',
Expand Down
180 changes: 107 additions & 73 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const paramsJoi = joi.object({

// Promisify async functions
const eachLimit = util.promisify(async.eachLimit);
const timesLimit = util.promisify(async.timesLimit);

/**
* @class ConnectorsManager
Expand Down Expand Up @@ -105,40 +104,24 @@ class ConnectorsManager {

/**
* Creates a connector
* @param {boolean} spawn should connector be spawned
* @returns {Promise|Connector} created connector
* @throws {InternalError}
* @returns {Connector} created connector
*/
async addConnector(spawn = true) {
try {
// generate connector name
const connectorName = this._generateConnectorName();
// get connector config
const config = this._getDefaultConnectorConfiguration(connectorName);
// initialize connector
const connector = new Connector({
name: connectorName,
config,
buckets: [],
logger: this._logger,
kafkaConnectHost: this._kafkaConnectHost,
kafkaConnectPort: this._kafkaConnectPort,
});
if (spawn) {
await connector.spawn();
}
this._logger.debug('Successfully created connector', {
method: 'ConnectorsManager.addConnector',
connector: connector.name
});
return connector;
} catch (err) {
this._logger.error('An error occurred while creating connector', {
method: 'ConnectorsManager.addConnector',
error: err.description || err.message,
});
throw errors.InternalError.customizeDescription(err.description);
}
addConnector() {
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
// generate connector name
const connectorName = this._generateConnectorName();
// get connector config
const config = this._getDefaultConnectorConfiguration(connectorName);
// initialize connector
const connector = new Connector({
name: connectorName,
config,
buckets: [],
isRunning: false,
logger: this._logger,
kafkaConnectHost: this._kafkaConnectHost,
kafkaConnectPort: this._kafkaConnectPort,
});
return connector;
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -177,6 +160,7 @@ class ConnectorsManager {
// added manually like 'offset.topic.name'
config: { ...oldConfig, ...config },
buckets,
isRunning: true,
logger: this._logger,
kafkaConnectHost: this._kafkaConnectHost,
kafkaConnectPort: this._kafkaConnectPort,
Expand Down Expand Up @@ -219,11 +203,10 @@ class ConnectorsManager {
}
// Add connectors if required number of connectors not reached
const nbConnectorsToAdd = this._nbConnectors - this._connectors.length;
await timesLimit(nbConnectorsToAdd, 10, async () => {
const newConnector = await this.addConnector();
for (let i = 0; i < nbConnectorsToAdd; i++) {
const newConnector = this.addConnector();
francoisferrand marked this conversation as resolved.
Show resolved Hide resolved
this._connectors.push(newConnector);
this._metricsHandler.onConnectorsInstantiated(false);
});
}
this._logger.info('Successfully initialized connectors', {
method: 'ConnectorsManager.initializeConnectors',
numberOfActiveConnectors: this._connectors.length
Expand All @@ -238,47 +221,98 @@ class ConnectorsManager {
}
}

/**
* Spawns a connector when buckets are configured for it and is not running,
* or destroys connector with no buckets configured
* @param {Connector} connector connector instance
* @returns {Promise<Boolean>} true if connector state changed
* @throws {InternalError}
*/
async _spawnOrDestroyConnector(connector) {
francoisferrand marked this conversation as resolved.
Show resolved Hide resolved
try {
if (connector.isRunning && connector.bucketCount === 0) {
await connector.destroy();
this._metricsHandler.onConnectorDestroyed();
this._logger.info('Successfully destroyed a connector', {
method: 'ConnectorsManager._spawnOrDestroyConnector',
connector: connector.name
});
return true;
} else if (!connector.isRunning && connector.bucketCount > 0) {
await connector.spawn();
this._metricsHandler.onConnectorsInstantiated(false);
this._logger.info('Successfully destroyed a connector', {
method: 'ConnectorsManager._spawnOrDestroyConnector',
connector: connector.name
});
francoisferrand marked this conversation as resolved.
Show resolved Hide resolved
return true;
} else if (connector.isRunning) {
return connector.updatePipeline(true);
}
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
return false;
} catch (err) {
this._logger.error('Error while spawning or destorying connector', {
method: 'ConnectorsManager._spawnOrDestroyConnector',
connector: this._name,
error: err.description || err.message,
});
throw errors.InternalError.customizeDescription(err.description);
}
}

/**
* Updates the connectors if their configuration changed
* @returns {undefined}
*/
async _updateConnectors() {
const connectorsStatus = {};
let connectorUpdateFailed = false;
await eachLimit(this._connectors, 10, async connector => {
const startTime = Date.now();
connectorsStatus[connector.name] = {
numberOfBuckets: connector.bucketCount,
updated: null,
};
try {
// check if we need to spawn/despawn the connector
// - connector is destroyed if no buckets are configured
// - connector is spawned when buckets are configured on it
// or update the connector when buckets configuration changed
const updated = await this._spawnOrDestroyConnector(connector);
connectorsStatus[connector.name].updated = updated;
if (updated) {
const delta = (Date.now() - startTime) / 1000;
this._metricsHandler.onConnectorReconfiguration(connector, true, delta);
}
} catch (err) {
connectorUpdateFailed = true;
connectorsStatus[connector.name].updated = false;
this._metricsHandler.onConnectorReconfiguration(connector, false);
this._logger.error('Failed to updated connector', {
method: 'ConnectorsManager._updateConnectors',
connector: connector.name,
bucketCount: connector.bucketCount,
error: err.description || err.message,
});
}
});
const logMessage = connectorUpdateFailed ? 'Failed to update some or all the connectors' :
'Successfully updated all the connectors';
const logFunction = connectorUpdateFailed ? this._logger.error.bind(this._logger) :
this._logger.info.bind(this._logger);
logFunction(logMessage, {
method: 'ConnectorsManager._updateConnectors',
connectorsStatus,
});
}

/**
* Schedules connector updates
* @returns {undefined}
*/
scheduleConnectorUpdates() {
schedule.scheduleJob(this._cronRule, async () => {
const connectorsStatus = {};
let connectorUpdateFailed = false;
await eachLimit(this._connectors, 10, async connector => {
const startTime = Date.now();
connectorsStatus[connector.name] = {
numberOfBuckets: connector.bucketCount,
updated: null,
};
try {
const updated = await connector.updatePipeline(true);
connectorsStatus[connector.name].updated = updated;
if (updated) {
const delta = (Date.now() - startTime) / 1000;
this._metricsHandler.onConnectorReconfiguration(connector, true, delta);
}
} catch (err) {
connectorUpdateFailed = true;
connectorsStatus[connector.name].updated = false;
this._metricsHandler.onConnectorReconfiguration(connector, false);
this._logger.error('Failed to updated connector', {
method: 'ConnectorsManager.scheduleConnectorUpdates',
connector: connector.name,
bucketCount: connector.bucketCount,
error: err.description || err.message,
});
}
});
const logMessage = connectorUpdateFailed ? 'Failed to update some or all the connectors' :
'Successfully updated all the connectors';
const logFunction = connectorUpdateFailed ? this._logger.error.bind(this._logger) :
this._logger.info.bind(this._logger);
logFunction(logMessage, {
method: 'ConnectorsManager.scheduleConnectorUpdates',
connectorsStatus,
});
await this._updateConnectors();
});
}

Expand Down
2 changes: 1 addition & 1 deletion monitoring/oplog-populator/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Metrics:
]

CONNECTORS = metrics.CounterMetric(
's3_oplog_populator_connectors', 'isOld',
's3_oplog_populator_connectors',
job="${oplog_populator_job}", namespace="${namespace}"
)

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "backbeat",
"version": "8.6.26",
"version": "8.6.27",
"description": "Asynchronous queue and job manager",
"main": "index.js",
"scripts": {
Expand Down
1 change: 1 addition & 0 deletions tests/unit/oplogPopulator/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const logger = new werelogs.Logger('Allocator');

const defaultConnectorParams = {
config: {},
isRunning: false,
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/oplogPopulator/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe('Connector', () => {
name: 'example-connector',
config: connectorConfig,
buckets: [],
isRunning: false,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
logger,
Expand All @@ -40,20 +41,25 @@ describe('Connector', () => {
it('Should spawn connector with correct pipeline', async () => {
const createStub = sinon.stub(connector._kafkaConnect, 'createConnector')
.resolves();
assert.strictEqual(connector.isRunning, false);
await connector.spawn();
assert(createStub.calledOnceWith({
name: 'example-connector',
config: connectorConfig
}));
assert.strictEqual(connector.isRunning, true);
});
});

describe('destroy', () => {
it('Should destroy connector', async () => {
const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector')
.resolves();
connector._isRunning = true;
assert.strictEqual(connector.isRunning, true);
await connector.destroy();
francoisferrand marked this conversation as resolved.
Show resolved Hide resolved
assert(deleteStub.calledOnceWith('example-connector'));
assert.strictEqual(connector.isRunning, false);
});
});

Expand Down
Loading
Loading