Skip to content

Commit

Permalink
Refactored to remove ManualOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnes committed Nov 10, 2023
1 parent bfe8a16 commit 2f16d32
Show file tree
Hide file tree
Showing 16 changed files with 73 additions and 1,158 deletions.
29 changes: 0 additions & 29 deletions e2e/xkafka/README.md

This file was deleted.

31 changes: 0 additions & 31 deletions e2e/xkafka/go.mod

This file was deleted.

483 changes: 0 additions & 483 deletions e2e/xkafka/go.sum

This file was deleted.

42 changes: 0 additions & 42 deletions e2e/xkafka/main.go

This file was deleted.

27 changes: 17 additions & 10 deletions examples/xkafka/README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
Examples of how to use the xkafka package to read and write messages to a Kafka topic using `xkafka.Consumer` and `xkafka.Producer`.

This example shows how to use the xkafka package to read and write messages to a Kafka topic using `xkafka.Consumer` and `xkafka.Producer`.
## Running Kafka

## Running xkafka
Start a Kafka broker using the provided `docker-compose.yml` file:

To run the example, first start a Kafka broker.:
```bash
$ docker-compose up -d
```

## Scenarios

### Sequential Consumer

This is the default behavior of the consumer. The consumer will process messages sequentially and commit the offset based on the `auto.commit.interval.ms` configuration.

```bash
$ docker run -p 2181:2181 -p 9092:9092 \
--env ADVERTISED_HOST=kafka \
--env ADVERTISED_PORT=9092 \
--env AUTO_CREATE_TOPICS=true \
spotify/kafka
go run *.go sequential
```

Run the example:
### Async Consumer

Async mode is enabled by setting `xkafka.Concurrency` to a value greater than 1. The consumer will process messages using a pool of Go routines.

```bash
$ go run main.go
go run *.go async
```
7 changes: 1 addition & 6 deletions e2e/xkafka/async.go → examples/xkafka/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
)

func runAsync(c *cli.Context) error {
isManual := c.Bool("manual")
topic := "seq-" + xid.New().String()
topic := "async-" + xid.New().String()

if err := createTopic(topic, partitions); err != nil {
return err
Expand All @@ -30,10 +29,6 @@ func runAsync(c *cli.Context) error {
xkafka.Concurrency(2),
}

if isManual {
opts = append(opts, xkafka.ManualOffset(true))
}

// start consumers first
s.consumers = make([]*xkafka.Consumer, partitions)
components := make([]xrun.Component, partitions)
Expand Down
File renamed without changes.
25 changes: 7 additions & 18 deletions examples/xkafka/go.mod
Original file line number Diff line number Diff line change
@@ -1,49 +1,38 @@
module github.com/gojekfarm/xtools/examples/xkafka

go 1.19
go 1.21

replace (
github.com/gojekfarm/xtools => ../../
github.com/gojekfarm/xtools/xkafka => ../../xkafka
github.com/gojekfarm/xtools/xkafka/middleware/prometheus => ../../xkafka/middleware/prometheus
github.com/gojekfarm/xtools/xkafka/middleware/zerolog => ../../xkafka/middleware/zerolog

)

require (
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/gojekfarm/xrun v0.3.0
github.com/gojekfarm/xtools/xkafka v0.4.1
github.com/gojekfarm/xtools/xkafka/middleware/prometheus v0.4.1
github.com/gojekfarm/xtools/xkafka/middleware/zerolog v0.4.1
github.com/prometheus/client_golang v1.14.0
github.com/lmittmann/tint v1.0.3
github.com/rs/xid v1.4.0
github.com/rs/zerolog v1.29.0
github.com/urfave/cli/v2 v2.23.7
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cockroachdb/errors v1.9.0 // indirect
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/confluentinc/confluent-kafka-go v1.9.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/getsentry/sentry-go v0.13.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sourcegraph/conc v0.2.0 // indirect
github.com/sourcegraph/sourcegraph/lib v0.0.0-20221216004406-749998a2ac74 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
Loading

0 comments on commit 2f16d32

Please sign in to comment.