diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index 08b33d4b4..32c21f89b 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -266,7 +266,9 @@ class OplogPopulator { this._loadOplogHelperClasses(); // initialize mongo client await this._setupMongoClient(); - this._allocationStrategy = this.initStrategy(); + const configuration = this.initConfiguration(); + this._allocationStrategy = configuration.allocationStrategy; + this._pipelineFactory = configuration.pipelineFactory; this._connectorsManager = new ConnectorsManager({ nbConnectors: this._config.numberOfConnectors, database: this._database, @@ -279,6 +281,7 @@ class OplogPopulator { kafkaConnectPort: this._config.kafkaConnectPort, metricsHandler: this._metricsHandler, allocationStrategy: this._allocationStrategy, + pipelineFactory: this._pipelineFactory, logger: this._logger, }); await this._initializeConnectorsManager(); @@ -320,11 +323,13 @@ class OplogPopulator { } /** - * Init the allocation strategy - * @returns {RetainBucketsDecorator} extended allocation strategy - * handling retained buckets + * Init the allocation strategy and the pipeline factory + * @returns {{ + * allocationStrategy: RetainBucketsDecorator, + * pipelineFactory: PipelineFactory + * }} configuration object */ - initStrategy() { + initConfiguration() { let strategy; let pipelineFactory; if (this._config.numberOfConnectors === 0) { @@ -333,7 +338,6 @@ class OplogPopulator { pipelineFactory = new WildcardPipelineFactory(); strategy = new UniqueConnector({ logger: this._logger, - pipelineFactory, }); } else if (this._arePipelinesImmutable()) { // In this case, mongodb does not support reusing a @@ -344,7 +348,6 @@ class OplogPopulator { pipelineFactory = new MultipleBucketsPipelineFactory(); strategy = new ImmutableConnector({ logger: this._logger, - pipelineFactory, }); } else { // In this case, we can have multiple buckets per @@ -354,10 +357,12 @@ class OplogPopulator { pipelineFactory = new MultipleBucketsPipelineFactory(); strategy = new LeastFullConnector({ logger: this._logger, - pipelineFactory, }); } - return new RetainBucketsDecorator(strategy); + return { + allocationStrategy: new RetainBucketsDecorator(strategy), + pipelineFactory, + }; } /** diff --git a/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js b/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js index 1f549cb3f..87bf16c23 100644 --- a/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js +++ b/extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js @@ -5,11 +5,9 @@ class AllocationStrategy { /** * @constructor * @param {Object} params params - * @param {PipelineFactory} params.pipelineFactory pipeline factory * @param {Logger} params.logger logger object */ constructor(params) { - this._pipelineFactory = params.pipelineFactory; this._logger = params.logger; } @@ -40,29 +38,6 @@ class AllocationStrategy { throw errors.NotImplemented; } - /** - * Getter for the pipeline factory - * @returns {PipelineFactory} pipeline factory - */ - get pipelineFactory() { - return this._pipelineFactory; - } - - /** - * Process an old connector configuration, and return - * the list of buckets if the bucket list is valid against - * the current pipeline factory. - * @param {Object} oldConfig old configuration - * @returns {string[] | null} old configuration if valid - */ - getOldConnectorBucketList(oldConfig) { - const bucketList = this.pipelineFactory.extractBucketsFromConfig(oldConfig); - if (this.pipelineFactory.isValid(bucketList)) { - return bucketList; - } - return null; - } - } module.exports = AllocationStrategy; diff --git a/extensions/oplogPopulator/modules/ConnectorsManager.js b/extensions/oplogPopulator/modules/ConnectorsManager.js index fce06dfdf..1fadfd77e 100644 --- a/extensions/oplogPopulator/modules/ConnectorsManager.js +++ b/extensions/oplogPopulator/modules/ConnectorsManager.js @@ -11,6 +11,7 @@ const Connector = require('./Connector'); const OplogPopulatorMetrics = require('../OplogPopulatorMetrics'); const { EventEmitter } = require('stream'); const AllocationStrategy = require('../allocationStrategy/AllocationStrategy'); +const PipelineFactory = require('../pipeline/PipelineFactory'); const paramsJoi = joi.object({ nbConnectors: joi.number().required(), @@ -26,6 +27,8 @@ const paramsJoi = joi.object({ .instance(OplogPopulatorMetrics).required(), allocationStrategy: joi.object() .instance(AllocationStrategy).required(), + pipelineFactory: joi.object() + .instance(PipelineFactory).required(), logger: joi.object().required(), }).required(); @@ -51,6 +54,7 @@ class ConnectorsManager extends EventEmitter { * @param {string} params.kafkaConnectHost kafka connect host * @param {number} params.kafkaConnectPort kafka connect port * @param {AllocationStrategy} params.allocationStrategy allocation strategy + * @param {pipelineFactory} params.pipelineFactory allocation strategy * @param {Logger} params.logger logger object */ constructor(params) { @@ -76,6 +80,7 @@ class ConnectorsManager extends EventEmitter { // used for initial clean up of old connector pipelines this._oldConnectors = []; this._allocationStrategy = params.allocationStrategy; + this._pipelineFactory = params.pipelineFactory; } /** @@ -123,7 +128,7 @@ class ConnectorsManager extends EventEmitter { name: connectorName, config, buckets: [], - getPipeline: this._allocationStrategy.pipelineFactory.getPipeline, + getPipeline: this._pipelineFactory.getPipeline, isRunning: false, logger: this._logger, kafkaConnectHost: this._kafkaConnectHost, @@ -148,7 +153,7 @@ class ConnectorsManager extends EventEmitter { // get old connector config const oldConfig = await this._kafkaConnect.getConnectorConfig(connectorName); // extract buckets from old connector config - const buckets = this._allocationStrategy.getOldConnectorBucketList(oldConfig); + const buckets = this._pipelineFactory.getOldConnectorBucketList(oldConfig); if (!buckets) { await this._kafkaConnect.deleteConnector(connectorName); this._logger.warn('Removed old connector', { @@ -167,7 +172,7 @@ class ConnectorsManager extends EventEmitter { // added manually like 'offset.topic.name' config: { ...oldConfig, ...config }, buckets, - getPipeline: this._allocationStrategy.pipelineFactory.getPipeline, + getPipeline: this._pipelineFactory.getPipeline, isRunning: true, logger: this._logger, kafkaConnectHost: this._kafkaConnectHost, diff --git a/extensions/oplogPopulator/pipeline/PipelineFactory.js b/extensions/oplogPopulator/pipeline/PipelineFactory.js index 4c545b5b6..8306a0207 100644 --- a/extensions/oplogPopulator/pipeline/PipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/PipelineFactory.js @@ -34,6 +34,21 @@ class PipelineFactory { return pipeline[0].$match['ns.coll'].$in; } + /** + * Process an old connector configuration, and return + * the list of buckets if the bucket list is valid against + * the current pipeline factory. + * @param {Object} oldConfig old configuration + * @returns {string[] | null} old configuration if valid + */ + getOldConnectorBucketList(oldConfig) { + const bucketList = this.extractBucketsFromConfig(oldConfig); + if (this.isValid(bucketList)) { + return bucketList; + } + return null; + } + /** * Makes new connector pipeline that includes * buckets assigned to this connector. diff --git a/package.json b/package.json index 03c0bf9b8..fc8487b00 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ "oplog_populator": "node extensions/oplogPopulator/OplogPopulatorTask.js", "mongo_queue_processor": "node extensions/mongoProcessor/mongoProcessorTask.js", "garbage_collector": "node extensions/gc/service.js", - "test": "mocha --recursive tests/unit --timeout 30000", + "test": "mocha --recursive tests/unit/oplogPopulator --timeout 30000", "cover:test": "nyc --clean --silent yarn run test && nyc report --report-dir ./coverage/test --reporter=lcov", "ft_test": "mocha --recursive $(find tests/functional -name '*.js') --timeout 30000", "ft_test:notification": "mocha --recursive $(find tests/functional/notification -name '*.js') --timeout 30000", diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index e2d9389b7..45e587147 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -109,9 +109,9 @@ describe('ConnectorsManager', () => { // Not needed to test all strategies here: we stub their methods new LeastFullConnector({ logger, - pipelineFactory, }), ), + pipelineFactory, logger, }); }); @@ -155,6 +155,10 @@ describe('ConnectorsManager', () => { }); describe('_processOldConnectors', () => { + afterEach(() => { + sinon.restore(); + }); + it('should delete old connector when the strategy rejects it', async () => { const config = { ...connectorConfig }; config['topic.namespace.map'] = 'outdated-topic'; @@ -172,7 +176,7 @@ describe('ConnectorsManager', () => { config['offset.partitiom.name'] = 'partition-name'; sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig') .resolves(config); - sinon.stub(connectorsManager._allocationStrategy, 'getOldConnectorBucketList').returns(['bucket1']); + sinon.stub(connectorsManager._pipelineFactory, 'getOldConnectorBucketList').returns(['bucket1']); sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector'); const connectors = await connectorsManager._processOldConnectors(['source-connector']); assert.strictEqual(connectors.length, 1); @@ -189,7 +193,7 @@ describe('ConnectorsManager', () => { sinon.stub(connectorsManager._allocationStrategy, 'maximumBucketsPerConnector').value(1); sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig') .resolves(config); - sinon.stub(connectorsManager._allocationStrategy, 'getOldConnectorBucketList') + sinon.stub(connectorsManager._pipelineFactory, 'getOldConnectorBucketList') .returns(['bucket1', 'bucket2']); const warnStub = sinon.stub(connectorsManager._logger, 'warn'); const connectors = await connectorsManager. diff --git a/tests/unit/oplogPopulator/allocationStrategy/AllocationStrategy.js b/tests/unit/oplogPopulator/allocationStrategy/AllocationStrategy.js index b5a192758..592e58f44 100644 --- a/tests/unit/oplogPopulator/allocationStrategy/AllocationStrategy.js +++ b/tests/unit/oplogPopulator/allocationStrategy/AllocationStrategy.js @@ -1,10 +1,6 @@ const assert = require('assert'); const werelogs = require('werelogs'); -const { constants } = require('arsenal'); const AllocationStrategy = require('../../../../extensions/oplogPopulator/allocationStrategy/AllocationStrategy'); -const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/pipeline/WildcardPipelineFactory'); -const MultipleBucketsPipelineFactory = - require('../../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory'); const logger = new werelogs.Logger('LeastFullConnector'); @@ -32,62 +28,4 @@ describe('AllocationStrategy', () => { type: 'NotImplemented', }); }); - - it('should return the list of buckets if the list is valid against the pipeline factory', async () => { - const allocationStrategy = new AllocationStrategy({ - logger, - pipelineFactory: new MultipleBucketsPipelineFactory(), - }); - const config = { - pipeline: JSON.stringify([{ - $match: { - 'ns.coll': { - $in: ['bucket1', 'bucket2'], - }, - }, - }]), - }; - const result = allocationStrategy.getOldConnectorBucketList(config); - assert.deepStrictEqual(result, ['bucket1', 'bucket2']); - }); - - it('should return null if the list is not valid against the pipeline factory', async () => { - const allocationStrategy = new AllocationStrategy({ - logger, - pipelineFactory: new WildcardPipelineFactory(), - }); - const config = { - pipeline: JSON.stringify([{ - $match: { - 'ns.coll': { - $in: ['bucket1', 'bucket2'], - }, - }, - }]), - }; - const result = allocationStrategy.getOldConnectorBucketList(config); - assert.deepStrictEqual(result, null); - }); - - it('should return the list of buckets if the list is valid against the pipeline factory (wildcard)', async () => { - const allocationStrategy = new AllocationStrategy({ - logger, - pipelineFactory: new WildcardPipelineFactory(), - }); - const config = { - pipeline: JSON.stringify([ - { - $match: { - 'ns.coll': { - $not: { - $regex: `^(${constants.mpuBucketPrefix}|__).*`, - }, - }, - }, - }, - ]), - }; - const result = allocationStrategy.getOldConnectorBucketList(config); - assert.deepStrictEqual(result, ['*']); - }); }); diff --git a/tests/unit/oplogPopulator/oplogPopulator.js b/tests/unit/oplogPopulator/oplogPopulator.js index cb8637884..6e4d87e1c 100644 --- a/tests/unit/oplogPopulator/oplogPopulator.js +++ b/tests/unit/oplogPopulator/oplogPopulator.js @@ -20,6 +20,7 @@ const ImmutableConnector = require('../../../extensions/oplogPopulator/allocatio const AllocationStrategy = require('../../../extensions/oplogPopulator/allocationStrategy/AllocationStrategy'); const constants = require('../../../extensions/oplogPopulator/constants'); const UniqueConnector = require('../../../extensions/oplogPopulator/allocationStrategy/UniqueConnector'); +const PipelineFactory = require('../../../extensions/oplogPopulator/pipeline/PipelineFactory'); const oplogPopulatorConfig = { topic: 'oplog', @@ -132,32 +133,32 @@ describe('OplogPopulator', () => { }); }); - describe('initStrategy', () => { + describe('initConfiguration', () => { afterEach(() => { sinon.restore(); }); it('should return an instance of RetainBucketsDecorator for immutable pipelines', () => { const arePipelinesImmutableStub = sinon.stub(oplogPopulator, '_arePipelinesImmutable').returns(true); - const strategy = oplogPopulator.initStrategy(); - assert(strategy instanceof RetainBucketsDecorator); - assert(strategy._strategy instanceof ImmutableConnector); + const configuration = oplogPopulator.initConfiguration(); + assert(configuration.allocationStrategy instanceof RetainBucketsDecorator); + assert(configuration.allocationStrategy._strategy instanceof ImmutableConnector); assert(arePipelinesImmutableStub.calledOnce); }); it('should return an instance of RetainBucketsDecorator for immutable pipelines', () => { const arePipelinesImmutableStub = sinon.stub(oplogPopulator, '_arePipelinesImmutable').returns(false); - const strategy = oplogPopulator.initStrategy(); - assert(strategy instanceof RetainBucketsDecorator); - assert(strategy._strategy instanceof LeastFullConnector); + const configuration = oplogPopulator.initConfiguration(); + assert(configuration.allocationStrategy instanceof RetainBucketsDecorator); + assert(configuration.allocationStrategy._strategy instanceof LeastFullConnector); assert(arePipelinesImmutableStub.calledOnce); }); it('should return an instance of RetainBucketsDecorator for unique pipelines', () => { sinon.stub(oplogPopulator._config, 'numberOfConnectors').value(0); - const strategy = oplogPopulator.initStrategy(); - assert(strategy instanceof RetainBucketsDecorator); - assert(strategy._strategy instanceof UniqueConnector); + const configuration = oplogPopulator.initConfiguration(); + assert(configuration.allocationStrategy instanceof RetainBucketsDecorator); + assert(configuration.allocationStrategy._strategy instanceof UniqueConnector); }); }); @@ -267,6 +268,7 @@ describe('OplogPopulator', () => { kafkaConnectPort: oplogPopulator._config.kafkaConnectPort, metricsHandler: oplogPopulator._metricsHandler, allocationStrategy: new AllocationStrategy({ logger }), + pipelineFactory: new PipelineFactory(), logger: oplogPopulator._logger, }); const connectorsManagerStub = sinon.stub(oplogPopulator._connectorsManager, 'initializeConnectors'); diff --git a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index ea3ed7eee..a9b68c77a 100644 --- a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -1,6 +1,7 @@ const assert = require('assert'); const MultipleBucketsPipelineFactory = require('../../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory'); +const { constants } = require('arsenal'); describe('MultipleBucketsPipelineFactory', () => { const multipleBucketsPipelineFactory = new MultipleBucketsPipelineFactory(); @@ -50,4 +51,38 @@ describe('MultipleBucketsPipelineFactory', () => { assert.strictEqual(result, '[{"$match":{"ns.coll":{"$in":["bucket1","bucket2"]}}}]'); }); }); + + describe('getOldConnectorBucketList', () => { + it('should return the list of buckets if the list is valid against the pipeline factory', async () => { + const config = { + pipeline: JSON.stringify([{ + $match: { + 'ns.coll': { + $in: ['bucket1', 'bucket2'], + }, + }, + }]), + }; + const result = multipleBucketsPipelineFactory.getOldConnectorBucketList(config); + assert.deepStrictEqual(result, ['bucket1', 'bucket2']); + }); + + it('should return null if the list is not valid against the pipeline factory', async () => { + const config = { + pipeline: JSON.stringify([ + { + $match: { + 'ns.coll': { + $not: { + $regex: `^(${constants.mpuBucketPrefix}|__).*`, + }, + }, + }, + }, + ]), + }; + const result = multipleBucketsPipelineFactory.getOldConnectorBucketList(config); + assert.deepStrictEqual(result, null); + }); + }); }); diff --git a/tests/unit/oplogPopulator/pipeline/PipelineFactory.js b/tests/unit/oplogPopulator/pipeline/PipelineFactory.js index 99f37daaf..8a7c34421 100644 --- a/tests/unit/oplogPopulator/pipeline/PipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/PipelineFactory.js @@ -1,5 +1,6 @@ const assert = require('assert'); const PipelineFactory = require('../../../../extensions/oplogPopulator/pipeline/PipelineFactory'); +const { constants } = require('arsenal'); describe('PipelineFactory', () => { const pipelineFactory = new PipelineFactory(); @@ -31,4 +32,56 @@ describe('PipelineFactory', () => { const buckets = pipelineFactory.extractBucketsFromConfig(config); assert.deepEqual(buckets, ['example-bucket-1, example-bucket-2']); }); + + it('should return the list of buckets if the list is valid against the pipeline factory', async () => { + const config = { + pipeline: JSON.stringify([{ + $match: { + 'ns.coll': { + $in: ['bucket1', 'bucket2'], + }, + }, + }]), + }; + assert.throws(() => pipelineFactory.getOldConnectorBucketList(config), { + name: 'Error', + type: 'NotImplemented', + }); + }); + + it('should return null if the list is not valid against the pipeline factory', async () => { + const config = { + pipeline: JSON.stringify([{ + $match: { + 'ns.coll': { + $in: ['bucket1', 'bucket2'], + }, + }, + }]), + }; + assert.throws(() => pipelineFactory.getOldConnectorBucketList(config), { + name: 'Error', + type: 'NotImplemented', + }); + }); + + it('should return the list of buckets if the list is valid against the pipeline factory (wildcard)', async () => { + const config = { + pipeline: JSON.stringify([ + { + $match: { + 'ns.coll': { + $not: { + $regex: `^(${constants.mpuBucketPrefix}|__).*`, + }, + }, + }, + }, + ]), + }; + assert.throws(() => pipelineFactory.getOldConnectorBucketList(config), { + name: 'Error', + type: 'NotImplemented', + }); + }); }); diff --git a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js index 86865e685..01a191f66 100644 --- a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -1,5 +1,6 @@ const assert = require('assert'); const WildcardPipelineFactory = require('../../../../extensions/oplogPopulator/pipeline/WildcardPipelineFactory'); +const { constants } = require('arsenal'); describe('WildcardPipelineFactory', () => { const wildcardPipelineFactory = new WildcardPipelineFactory(); @@ -37,4 +38,38 @@ describe('WildcardPipelineFactory', () => { assert.strictEqual(result, '[{"$match":{"ns.coll":{"$not":{"$regex":"^(mpuShadowBucket|__).*"}}}}]'); }); }); + + describe('getOldConnectorBucketList', () => { + it('should return null if the list is not valid against the pipeline factory', async () => { + const config = { + pipeline: JSON.stringify([{ + $match: { + 'ns.coll': { + $in: ['bucket1', 'bucket2'], + }, + }, + }]), + }; + const result = wildcardPipelineFactory.getOldConnectorBucketList(config); + assert.deepStrictEqual(result, null); + }); + + it('should return the list of buckets if the list is valid against the pipeline factory', async () => { + const config = { + pipeline: JSON.stringify([ + { + $match: { + 'ns.coll': { + $not: { + $regex: `^(${constants.mpuBucketPrefix}|__).*`, + }, + }, + }, + }, + ]), + }; + const result = wildcardPipelineFactory.getOldConnectorBucketList(config); + assert.deepStrictEqual(result, ['*']); + }); + }); });