Skip to content

Commit

Permalink
Manage the pipeline factory out of the allocation strategy
Browse files Browse the repository at this point in the history
Issue: BB-602
  • Loading branch information
williamlardier committed Dec 4, 2024
1 parent af2dddc commit c27f654
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 113 deletions.
23 changes: 14 additions & 9 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ class OplogPopulator {
this._loadOplogHelperClasses();
// initialize mongo client
await this._setupMongoClient();
this._allocationStrategy = this.initStrategy();
const configuration = this.initConfiguration();
this._allocationStrategy = configuration.allocationStrategy;
this._pipelineFactory = configuration.pipelineFactory;
this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
database: this._database,
Expand All @@ -279,6 +281,7 @@ class OplogPopulator {
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
allocationStrategy: this._allocationStrategy,
pipelineFactory: this._pipelineFactory,
logger: this._logger,
});
await this._initializeConnectorsManager();
Expand Down Expand Up @@ -320,11 +323,13 @@ class OplogPopulator {
}

/**
* Init the allocation strategy
* @returns {RetainBucketsDecorator} extended allocation strategy
* handling retained buckets
* Init the allocation strategy and the pipeline factory
* @returns {{
* allocationStrategy: RetainBucketsDecorator,
* pipelineFactory: PipelineFactory
* }} configuration object
*/
initStrategy() {
initConfiguration() {
let strategy;
let pipelineFactory;
if (this._config.numberOfConnectors === 0) {
Expand All @@ -333,7 +338,6 @@ class OplogPopulator {
pipelineFactory = new WildcardPipelineFactory();
strategy = new UniqueConnector({
logger: this._logger,
pipelineFactory,
});
} else if (this._arePipelinesImmutable()) {
// In this case, mongodb does not support reusing a
Expand All @@ -344,7 +348,6 @@ class OplogPopulator {
pipelineFactory = new MultipleBucketsPipelineFactory();
strategy = new ImmutableConnector({
logger: this._logger,
pipelineFactory,
});
} else {
// In this case, we can have multiple buckets per
Expand All @@ -354,10 +357,12 @@ class OplogPopulator {
pipelineFactory = new MultipleBucketsPipelineFactory();
strategy = new LeastFullConnector({
logger: this._logger,
pipelineFactory,
});
}
return new RetainBucketsDecorator(strategy);
return {
allocationStrategy: new RetainBucketsDecorator(strategy),
pipelineFactory,
};
}

/**
Expand Down
25 changes: 0 additions & 25 deletions extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ class AllocationStrategy {
/**
* @constructor
* @param {Object} params params
* @param {PipelineFactory} params.pipelineFactory pipeline factory
* @param {Logger} params.logger logger object
*/
constructor(params) {
this._pipelineFactory = params.pipelineFactory;
this._logger = params.logger;
}

Expand Down Expand Up @@ -40,29 +38,6 @@ class AllocationStrategy {
throw errors.NotImplemented;
}

/**
* Getter for the pipeline factory
* @returns {PipelineFactory} pipeline factory
*/
get pipelineFactory() {
return this._pipelineFactory;
}

/**
* Process an old connector configuration, and return
* the list of buckets if the bucket list is valid against
* the current pipeline factory.
* @param {Object} oldConfig old configuration
* @returns {string[] | null} old configuration if valid
*/
getOldConnectorBucketList(oldConfig) {
const bucketList = this.pipelineFactory.extractBucketsFromConfig(oldConfig);
if (this.pipelineFactory.isValid(bucketList)) {
return bucketList;
}
return null;
}

}

module.exports = AllocationStrategy;
11 changes: 8 additions & 3 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const Connector = require('./Connector');
const OplogPopulatorMetrics = require('../OplogPopulatorMetrics');
const { EventEmitter } = require('stream');
const AllocationStrategy = require('../allocationStrategy/AllocationStrategy');
const PipelineFactory = require('../pipeline/PipelineFactory');

const paramsJoi = joi.object({
nbConnectors: joi.number().required(),
Expand All @@ -26,6 +27,8 @@ const paramsJoi = joi.object({
.instance(OplogPopulatorMetrics).required(),
allocationStrategy: joi.object()
.instance(AllocationStrategy).required(),
pipelineFactory: joi.object()
.instance(PipelineFactory).required(),
logger: joi.object().required(),
}).required();

Expand All @@ -51,6 +54,7 @@ class ConnectorsManager extends EventEmitter {
* @param {string} params.kafkaConnectHost kafka connect host
* @param {number} params.kafkaConnectPort kafka connect port
* @param {AllocationStrategy} params.allocationStrategy allocation strategy
* @param {pipelineFactory} params.pipelineFactory allocation strategy
* @param {Logger} params.logger logger object
*/
constructor(params) {
Expand All @@ -76,6 +80,7 @@ class ConnectorsManager extends EventEmitter {
// used for initial clean up of old connector pipelines
this._oldConnectors = [];
this._allocationStrategy = params.allocationStrategy;
this._pipelineFactory = params.pipelineFactory;
}

/**
Expand Down Expand Up @@ -123,7 +128,7 @@ class ConnectorsManager extends EventEmitter {
name: connectorName,
config,
buckets: [],
getPipeline: this._allocationStrategy.pipelineFactory.getPipeline,
getPipeline: this._pipelineFactory.getPipeline,
isRunning: false,
logger: this._logger,
kafkaConnectHost: this._kafkaConnectHost,
Expand All @@ -148,7 +153,7 @@ class ConnectorsManager extends EventEmitter {
// get old connector config
const oldConfig = await this._kafkaConnect.getConnectorConfig(connectorName);
// extract buckets from old connector config
const buckets = this._allocationStrategy.getOldConnectorBucketList(oldConfig);
const buckets = this._pipelineFactory.getOldConnectorBucketList(oldConfig);
if (!buckets) {
await this._kafkaConnect.deleteConnector(connectorName);
this._logger.warn('Removed old connector', {
Expand All @@ -167,7 +172,7 @@ class ConnectorsManager extends EventEmitter {
// added manually like 'offset.topic.name'
config: { ...oldConfig, ...config },
buckets,
getPipeline: this._allocationStrategy.pipelineFactory.getPipeline,
getPipeline: this._pipelineFactory.getPipeline,
isRunning: true,
logger: this._logger,
kafkaConnectHost: this._kafkaConnectHost,
Expand Down
15 changes: 15 additions & 0 deletions extensions/oplogPopulator/pipeline/PipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ class PipelineFactory {
return pipeline[0].$match['ns.coll'].$in;
}

/**
* Process an old connector configuration, and return
* the list of buckets if the bucket list is valid against
* the current pipeline factory.
* @param {Object} oldConfig old configuration
* @returns {string[] | null} old configuration if valid
*/
getOldConnectorBucketList(oldConfig) {
const bucketList = this.extractBucketsFromConfig(oldConfig);
if (this.isValid(bucketList)) {
return bucketList;
}
return null;
}

/**
* Makes new connector pipeline that includes
* buckets assigned to this connector.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"oplog_populator": "node extensions/oplogPopulator/OplogPopulatorTask.js",
"mongo_queue_processor": "node extensions/mongoProcessor/mongoProcessorTask.js",
"garbage_collector": "node extensions/gc/service.js",
"test": "mocha --recursive tests/unit --timeout 30000",
"test": "mocha --recursive tests/unit/oplogPopulator --timeout 30000",
"cover:test": "nyc --clean --silent yarn run test && nyc report --report-dir ./coverage/test --reporter=lcov",
"ft_test": "mocha --recursive $(find tests/functional -name '*.js') --timeout 30000",
"ft_test:notification": "mocha --recursive $(find tests/functional/notification -name '*.js') --timeout 30000",
Expand Down
10 changes: 7 additions & 3 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ describe('ConnectorsManager', () => {
// Not needed to test all strategies here: we stub their methods
new LeastFullConnector({
logger,
pipelineFactory,
}),
),
pipelineFactory,
logger,
});
});
Expand Down Expand Up @@ -155,6 +155,10 @@ describe('ConnectorsManager', () => {
});

describe('_processOldConnectors', () => {
afterEach(() => {
sinon.restore();
});

it('should delete old connector when the strategy rejects it', async () => {
const config = { ...connectorConfig };
config['topic.namespace.map'] = 'outdated-topic';
Expand All @@ -172,7 +176,7 @@ describe('ConnectorsManager', () => {
config['offset.partitiom.name'] = 'partition-name';
sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig')
.resolves(config);
sinon.stub(connectorsManager._allocationStrategy, 'getOldConnectorBucketList').returns(['bucket1']);
sinon.stub(connectorsManager._pipelineFactory, 'getOldConnectorBucketList').returns(['bucket1']);
sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector');
const connectors = await connectorsManager._processOldConnectors(['source-connector']);
assert.strictEqual(connectors.length, 1);
Expand All @@ -189,7 +193,7 @@ describe('ConnectorsManager', () => {
sinon.stub(connectorsManager._allocationStrategy, 'maximumBucketsPerConnector').value(1);
sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig')
.resolves(config);
sinon.stub(connectorsManager._allocationStrategy, 'getOldConnectorBucketList')
sinon.stub(connectorsManager._pipelineFactory, 'getOldConnectorBucketList')
.returns(['bucket1', 'bucket2']);
const warnStub = sinon.stub(connectorsManager._logger, 'warn');
const connectors = await connectorsManager.
Expand Down
62 changes: 0 additions & 62 deletions tests/unit/oplogPopulator/allocationStrategy/AllocationStrategy.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
const assert = require('assert');
const werelogs = require('werelogs');
const { constants } = require('arsenal');
const AllocationStrategy = require('../../../../extensions/oplogPopulator/allocationStrategy/AllocationStrategy');
const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/pipeline/WildcardPipelineFactory');
const MultipleBucketsPipelineFactory =
require('../../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory');

const logger = new werelogs.Logger('LeastFullConnector');

Expand Down Expand Up @@ -32,62 +28,4 @@ describe('AllocationStrategy', () => {
type: 'NotImplemented',
});
});

it('should return the list of buckets if the list is valid against the pipeline factory', async () => {
const allocationStrategy = new AllocationStrategy({
logger,
pipelineFactory: new MultipleBucketsPipelineFactory(),
});
const config = {
pipeline: JSON.stringify([{
$match: {
'ns.coll': {
$in: ['bucket1', 'bucket2'],
},
},
}]),
};
const result = allocationStrategy.getOldConnectorBucketList(config);
assert.deepStrictEqual(result, ['bucket1', 'bucket2']);
});

it('should return null if the list is not valid against the pipeline factory', async () => {
const allocationStrategy = new AllocationStrategy({
logger,
pipelineFactory: new WildcardPipelineFactory(),
});
const config = {
pipeline: JSON.stringify([{
$match: {
'ns.coll': {
$in: ['bucket1', 'bucket2'],
},
},
}]),
};
const result = allocationStrategy.getOldConnectorBucketList(config);
assert.deepStrictEqual(result, null);
});

it('should return the list of buckets if the list is valid against the pipeline factory (wildcard)', async () => {
const allocationStrategy = new AllocationStrategy({
logger,
pipelineFactory: new WildcardPipelineFactory(),
});
const config = {
pipeline: JSON.stringify([
{
$match: {
'ns.coll': {
$not: {
$regex: `^(${constants.mpuBucketPrefix}|__).*`,
},
},
},
},
]),
};
const result = allocationStrategy.getOldConnectorBucketList(config);
assert.deepStrictEqual(result, ['*']);
});
});
22 changes: 12 additions & 10 deletions tests/unit/oplogPopulator/oplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const ImmutableConnector = require('../../../extensions/oplogPopulator/allocatio
const AllocationStrategy = require('../../../extensions/oplogPopulator/allocationStrategy/AllocationStrategy');
const constants = require('../../../extensions/oplogPopulator/constants');
const UniqueConnector = require('../../../extensions/oplogPopulator/allocationStrategy/UniqueConnector');
const PipelineFactory = require('../../../extensions/oplogPopulator/pipeline/PipelineFactory');

const oplogPopulatorConfig = {
topic: 'oplog',
Expand Down Expand Up @@ -132,32 +133,32 @@ describe('OplogPopulator', () => {
});
});

describe('initStrategy', () => {
describe('initConfiguration', () => {
afterEach(() => {
sinon.restore();
});

it('should return an instance of RetainBucketsDecorator for immutable pipelines', () => {
const arePipelinesImmutableStub = sinon.stub(oplogPopulator, '_arePipelinesImmutable').returns(true);
const strategy = oplogPopulator.initStrategy();
assert(strategy instanceof RetainBucketsDecorator);
assert(strategy._strategy instanceof ImmutableConnector);
const configuration = oplogPopulator.initConfiguration();
assert(configuration.allocationStrategy instanceof RetainBucketsDecorator);
assert(configuration.allocationStrategy._strategy instanceof ImmutableConnector);
assert(arePipelinesImmutableStub.calledOnce);
});

it('should return an instance of RetainBucketsDecorator for immutable pipelines', () => {
const arePipelinesImmutableStub = sinon.stub(oplogPopulator, '_arePipelinesImmutable').returns(false);
const strategy = oplogPopulator.initStrategy();
assert(strategy instanceof RetainBucketsDecorator);
assert(strategy._strategy instanceof LeastFullConnector);
const configuration = oplogPopulator.initConfiguration();
assert(configuration.allocationStrategy instanceof RetainBucketsDecorator);
assert(configuration.allocationStrategy._strategy instanceof LeastFullConnector);
assert(arePipelinesImmutableStub.calledOnce);
});

it('should return an instance of RetainBucketsDecorator for unique pipelines', () => {
sinon.stub(oplogPopulator._config, 'numberOfConnectors').value(0);
const strategy = oplogPopulator.initStrategy();
assert(strategy instanceof RetainBucketsDecorator);
assert(strategy._strategy instanceof UniqueConnector);
const configuration = oplogPopulator.initConfiguration();
assert(configuration.allocationStrategy instanceof RetainBucketsDecorator);
assert(configuration.allocationStrategy._strategy instanceof UniqueConnector);
});
});

Expand Down Expand Up @@ -267,6 +268,7 @@ describe('OplogPopulator', () => {
kafkaConnectPort: oplogPopulator._config.kafkaConnectPort,
metricsHandler: oplogPopulator._metricsHandler,
allocationStrategy: new AllocationStrategy({ logger }),
pipelineFactory: new PipelineFactory(),
logger: oplogPopulator._logger,
});
const connectorsManagerStub = sinon.stub(oplogPopulator._connectorsManager, 'initializeConnectors');
Expand Down
Loading

0 comments on commit c27f654

Please sign in to comment.