Skip to content

Commit

Permalink
Fix lint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnes committed Nov 15, 2024
1 parent 2ff19fd commit 38bd50e
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
4 changes: 1 addition & 3 deletions examples/xkafka/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ func batchHandler(tracker *Tracker) xkafka.BatchHandlerFunc {
return func(ctx context.Context, batch *xkafka.Batch) error {
err := tracker.SimulateWork()
if err != nil {
batch.AckFail(err)

return err
return batch.AckFail(err)
}

for _, msg := range batch.Messages {
Expand Down
1 change: 1 addition & 0 deletions examples/xkafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func publishMessages(messages []*xkafka.Message) {
xkafka.Brokers(brokers),
xkafka.ErrorHandler(func(err error) error {
log.Error().Err(err).Msg("")

return err
}),
)
Expand Down
41 changes: 29 additions & 12 deletions xkafka/batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,27 @@ func (c *BatchConsumer) runSequential(ctx context.Context) (err error) {

batch := NewBatch()
timer := time.NewTimer(c.config.batchTimeout)

defer timer.Stop()

for {
select {
case <-ctx.Done():
if len(batch.Messages) > 0 {
if err := c.processBatch(ctx, batch); err != nil {
return err
}
if err := c.processBatch(ctx, batch); err != nil {
return err
}
return err

return nil

case <-timer.C:
if len(batch.Messages) > 0 {
if err := c.processBatch(ctx, batch); err != nil {
return err
}

batch = NewBatch()
}

timer.Reset(c.config.batchTimeout)

default:
Expand All @@ -122,6 +124,7 @@ func (c *BatchConsumer) runSequential(ctx context.Context) (err error) {
if ferr := c.config.errorHandler(err); ferr != nil {
return ferr
}

continue
}

Expand All @@ -132,7 +135,9 @@ func (c *BatchConsumer) runSequential(ctx context.Context) (err error) {
if err := c.processBatch(ctx, batch); err != nil {
return err
}

batch = NewBatch()

timer.Reset(c.config.batchTimeout)
}
}
Expand All @@ -145,19 +150,15 @@ func (c *BatchConsumer) runAsync(ctx context.Context) error {

batch := NewBatch()
timer := time.NewTimer(c.config.batchTimeout)

defer timer.Stop()

for {
select {
case <-ctx.Done():
st.Wait()

var err error

if len(batch.Messages) > 0 {
err = c.processBatch(ctx, batch)
}

err := c.processBatch(ctx, batch)
uerr := c.unsubscribe()
err = errors.Join(err, uerr)

Expand All @@ -173,6 +174,7 @@ func (c *BatchConsumer) runAsync(ctx context.Context) error {
c.processBatchAsync(ctx, batch, st, cancel)
batch = NewBatch()
}

timer.Reset(c.config.batchTimeout)

default:
Expand All @@ -196,13 +198,18 @@ func (c *BatchConsumer) runAsync(ctx context.Context) error {
if len(batch.Messages) >= c.config.batchSize {
c.processBatchAsync(ctx, batch, st, cancel)
batch = NewBatch()

timer.Reset(c.config.batchTimeout)
}
}
}
}

func (c *BatchConsumer) processBatch(ctx context.Context, batch *Batch) error {
if len(batch.Messages) == 0 {
return nil
}

err := c.handler.HandleBatch(ctx, batch)
if ferr := c.config.errorHandler(err); ferr != nil {
return ferr
Expand All @@ -211,11 +218,17 @@ func (c *BatchConsumer) processBatch(ctx context.Context, batch *Batch) error {
return c.storeBatch(batch)
}

func (c *BatchConsumer) processBatchAsync(ctx context.Context, batch *Batch, st *stream.Stream, cancel context.CancelCauseFunc) {
func (c *BatchConsumer) processBatchAsync(
ctx context.Context,
batch *Batch,
st *stream.Stream,
cancel context.CancelCauseFunc,
) {
st.Go(func() stream.Callback {
err := c.handler.HandleBatch(ctx, batch)
if ferr := c.config.errorHandler(err); ferr != nil {
cancel(ferr)

return func() {}
}

Expand All @@ -233,6 +246,7 @@ func (c *BatchConsumer) storeBatch(batch *Batch) error {
}

tps := batch.GroupMaxOffset()

_, err := c.kafka.StoreOffsets(tps)
if err != nil {
return err
Expand All @@ -252,6 +266,7 @@ func (c *BatchConsumer) concatMiddlewares(h BatchHandler) BatchHandler {
for i := len(c.middlewares) - 1; i >= 0; i-- {
h = c.middlewares[i].BatchMiddleware(h)
}

return h
}

Expand All @@ -261,10 +276,12 @@ func (c *BatchConsumer) subscribe() error {

func (c *BatchConsumer) unsubscribe() error {
_, _ = c.kafka.Commit()

return c.kafka.Unsubscribe()
}

func (c *BatchConsumer) close() error {
<-time.After(c.config.shutdownTimeout)

return c.kafka.Close()
}

0 comments on commit 38bd50e

Please sign in to comment.