Skip to content

Commit

Permalink
Fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnes committed Nov 16, 2023
1 parent 2f16d32 commit ba05eef
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
2 changes: 1 addition & 1 deletion xkafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (c *Consumer) subscribe() error {
}

func (c *Consumer) unsubscribe() error {
c.kafka.Commit()
_, _ = c.kafka.Commit()

return c.kafka.Unsubscribe()
}
Expand Down
16 changes: 12 additions & 4 deletions xkafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ func TestNewConsumer(t *testing.T) {
assert.NotNil(t, consumer.config.errorHandler)

expectedConfig := kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "test-consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": true,
"bootstrap.servers": "localhost:9092",
"group.id": "test-consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": true,
"enable.auto.offset.store": false,
}

assert.EqualValues(t, expectedConfig, consumer.config.configMap)
Expand Down Expand Up @@ -117,6 +118,7 @@ func TestConsumerUnsubscribeError(t *testing.T) {
mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(unsubError)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)
mockKafka.On("Commit").Return(nil, nil)

consumer.handler = handler

Expand All @@ -142,6 +144,7 @@ func TestConsumerHandleMessage(t *testing.T) {

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)

consumer.handler = handler
Expand All @@ -164,6 +167,7 @@ func TestConsumerHandleMessageError(t *testing.T) {

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)

consumer.handler = handler
Expand All @@ -187,6 +191,7 @@ func TestConsumerErrorCallback(t *testing.T) {

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)

consumer.handler = handler
Expand Down Expand Up @@ -222,6 +227,7 @@ func TestConsumerReadMessageTimeout(t *testing.T) {

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil).Once()
mockKafka.On("ReadMessage", testTimeout).Return(nil, expect).Once()
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)
Expand All @@ -243,6 +249,7 @@ func TestConsumerKafkaError(t *testing.T) {

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil).Once()
mockKafka.On("ReadMessage", testTimeout).Return(nil, expect).Once()

Expand All @@ -261,6 +268,7 @@ func TestConsumerMiddlewareExecutionOrder(t *testing.T) {

mockKafka.On("SubscribeTopics", []string(testTopics), mock.Anything).Return(nil)
mockKafka.On("Unsubscribe").Return(nil)
mockKafka.On("Commit").Return(nil, nil)
mockKafka.On("ReadMessage", testTimeout).Return(km, nil)

handler := HandlerFunc(func(ctx context.Context, msg *Message) error {
Expand Down

0 comments on commit ba05eef

Please sign in to comment.