Skip to content

Commit

Permalink
Rename group ids (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikostoulas authored Feb 3, 2021
1 parent 0241565 commit ddd4635
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 10 deletions.
16 changes: 11 additions & 5 deletions examples/kafka-example/app.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { orka } = require('../../build');
const { orka, getKafka } = require('../../build');

orka({
beforeMiddleware: () => [
Expand All @@ -7,11 +7,17 @@ orka({
await next();
}
],
diamorphosis: { configFolder: './examples/kafka-example' },
routesPath: './examples/kafka-example/routes.js',
logoPath: './examples/simple-example/logo.txt',
beforeStart: () => {
diamorphosis: { configFolder: __dirname },
routesPath: __dirname + '/routes.js',
logoPath: __dirname + '/logo.txt',
beforeStart: async () => {
const config = require('./config');
const topic = config.kafka.consumer.topics.name;
// Will copy offsets from oldGroupId to the new one
await getKafka().renameGroupId([{ groupId: config.kafka.groupId, topic, oldGroupId: config.kafka.oldGroupId }]);

const KafkaHandler = require('./handler');
new KafkaHandler(getKafka(), { topic, fromBeginning: true });
console.log(`Going to start env: ${config.nodeEnv}`);
}
}).start();
5 changes: 3 additions & 2 deletions examples/kafka-example/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ module.exports = {
json: true
},
kafka: {
groupId: 'orka.example.consumer',
groupId: 'orka.example.pigasos',
oldGroupId: 'orka.example.consumer',
clientId: 'orka.example.producer',
brokers: ['localhost:9092'],
certificates: {
Expand All @@ -25,7 +26,7 @@ module.exports = {
consumer: {
topics: {
name: 'orka.example.test',
batchSize: 10
groupId: 'orka.example.consumer'
}
},
producer: {
Expand Down
4 changes: 2 additions & 2 deletions examples/kafka-example/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ module.exports = {
const config = require('./config');
const kafka = getKafka();
const topic = config.kafka.consumer.topics.name;
const batchSize = config.kafka.consumer.topics.batchSize;
new KafkaHandler(kafka, { topic, logger: getLogger('test'), batchSize });
const groupId = config.kafka.consumer.topics.groupId;
new KafkaHandler(kafka, { topic, logger: getLogger('test'), fromBeginning: true, consumerOptions: { groupId } });
},
'/write': async (ctx, next) => {
const config = require('./config');
Expand Down
27 changes: 27 additions & 0 deletions src/initializers/kafka/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,33 @@ export default class OrkaKafka {
return admin;
}

public async renameGroupId(groupIds: { groupId: string; topic: string; oldGroupId: string }[]) {
const admin = await this.connectAdmin();
const renamings = await Promise.all(
groupIds
.map(async ({ groupId, topic, oldGroupId }) => {
const offsets = await admin.fetchOffsets({ groupId, topic, resolveOffsets: false });
if (offsets.every(({ offset }) => offset === '-1')) {
// groupId is not configured
const oldOffsets = await admin.fetchOffsets({ groupId: oldGroupId, topic, resolveOffsets: false });
await admin.setOffsets({ groupId, topic, partitions: oldOffsets });
return { groupId, renamedFrom: oldGroupId, oldOffsets };
} else {
return { groupId, renamedFrom: oldGroupId, alreadyDeclared: true };
}
})
.map(promise =>
promise.catch(e => {
logger.error(e);
return e;
})
)
);
await admin.disconnect();
logger.info(`Added groupIds with offsets: ${JSON.stringify(renamings)}`);
return renamings;
}

public async metadata() {
const admin = await this.connectAdmin();
const metadata = await admin.fetchTopicMetadata();
Expand Down
46 changes: 45 additions & 1 deletion test/initializers/kafka/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ describe('kafka class', () => {
let fetchTopicMetadataStub;
let createTopicsStub;
let kafkaStubReturn;
let fetchOffsetsStub;
let setOffsetsStub;
let Kafka: typeof KafkaType;

beforeEach(async function () {
Expand All @@ -24,11 +26,15 @@ describe('kafka class', () => {
consumerStub = sandbox.stub().returns({ connect: sandbox.stub() });
fetchTopicMetadataStub = sandbox.stub().resolves('metadata');
createTopicsStub = sandbox.stub().onFirstCall().resolves(true).resolves(false);
fetchOffsetsStub = sandbox.stub();
setOffsetsStub = sandbox.stub();
adminStub = sandbox.stub().returns({
connect: sandbox.stub(),
disconnect: sandbox.stub(),
fetchTopicMetadata: fetchTopicMetadataStub,
createTopics: createTopicsStub
createTopics: createTopicsStub,
fetchOffsets: fetchOffsetsStub,
setOffsets: setOffsetsStub
});
kafkaStubReturn = { producer: () => producerStub, consumer: consumerStub, admin: adminStub };
kafkaStub = sinon.stub().returns(kafkaStubReturn);
Expand Down Expand Up @@ -301,4 +307,42 @@ describe('kafka class', () => {
response.should.eql([{ foo: true }, { bar: false }, { test: false }]);
});
});

describe('renameGroupId', function () {
it('copies offsets from old groupId-topic combination', async function () {
const kafka = new Kafka({
sasl: { mechanism: 'scram-sha-256', password: 'foo', username: 'bar' },
groupId: 'groupId',
clientId: 'clientId',
brokers: ['broker-consumer'],
producer: {
brokers: ['broker-producer'],
sasl: { mechanism: 'scram-sha-256', password: 'foo-producer', username: 'bar' }
},
ssl: true
});
fetchOffsetsStub.onFirstCall().returns([{ parition: 0, offset: '-1' }]);
fetchOffsetsStub.onSecondCall().returns([{ parition: 0, offset: '5' }]);
fetchOffsetsStub.onThirdCall().returns([{ parition: 0, offset: '3' }]);
fetchOffsetsStub.returns([{ parition: 0, offset: '6' }]);
const response = await kafka.renameGroupId([
{ groupId: 'newGroupId', topic: 'topic', oldGroupId: 'oldGroupId' },
{ groupId: 'newGroupId2', topic: 'topic2', oldGroupId: 'oldGroupId2' }
]);

adminStub.args.should.eql([[]]);
fetchOffsetsStub.args.should.eql([
[{ groupId: 'newGroupId', topic: 'topic', resolveOffsets: false }],
[{ groupId: 'newGroupId2', topic: 'topic2', resolveOffsets: false }],
[{ groupId: 'oldGroupId', topic: 'topic', resolveOffsets: false }]
]);
setOffsetsStub.args.should.eql([
[{ groupId: 'newGroupId', partitions: [{ offset: '3', parition: 0 }], topic: 'topic' }]
]);
response.should.eql([
{ groupId: 'newGroupId', oldOffsets: [{ offset: '3', parition: 0 }], renamedFrom: 'oldGroupId' },
{ alreadyDeclared: true, groupId: 'newGroupId2', renamedFrom: 'oldGroupId2' }
]);
});
});
});

0 comments on commit ddd4635

Please sign in to comment.