Skip to content

Commit

Permalink
Merge pull request #171 from xiocode/master
Browse files Browse the repository at this point in the history
fix bug. make sure all data has consumed in successChan
  • Loading branch information
serejja committed Jan 14, 2016
2 parents 33e781a + 82983ce commit 8885c4f
Showing 1 changed file with 5 additions and 0 deletions.
5 changes: 5 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Consumer struct {
stopStreams chan bool
close chan bool
stopCleanup chan struct{}
wg sync.WaitGroup
topicCount TopicsToNumStreams

metrics *ConsumerMetrics
Expand Down Expand Up @@ -869,7 +870,10 @@ func (c *Consumer) reflectPartitionOwnershipDecision(partitionOwnershipDecision

successfullyOwnedPartitions := make([]*TopicAndPartition, 0)
successChan := make(chan TopicAndPartition)

go func() {
c.wg.Add(1)
defer c.wg.Done()
for tp := range successChan {
successfullyOwnedPartitions = append(successfullyOwnedPartitions, &tp)
}
Expand All @@ -881,6 +885,7 @@ func (c *Consumer) reflectPartitionOwnershipDecision(partitionOwnershipDecision

pool.Stop()
close(successChan)
c.wg.Wait()

if len(partitionOwnershipDecision) > len(successfullyOwnedPartitions) {
if Logger.IsAllowed(WarnLevel) {
Expand Down

0 comments on commit 8885c4f

Please sign in to comment.