diff --git a/tests/unit/oplogPopulator/Allocator.js b/tests/unit/oplogPopulator/Allocator.js index dba7cfdbed..3d822a2c03 100644 --- a/tests/unit/oplogPopulator/Allocator.js +++ b/tests/unit/oplogPopulator/Allocator.js @@ -13,6 +13,7 @@ const logger = new werelogs.Logger('Allocator'); const defaultConnectorParams = { config: {}, + isRunning: false, logger, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, diff --git a/tests/unit/oplogPopulator/Connector.js b/tests/unit/oplogPopulator/Connector.js index f0a438c2a8..746b19bf61 100644 --- a/tests/unit/oplogPopulator/Connector.js +++ b/tests/unit/oplogPopulator/Connector.js @@ -26,6 +26,7 @@ describe('Connector', () => { name: 'example-connector', config: connectorConfig, buckets: [], + isRunning: false, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, logger, @@ -40,11 +41,13 @@ describe('Connector', () => { it('Should spawn connector with correct pipeline', async () => { const createStub = sinon.stub(connector._kafkaConnect, 'createConnector') .resolves(); + assert.strictEqual(connector.isRunning, false); await connector.spawn(); assert(createStub.calledOnceWith({ name: 'example-connector', config: connectorConfig })); + assert.strictEqual(connector.isRunning, true); }); }); @@ -52,8 +55,11 @@ describe('Connector', () => { it('Should destroy connector', async () => { const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector') .resolves(); + connector._isRunning = true; + assert.strictEqual(connector.isRunning, true); await connector.destroy(); assert(deleteStub.calledOnceWith('example-connector')); + assert.strictEqual(connector.isRunning, false); }); }); diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 2973603378..1c323209d3 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -58,18 +58,30 @@ const connectorConfig = { 'heartbeat.interval.ms': 10000, }; -const connector1 = new Connector({ - name: 'source-connector', - buckets: [], - config: connectorConfig, - logger, - kafkaConnectHost: '127.0.0.1', - kafkaConnectPort: 8083, -}); - describe('ConnectorsManager', () => { let connectorsManager; + let connector1; + + let connectorCreateStub; + let connectorDeleteStub; + let connectorUpdateStub; + beforeEach(() => { + connector1 = new Connector({ + name: 'source-connector', + buckets: [], + config: connectorConfig, + isRunning: true, + logger, + kafkaConnectHost: '127.0.0.1', + kafkaConnectPort: 8083, + }); + connectorCreateStub = sinon.stub(connector1._kafkaConnect, 'createConnector') + .resolves(); + connectorDeleteStub = sinon.stub(connector1._kafkaConnect, 'deleteConnector') + .resolves(); + connectorUpdateStub = sinon.stub(connector1._kafkaConnect, 'updateConnectorConfig') + .resolves(); connectorsManager = new ConnectorsManager({ nbConnectors: 1, database: 'metadata', @@ -85,7 +97,7 @@ describe('ConnectorsManager', () => { }); afterEach(() => { - sinon.restore(); + sinon.reset(); }); describe('_getDefaultConnectorConfiguration', () => { @@ -150,6 +162,7 @@ describe('ConnectorsManager', () => { assert.strictEqual(connectors[0].name, 'source-connector'); assert.strictEqual(connectors[0].config['offset.partitiom.name'], 'partition-name'); assert.strictEqual(connectors[0].config['topic.namespace.map'], '{"*":"oplog"}'); + assert.strictEqual(connectors[0].isRunning, true); }); }); @@ -178,5 +191,108 @@ describe('ConnectorsManager', () => { assert.deepEqual(connectorsManager._oldConnectors, []); }); }); + + describe('_spawnOrDestroyConnector', () => { + it('should destroy running connector when no buckets are configured', async () => { + connector1._isRunning = true; + connector1._buckets = new Set(); + const updated = await connectorsManager._spawnOrDestroyConnector(connector1); + assert.strictEqual(updated, true); + assert(connectorCreateStub.notCalled); + assert(connectorDeleteStub.calledOnceWith(connector1.name)); + }); + + it('should spawn a non running connector when buckets are configured', async () => { + connector1._isRunning = false; + connector1._buckets = new Set(['bucket1']); + const updated = await connectorsManager._spawnOrDestroyConnector(connector1); + assert.strictEqual(updated, true); + assert(connectorCreateStub.calledOnceWith({ + name: connector1.name, + config: connector1.config + })); + assert(connectorDeleteStub.notCalled); + }); + + it('should do nothing when a running connector has buckets', async () => { + connector1._isRunning = true; + connector1._buckets = new Set(['bucket1']); + const updated = await connectorsManager._spawnOrDestroyConnector(connector1); + assert.strictEqual(updated, false); + assert(connectorCreateStub.notCalled); + assert(connectorDeleteStub.notCalled); + }); + + it('should do nothing when a non running connector still has no buckets', async () => { + connector1._isRunning = false; + connector1._buckets = new Set(); + const updated = await connectorsManager._spawnOrDestroyConnector(connector1); + assert.strictEqual(updated, false); + assert(connectorCreateStub.notCalled); + assert(connectorDeleteStub.notCalled); + }); + }); + + describe('_updateConnectors', () => { + it('should update a running connector when its buckets changed', async () => { + connector1._isRunning = true; + connector1._state.bucketsGotModified = false; + connector1._buckets = new Set(['bucket1']); + connectorsManager._connectors = [connector1]; + connector1._buckets = new Set(['bucket1']); + connector1.addBucket('bucket2', false); + await connectorsManager._updateConnectors(); + assert(connectorCreateStub.notCalled); + assert(connectorDeleteStub.notCalled); + assert(connectorUpdateStub.calledOnceWith( + connector1.name, + connector1.config + )); + }); + it('should not update a running connector when its buckets didn\'t change', async () => { + connector1._isRunning = true; + connector1._state.bucketsGotModified = false; + connector1._buckets = new Set(['bucket1']); + connectorsManager._connectors = [connector1]; + await connectorsManager._updateConnectors(); + assert(connectorCreateStub.notCalled); + assert(connectorDeleteStub.notCalled); + assert(connectorUpdateStub.notCalled); + }); + it('should destroy a running connector if no buckets are assigned to it', async () => { + connector1._isRunning = true; + connector1._state.bucketsGotModified = false; + connector1._buckets = new Set([]); + connectorsManager._connectors = [connector1]; + await connectorsManager._updateConnectors(); + assert(connectorCreateStub.notCalled); + assert(connectorDeleteStub.calledOnceWith(connector1.name)); + assert(connectorUpdateStub.notCalled); + }); + it('should spawn a non running connector when buckets are assigned to it', async () => { + connector1._isRunning = false; + connector1._state.bucketsGotModified = false; + connector1._buckets = new Set([]); + connectorsManager._connectors = [connector1]; + connector1._buckets = new Set(['bucket1']); + await connectorsManager._updateConnectors(); + assert(connectorCreateStub.calledOnceWith({ + name: connector1.name, + config: connector1.config + })); + assert(connectorDeleteStub.notCalled); + assert(connectorUpdateStub.notCalled); + }); + it('should do nothing when a non running connector has not buckets', async () => { + connector1._isRunning = false; + connector1._state.bucketsGotModified = false; + connector1._buckets = new Set([]); + connectorsManager._connectors = [connector1]; + await connectorsManager._updateConnectors(); + assert(connectorCreateStub.notCalled); + assert(connectorDeleteStub.notCalled); + assert(connectorUpdateStub.notCalled); + }); + }); }); diff --git a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js index e94f636844..c4f54d18b2 100644 --- a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js +++ b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js @@ -10,6 +10,7 @@ const logger = new werelogs.Logger('LeastFullConnector'); const defaultConnectorParams = { config: {}, + isRunning: true, logger, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083,