From 82983ce3c02bde30a81e46a348f26dabcda26011 Mon Sep 17 00:00:00 2001 From: "Tony.Shao" Date: Thu, 14 Jan 2016 14:08:29 +0800 Subject: [PATCH] fix bug. make sure all data has consumed in successChan --- consumer.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/consumer.go b/consumer.go index eb84bc2..14a2e72 100644 --- a/consumer.go +++ b/consumer.go @@ -55,6 +55,7 @@ type Consumer struct { stopStreams chan bool close chan bool stopCleanup chan struct{} + wg sync.WaitGroup topicCount TopicsToNumStreams metrics *ConsumerMetrics @@ -869,7 +870,10 @@ func (c *Consumer) reflectPartitionOwnershipDecision(partitionOwnershipDecision successfullyOwnedPartitions := make([]*TopicAndPartition, 0) successChan := make(chan TopicAndPartition) + go func() { + c.wg.Add(1) + defer c.wg.Done() for tp := range successChan { successfullyOwnedPartitions = append(successfullyOwnedPartitions, &tp) } @@ -881,6 +885,7 @@ func (c *Consumer) reflectPartitionOwnershipDecision(partitionOwnershipDecision pool.Stop() close(successChan) + c.wg.Wait() if len(partitionOwnershipDecision) > len(successfullyOwnedPartitions) { if Logger.IsAllowed(WarnLevel) {