diff --git a/src/initializers/kafka/kafka.ts b/src/initializers/kafka/kafka.ts index 436ef286..442be4e7 100644 --- a/src/initializers/kafka/kafka.ts +++ b/src/initializers/kafka/kafka.ts @@ -2,7 +2,7 @@ import { getLogger } from '../log4js'; import requireInjected from '../../require-injected'; import { KafkaConfig } from '../../typings/kafka'; import type * as KafkajsType from 'kafkajs'; -import { flatten } from 'lodash'; +import { flatten, isEmpty } from 'lodash'; import * as uuid from 'uuid'; const { Kafka }: typeof KafkajsType = requireInjected('kafkajs'); @@ -81,10 +81,11 @@ export default class OrkaKafka { if (offsets.every(({ offset }) => offset === '-1')) { // groupId is not configured const oldOffsets = await admin.fetchOffsets({ groupId: oldGroupId, topic, resolveOffsets: false }); - await admin.setOffsets({ groupId, topic, partitions: oldOffsets }); - return { groupId, renamedFrom: oldGroupId, oldOffsets }; + const knownOffsets = oldOffsets?.filter(o => o.offset !== '-1'); + if (!isEmpty(knownOffsets)) await admin.setOffsets({ groupId, topic, partitions: knownOffsets }); + return { groupId, renamedFrom: oldGroupId, topic, oldOffsets: knownOffsets }; } else { - return { groupId, renamedFrom: oldGroupId, alreadyDeclared: true }; + return { groupId, renamedFrom: oldGroupId, topic, alreadyDeclared: true }; } }) .map(promise => diff --git a/test/initializers/kafka/kafka.test.ts b/test/initializers/kafka/kafka.test.ts index b9096888..7126afd3 100644 --- a/test/initializers/kafka/kafka.test.ts +++ b/test/initializers/kafka/kafka.test.ts @@ -321,27 +321,40 @@ describe('kafka class', () => { }, ssl: true }); - fetchOffsetsStub.onFirstCall().returns([{ parition: 0, offset: '-1' }]); - fetchOffsetsStub.onSecondCall().returns([{ parition: 0, offset: '5' }]); - fetchOffsetsStub.onThirdCall().returns([{ parition: 0, offset: '3' }]); - fetchOffsetsStub.returns([{ parition: 0, offset: '6' }]); + fetchOffsetsStub.onFirstCall().returns([{ partition: 0, offset: '-1' }]); + fetchOffsetsStub.onSecondCall().returns([{ partition: 0, offset: '5' }]); + fetchOffsetsStub.onThirdCall().returns([{ partition: 0, offset: '-1' }]); + fetchOffsetsStub.onCall(3).returns([ + { partition: 0, offset: '3' }, + { partition: 1, offset: '-1' } + ]); + fetchOffsetsStub.returns([{ partition: 0, offset: '-1' }]); const response = await kafka.renameGroupId([ { groupId: 'newGroupId', topic: 'topic', oldGroupId: 'oldGroupId' }, - { groupId: 'newGroupId2', topic: 'topic2', oldGroupId: 'oldGroupId2' } + { groupId: 'newGroupId2', topic: 'topic2', oldGroupId: 'oldGroupId2' }, + { groupId: 'newGroupId3', topic: 'topic3', oldGroupId: 'oldGroupId3' } ]); adminStub.args.should.eql([[]]); fetchOffsetsStub.args.should.eql([ [{ groupId: 'newGroupId', topic: 'topic', resolveOffsets: false }], [{ groupId: 'newGroupId2', topic: 'topic2', resolveOffsets: false }], - [{ groupId: 'oldGroupId', topic: 'topic', resolveOffsets: false }] + [{ groupId: 'newGroupId3', topic: 'topic3', resolveOffsets: false }], + [{ groupId: 'oldGroupId', topic: 'topic', resolveOffsets: false }], + [{ groupId: 'oldGroupId3', topic: 'topic3', resolveOffsets: false }] ]); setOffsetsStub.args.should.eql([ - [{ groupId: 'newGroupId', partitions: [{ offset: '3', parition: 0 }], topic: 'topic' }] + [{ groupId: 'newGroupId', partitions: [{ offset: '3', partition: 0 }], topic: 'topic' }] ]); response.should.eql([ - { groupId: 'newGroupId', oldOffsets: [{ offset: '3', parition: 0 }], renamedFrom: 'oldGroupId' }, - { alreadyDeclared: true, groupId: 'newGroupId2', renamedFrom: 'oldGroupId2' } + { + groupId: 'newGroupId', + oldOffsets: [{ offset: '3', partition: 0 }], + topic: 'topic', + renamedFrom: 'oldGroupId' + }, + { alreadyDeclared: true, groupId: 'newGroupId2', topic: 'topic2', renamedFrom: 'oldGroupId2' }, + { groupId: 'newGroupId3', oldOffsets: [], topic: 'topic3', renamedFrom: 'oldGroupId3' } ]); }); });