Skip to content

Commit

Permalink
Fix the bug that the memory leak of the producer
Browse files Browse the repository at this point in the history
Signed-off-by: Meng Yan <[email protected]>
  • Loading branch information
yanmxa committed Apr 1, 2024
1 parent fdb0e53 commit 1658051
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 127 deletions.
3 changes: 2 additions & 1 deletion pkg/transport/config/confluent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig, producer bool) (*
"log.connection.close": "false",
// https://github.com/confluentinc/librdkafka/issues/4349
"ssl.endpoint.identification.algorithm": "none",
// the events channel server for both producer and consumer
"go.events.channel.size": 1000,
}
if producer {
_ = kafkaConfigMap.SetKey("go.events.channel.size", 1000)
_ = kafkaConfigMap.SetKey("go.produce.channel.size", 1000)
_ = kafkaConfigMap.SetKey("acks", "1")
_ = kafkaConfigMap.SetKey("retries", "0")
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/consumer/generic_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (c *GenericConsumer) Start(ctx context.Context) error {
}
c.log.Info("init consumer", "offsets", offsets)
if len(offsets) > 0 {
receiveContext = kafka_confluent.CommitOffsetCtx(ctx, offsets)
receiveContext = kafka_confluent.WithTopicPartitionOffsets(ctx, offsets)
}
}

Expand Down
113 changes: 53 additions & 60 deletions pkg/transport/kafka_confluent/option.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package kafka_confluent

import (
"context"
"fmt"
"errors"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Option is the function signature required to be considered an kafka_confluent.Option.
type Option func(*Protocol) error

// WithConfigMap sets the configMap to init the kafka client. This option is not required.
func WithConfigMap(config *kafka.ConfigMap) Option {
return func(p *Protocol) error {
if config == nil {
return fmt.Errorf("the kafka.ConfigMap option must not be nil")
return errors.New("the kafka.ConfigMap option must not be nil")
}
p.kafkaConfigMap = config
return nil
Expand All @@ -25,83 +30,104 @@ func WithConfigMap(config *kafka.ConfigMap) Option {
func WithSenderTopic(defaultTopic string) Option {
return func(p *Protocol) error {
if defaultTopic == "" {
return fmt.Errorf("the producer topic option must not be nil")
return errors.New("the producer topic option must not be nil")
}
p.producerDefaultTopic = defaultTopic
return nil
}
}

// WithDeliveryChan sets the deliveryChan for the kafka.Producer. This option is not required.
func WithDeliveryChan(deliveryChan chan kafka.Event) Option {
return func(p *Protocol) error {
if deliveryChan == nil {
return fmt.Errorf("the producer deliveryChan option must not be nil")
}
p.producerDeliveryChan = deliveryChan
return nil
}
}

// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required.
func WithReceiverTopics(topics []string) Option {
return func(p *Protocol) error {
if topics == nil {
return fmt.Errorf("the consumer topics option must not be nil")
return errors.New("the consumer topics option must not be nil")
}
p.consumerTopics = topics
return nil
}
}

// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required.
func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
return func(p *Protocol) error {
if rebalanceCb == nil {
return fmt.Errorf("the consumer group rebalance callback must not be nil")
return errors.New("the consumer group rebalance callback must not be nil")
}
p.consumerRebalanceCb = rebalanceCb
return nil
}
}

// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required.
func WithPollTimeout(timeoutMs int) Option {
return func(p *Protocol) error {
p.consumerPollTimeout = timeoutMs
return nil
}
}

// WithSender set a kafka.Producer instance to init the client directly. This option is not required.
func WithSender(producer *kafka.Producer) Option {
return func(p *Protocol) error {
if producer == nil {
return fmt.Errorf("the producer option must not be nil")
return errors.New("the producer option must not be nil")
}
p.producer = producer
return nil
}
}

// WithEventHandler provide a func on how to handle the kafka.Event for the producer.Events() chan. This option is not required.
func WithEventHandler(handler func(ctx context.Context, err kafka.Event)) Option {
return func(p *Protocol) error {
p.producerEventHandler = handler
return nil
}
}

// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required.
func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option {
return func(p *Protocol) error {
p.consumerErrorHandler = handler
return nil
}
}

// WithSender set a kafka.Consumer instance to init the client directly. This option is not required.
func WithReceiver(consumer *kafka.Consumer) Option {
return func(p *Protocol) error {
if consumer == nil {
return fmt.Errorf("the consumer option must not be nil")
return errors.New("the consumer option must not be nil")
}
p.consumer = consumer
return nil
}
}

// Opaque key type used to store offsets: assgin offset from ctx, commit offset from context
type commitOffsetType struct{}
// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required.
type topicPartitionOffsetsType struct{}

var offsetKey = commitOffsetType{}
var offsetKey = topicPartitionOffsetsType{}

// CommitOffsetCtx will return the topic partitions to commit offsets for.
func CommitOffsetCtx(ctx context.Context, topicPartitions []kafka.TopicPartition) context.Context {
return context.WithValue(ctx, offsetKey, topicPartitions)
// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required.
func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context {
if len(topicPartitionOffsets) == 0 {
panic("the topicPartitionOffsets cannot be empty")
}
for _, offset := range topicPartitionOffsets {
if offset.Topic == nil || *(offset.Topic) == "" {
panic("the kafka topic cannot be nil or empty")
}
if offset.Partition < 0 || offset.Offset < 0 {
panic("the kafka partition/offset must be non-negative")
}
}
return context.WithValue(ctx, offsetKey, topicPartitionOffsets)
}

// CommitOffsetCtx looks in the given context and returns `[]kafka.TopicPartition` if found and valid, otherwise nil.
func CommitOffsetFrom(ctx context.Context) []kafka.TopicPartition {
// TopicPartitionOffsetsFrom looks in the given context and returns []kafka.TopicPartition or nil if not set
func TopicPartitionOffsetsFrom(ctx context.Context) []kafka.TopicPartition {
c := ctx.Value(offsetKey)
if c != nil {
if s, ok := c.([]kafka.TopicPartition); ok {
Expand All @@ -111,39 +137,6 @@ func CommitOffsetFrom(ctx context.Context) []kafka.TopicPartition {
return nil
}

const (
OffsetEventSource = "io.cloudevents.kafka.confluent.consumer"
OffsetEventType = "io.cloudevents.kafka.confluent.consumer.offsets"
)

func NewOffsetEvent() cloudevents.Event {
e := cloudevents.NewEvent()
e.SetSource(OffsetEventSource)
e.SetType(OffsetEventType)
return e
}

// Opaque key type used to store topic partition
type topicPartitionKeyType struct{}

var topicPartitionKey = topicPartitionKeyType{}

// WithTopicPartition returns back a new context with the given partition.
func WithTopicPartition(ctx context.Context, partition int32) context.Context {
return context.WithValue(ctx, topicPartitionKey, partition)
}

// TopicPartitionFrom looks in the given context and returns `partition` as a int64 if found and valid, otherwise -1.
func TopicPartitionFrom(ctx context.Context) int32 {
c := ctx.Value(topicPartitionKey)
if c != nil {
if s, ok := c.(int32); ok {
return s
}
}
return -1
}

// Opaque key type used to store message key
type messageKeyType struct{}

Expand Down
Loading

0 comments on commit 1658051

Please sign in to comment.