Skip to content

Commit

Permalink
Do not set offsets to -1 (#226)
Browse files Browse the repository at this point in the history
* Do not set offsets to -1

* Log also topic
  • Loading branch information
nikostoulas authored Feb 22, 2021
1 parent 9f6baa0 commit 1186b2b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
9 changes: 5 additions & 4 deletions src/initializers/kafka/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { getLogger } from '../log4js';
import requireInjected from '../../require-injected';
import { KafkaConfig } from '../../typings/kafka';
import type * as KafkajsType from 'kafkajs';
import { flatten } from 'lodash';
import { flatten, isEmpty } from 'lodash';
import * as uuid from 'uuid';

const { Kafka }: typeof KafkajsType = requireInjected('kafkajs');
Expand Down Expand Up @@ -81,10 +81,11 @@ export default class OrkaKafka {
if (offsets.every(({ offset }) => offset === '-1')) {
// groupId is not configured
const oldOffsets = await admin.fetchOffsets({ groupId: oldGroupId, topic, resolveOffsets: false });
await admin.setOffsets({ groupId, topic, partitions: oldOffsets });
return { groupId, renamedFrom: oldGroupId, oldOffsets };
const knownOffsets = oldOffsets?.filter(o => o.offset !== '-1');
if (!isEmpty(knownOffsets)) await admin.setOffsets({ groupId, topic, partitions: knownOffsets });
return { groupId, renamedFrom: oldGroupId, topic, oldOffsets: knownOffsets };
} else {
return { groupId, renamedFrom: oldGroupId, alreadyDeclared: true };
return { groupId, renamedFrom: oldGroupId, topic, alreadyDeclared: true };
}
})
.map(promise =>
Expand Down
31 changes: 22 additions & 9 deletions test/initializers/kafka/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,27 +321,40 @@ describe('kafka class', () => {
},
ssl: true
});
fetchOffsetsStub.onFirstCall().returns([{ parition: 0, offset: '-1' }]);
fetchOffsetsStub.onSecondCall().returns([{ parition: 0, offset: '5' }]);
fetchOffsetsStub.onThirdCall().returns([{ parition: 0, offset: '3' }]);
fetchOffsetsStub.returns([{ parition: 0, offset: '6' }]);
fetchOffsetsStub.onFirstCall().returns([{ partition: 0, offset: '-1' }]);
fetchOffsetsStub.onSecondCall().returns([{ partition: 0, offset: '5' }]);
fetchOffsetsStub.onThirdCall().returns([{ partition: 0, offset: '-1' }]);
fetchOffsetsStub.onCall(3).returns([
{ partition: 0, offset: '3' },
{ partition: 1, offset: '-1' }
]);
fetchOffsetsStub.returns([{ partition: 0, offset: '-1' }]);
const response = await kafka.renameGroupId([
{ groupId: 'newGroupId', topic: 'topic', oldGroupId: 'oldGroupId' },
{ groupId: 'newGroupId2', topic: 'topic2', oldGroupId: 'oldGroupId2' }
{ groupId: 'newGroupId2', topic: 'topic2', oldGroupId: 'oldGroupId2' },
{ groupId: 'newGroupId3', topic: 'topic3', oldGroupId: 'oldGroupId3' }
]);

adminStub.args.should.eql([[]]);
fetchOffsetsStub.args.should.eql([
[{ groupId: 'newGroupId', topic: 'topic', resolveOffsets: false }],
[{ groupId: 'newGroupId2', topic: 'topic2', resolveOffsets: false }],
[{ groupId: 'oldGroupId', topic: 'topic', resolveOffsets: false }]
[{ groupId: 'newGroupId3', topic: 'topic3', resolveOffsets: false }],
[{ groupId: 'oldGroupId', topic: 'topic', resolveOffsets: false }],
[{ groupId: 'oldGroupId3', topic: 'topic3', resolveOffsets: false }]
]);
setOffsetsStub.args.should.eql([
[{ groupId: 'newGroupId', partitions: [{ offset: '3', parition: 0 }], topic: 'topic' }]
[{ groupId: 'newGroupId', partitions: [{ offset: '3', partition: 0 }], topic: 'topic' }]
]);
response.should.eql([
{ groupId: 'newGroupId', oldOffsets: [{ offset: '3', parition: 0 }], renamedFrom: 'oldGroupId' },
{ alreadyDeclared: true, groupId: 'newGroupId2', renamedFrom: 'oldGroupId2' }
{
groupId: 'newGroupId',
oldOffsets: [{ offset: '3', partition: 0 }],
topic: 'topic',
renamedFrom: 'oldGroupId'
},
{ alreadyDeclared: true, groupId: 'newGroupId2', topic: 'topic2', renamedFrom: 'oldGroupId2' },
{ groupId: 'newGroupId3', oldOffsets: [], topic: 'topic3', renamedFrom: 'oldGroupId3' }
]);
});
});
Expand Down

0 comments on commit 1186b2b

Please sign in to comment.