Skip to content

Commit

Permalink
test destroying kafka connect connectors when no bucket assigned to them
Browse files Browse the repository at this point in the history
Issue: BB-432
  • Loading branch information
Kerkesni committed Aug 17, 2023
1 parent 546e304 commit c8e700d
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 10 deletions.
1 change: 1 addition & 0 deletions tests/unit/oplogPopulator/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const logger = new werelogs.Logger('Allocator');

const defaultConnectorParams = {
config: {},
isRunning: false,
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/oplogPopulator/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe('Connector', () => {
name: 'example-connector',
config: connectorConfig,
buckets: [],
isRunning: false,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
logger,
Expand All @@ -40,20 +41,25 @@ describe('Connector', () => {
it('Should spawn connector with correct pipeline', async () => {
const createStub = sinon.stub(connector._kafkaConnect, 'createConnector')
.resolves();
assert.strictEqual(connector.isRunning, false);
await connector.spawn();
assert(createStub.calledOnceWith({
name: 'example-connector',
config: connectorConfig
}));
assert.strictEqual(connector.isRunning, true);
});
});

describe('destroy', () => {
it('Should destroy connector', async () => {
const deleteStub = sinon.stub(connector._kafkaConnect, 'deleteConnector')
.resolves();
connector._isRunning = true;
assert.strictEqual(connector.isRunning, true);
await connector.destroy();
assert(deleteStub.calledOnceWith('example-connector'));
assert.strictEqual(connector.isRunning, false);
});
});

Expand Down
136 changes: 126 additions & 10 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,30 @@ const connectorConfig = {
'heartbeat.interval.ms': 10000,
};

const connector1 = new Connector({
name: 'source-connector',
buckets: [],
config: connectorConfig,
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
});

describe('ConnectorsManager', () => {
let connectorsManager;
let connector1;

let connectorCreateStub;
let connectorDeleteStub;
let connectorUpdateStub;

beforeEach(() => {
connector1 = new Connector({
name: 'source-connector',
buckets: [],
config: connectorConfig,
isRunning: true,
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
});
connectorCreateStub = sinon.stub(connector1._kafkaConnect, 'createConnector')
.resolves();
connectorDeleteStub = sinon.stub(connector1._kafkaConnect, 'deleteConnector')
.resolves();
connectorUpdateStub = sinon.stub(connector1._kafkaConnect, 'updateConnectorConfig')
.resolves();
connectorsManager = new ConnectorsManager({
nbConnectors: 1,
database: 'metadata',
Expand All @@ -85,7 +97,7 @@ describe('ConnectorsManager', () => {
});

afterEach(() => {
sinon.restore();
sinon.reset();
});

describe('_getDefaultConnectorConfiguration', () => {
Expand Down Expand Up @@ -150,6 +162,7 @@ describe('ConnectorsManager', () => {
assert.strictEqual(connectors[0].name, 'source-connector');
assert.strictEqual(connectors[0].config['offset.partitiom.name'], 'partition-name');
assert.strictEqual(connectors[0].config['topic.namespace.map'], '{"*":"oplog"}');
assert.strictEqual(connectors[0].isRunning, true);
});
});

Expand Down Expand Up @@ -178,5 +191,108 @@ describe('ConnectorsManager', () => {
assert.deepEqual(connectorsManager._oldConnectors, []);
});
});

describe('_spawnOrDestroyConnector', () => {
it('should destroy running connector when no buckets are configured', async () => {
connector1._isRunning = true;
connector1._buckets = new Set();
const updated = await connectorsManager._spawnOrDestroyConnector(connector1);
assert.strictEqual(updated, true);
assert(connectorCreateStub.notCalled);
assert(connectorDeleteStub.calledOnceWith(connector1.name));
});

it('should spawn a non running connector when buckets are configured', async () => {
connector1._isRunning = false;
connector1._buckets = new Set(['bucket1']);
const updated = await connectorsManager._spawnOrDestroyConnector(connector1);
assert.strictEqual(updated, true);
assert(connectorCreateStub.calledOnceWith({
name: connector1.name,
config: connector1.config
}));
assert(connectorDeleteStub.notCalled);
});

it('should do nothing when a running connector has buckets', async () => {
connector1._isRunning = true;
connector1._buckets = new Set(['bucket1']);
const updated = await connectorsManager._spawnOrDestroyConnector(connector1);
assert.strictEqual(updated, false);
assert(connectorCreateStub.notCalled);
assert(connectorDeleteStub.notCalled);
});

it('should do nothing when a non running connector still has no buckets', async () => {
connector1._isRunning = false;
connector1._buckets = new Set();
const updated = await connectorsManager._spawnOrDestroyConnector(connector1);
assert.strictEqual(updated, false);
assert(connectorCreateStub.notCalled);
assert(connectorDeleteStub.notCalled);
});
});

describe('_updateConnectors', () => {
it('should update a running connector when its buckets changed', async () => {
connector1._isRunning = true;
connector1._state.bucketsGotModified = false;
connector1._buckets = new Set(['bucket1']);
connectorsManager._connectors = [connector1];
connector1._buckets = new Set(['bucket1']);
connector1.addBucket('bucket2', false);
await connectorsManager._updateConnectors();
assert(connectorCreateStub.notCalled);
assert(connectorDeleteStub.notCalled);
assert(connectorUpdateStub.calledOnceWith(
connector1.name,
connector1.config
));
});
it('should not update a running connector when its buckets didn\'t change', async () => {
connector1._isRunning = true;
connector1._state.bucketsGotModified = false;
connector1._buckets = new Set(['bucket1']);
connectorsManager._connectors = [connector1];
await connectorsManager._updateConnectors();
assert(connectorCreateStub.notCalled);
assert(connectorDeleteStub.notCalled);
assert(connectorUpdateStub.notCalled);
});
it('should destroy a running connector if no buckets are assigned to it', async () => {
connector1._isRunning = true;
connector1._state.bucketsGotModified = false;
connector1._buckets = new Set([]);
connectorsManager._connectors = [connector1];
await connectorsManager._updateConnectors();
assert(connectorCreateStub.notCalled);
assert(connectorDeleteStub.calledOnceWith(connector1.name));
assert(connectorUpdateStub.notCalled);
});
it('should spawn a non running connector when buckets are assigned to it', async () => {
connector1._isRunning = false;
connector1._state.bucketsGotModified = false;
connector1._buckets = new Set([]);
connectorsManager._connectors = [connector1];
connector1._buckets = new Set(['bucket1']);
await connectorsManager._updateConnectors();
assert(connectorCreateStub.calledOnceWith({
name: connector1.name,
config: connector1.config
}));
assert(connectorDeleteStub.notCalled);
assert(connectorUpdateStub.notCalled);
});
it('should do nothing when a non running connector has not buckets', async () => {
connector1._isRunning = false;
connector1._state.bucketsGotModified = false;
connector1._buckets = new Set([]);
connectorsManager._connectors = [connector1];
await connectorsManager._updateConnectors();
assert(connectorCreateStub.notCalled);
assert(connectorDeleteStub.notCalled);
assert(connectorUpdateStub.notCalled);
});
});
});

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const logger = new werelogs.Logger('LeastFullConnector');

const defaultConnectorParams = {
config: {},
isRunning: true,
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
Expand Down

0 comments on commit c8e700d

Please sign in to comment.