From 16580517d3f4e6fda85d076b9e2ec403722719d3 Mon Sep 17 00:00:00 2001
From: Meng Yan
Date: Mon, 1 Apr 2024 07:00:05 +0000
Subject: [PATCH] Fix a memory leak in the producer

Signed-off-by: Meng Yan
---
 pkg/transport/config/confluent_config.go   |   3 +-
 pkg/transport/consumer/generic_consumer.go |   2 +-
 pkg/transport/kafka_confluent/option.go    | 113 ++++++------
 pkg/transport/kafka_confluent/protocol.go  | 196 ++++++++++++++-------
 4 files changed, 187 insertions(+), 127 deletions(-)

diff --git a/pkg/transport/config/confluent_config.go b/pkg/transport/config/confluent_config.go
index 78587a5fa..651bcfb37 100644
--- a/pkg/transport/config/confluent_config.go
+++ b/pkg/transport/config/confluent_config.go
@@ -20,9 +20,10 @@ func GetConfluentConfigMap(kafkaConfig *transport.KafkaConfig, producer bool) (*
 		"log.connection.close":                  "false",
 		// https://github.com/confluentinc/librdkafka/issues/4349
 		"ssl.endpoint.identification.algorithm": "none",
+		// the events channel size for both the producer and the consumer
+		"go.events.channel.size":                1000,
 	}
 	if producer {
-		_ = kafkaConfigMap.SetKey("go.events.channel.size", 1000)
 		_ = kafkaConfigMap.SetKey("go.produce.channel.size", 1000)
 		_ = kafkaConfigMap.SetKey("acks", "1")
 		_ = kafkaConfigMap.SetKey("retries", "0")
diff --git a/pkg/transport/consumer/generic_consumer.go b/pkg/transport/consumer/generic_consumer.go
index 13bd0115b..cb9095658 100644
--- a/pkg/transport/consumer/generic_consumer.go
+++ b/pkg/transport/consumer/generic_consumer.go
@@ -116,7 +116,7 @@ func (c *GenericConsumer) Start(ctx context.Context) error {
 		}
 		c.log.Info("init consumer", "offsets", offsets)
 		if len(offsets) > 0 {
-			receiveContext = kafka_confluent.CommitOffsetCtx(ctx, offsets)
+			receiveContext = kafka_confluent.WithTopicPartitionOffsets(ctx, offsets)
 		}
 	}
diff --git a/pkg/transport/kafka_confluent/option.go b/pkg/transport/kafka_confluent/option.go
index 74fcef919..dc3764716 100644
--- a/pkg/transport/kafka_confluent/option.go
+++ b/pkg/transport/kafka_confluent/option.go
@@ -1,20 +1,25 @@
+/*
+ Copyright 2023 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
 package kafka_confluent
 
 import (
 	"context"
-	"fmt"
+	"errors"
 
-	cloudevents "github.com/cloudevents/sdk-go/v2"
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 )
 
 // Option is the function signature required to be considered an kafka_confluent.Option.
 type Option func(*Protocol) error
 
+// WithConfigMap sets the configMap to init the kafka client. This option is not required.
 func WithConfigMap(config *kafka.ConfigMap) Option {
 	return func(p *Protocol) error {
 		if config == nil {
-			return fmt.Errorf("the kafka.ConfigMap option must not be nil")
+			return errors.New("the kafka.ConfigMap option must not be nil")
 		}
 		p.kafkaConfigMap = config
 		return nil
 	}
 }
@@ -25,44 +30,36 @@ func WithConfigMap(config *kafka.ConfigMap) Option {
 func WithSenderTopic(defaultTopic string) Option {
 	return func(p *Protocol) error {
 		if defaultTopic == "" {
-			return fmt.Errorf("the producer topic option must not be nil")
+			return errors.New("the producer topic option must not be empty")
 		}
 		p.producerDefaultTopic = defaultTopic
 		return nil
 	}
 }
 
-// WithDeliveryChan sets the deliveryChan for the kafka.Producer. This option is not required.
-func WithDeliveryChan(deliveryChan chan kafka.Event) Option {
-	return func(p *Protocol) error {
-		if deliveryChan == nil {
-			return fmt.Errorf("the producer deliveryChan option must not be nil")
-		}
-		p.producerDeliveryChan = deliveryChan
-		return nil
-	}
-}
-
+// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required.
 func WithReceiverTopics(topics []string) Option {
 	return func(p *Protocol) error {
 		if topics == nil {
-			return fmt.Errorf("the consumer topics option must not be nil")
+			return errors.New("the consumer topics option must not be nil")
 		}
 		p.consumerTopics = topics
 		return nil
 	}
 }
 
+// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required.
 func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
 	return func(p *Protocol) error {
 		if rebalanceCb == nil {
-			return fmt.Errorf("the consumer group rebalance callback must not be nil")
+			return errors.New("the consumer group rebalance callback must not be nil")
 		}
 		p.consumerRebalanceCb = rebalanceCb
 		return nil
 	}
 }
 
+// WithPollTimeout sets the timeout (in milliseconds) of the consumer polling for messages or events; Poll returns nil on timeout. This option is not required.
 func WithPollTimeout(timeoutMs int) Option {
 	return func(p *Protocol) error {
 		p.consumerPollTimeout = timeoutMs
@@ -70,38 +67,67 @@ func WithPollTimeout(timeoutMs int) Option {
 	}
 }
 
+// WithSender sets a kafka.Producer instance to init the client directly. This option is not required.
 func WithSender(producer *kafka.Producer) Option {
 	return func(p *Protocol) error {
 		if producer == nil {
-			return fmt.Errorf("the producer option must not be nil")
+			return errors.New("the producer option must not be nil")
 		}
 		p.producer = producer
 		return nil
 	}
 }
 
+// WithEventHandler provides a function to handle the kafka.Event values from the producer.Events() channel. This option is not required.
+func WithEventHandler(handler func(ctx context.Context, err kafka.Event)) Option {
+	return func(p *Protocol) error {
+		p.producerEventHandler = handler
+		return nil
+	}
+}
+
+// WithErrorHandler provides a function to handle the kafka.Error values polled by the kafka.Consumer. This option is not required.
+func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option {
+	return func(p *Protocol) error {
+		p.consumerErrorHandler = handler
+		return nil
+	}
+}
+
+// WithReceiver sets a kafka.Consumer instance to init the client directly. This option is not required.
 func WithReceiver(consumer *kafka.Consumer) Option {
 	return func(p *Protocol) error {
 		if consumer == nil {
-			return fmt.Errorf("the consumer option must not be nil")
+			return errors.New("the consumer option must not be nil")
 		}
 		p.consumer = consumer
 		return nil
 	}
 }
 
-// Opaque key type used to store offsets: assgin offset from ctx, commit offset from context
-type commitOffsetType struct{}
+// Opaque key type used to store the topic partition offsets assigned from the context.
+type topicPartitionOffsetsType struct{}
 
-var offsetKey = commitOffsetType{}
+var offsetKey = topicPartitionOffsetsType{}
 
-// CommitOffsetCtx will return the topic partitions to commit offsets for.
-func CommitOffsetCtx(ctx context.Context, topicPartitions []kafka.TopicPartition) context.Context {
-	return context.WithValue(ctx, offsetKey, topicPartitions)
+// WithTopicPartitionOffsets sets the positions where the consumer starts consuming from.
+func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context {
+	if len(topicPartitionOffsets) == 0 {
+		panic("the topicPartitionOffsets cannot be empty")
+	}
+	for _, offset := range topicPartitionOffsets {
+		if offset.Topic == nil || *(offset.Topic) == "" {
+			panic("the kafka topic cannot be nil or empty")
+		}
+		if offset.Partition < 0 || offset.Offset < 0 {
+			panic("the kafka partition/offset must be non-negative")
+		}
+	}
+	return context.WithValue(ctx, offsetKey, topicPartitionOffsets)
 }
 
-// CommitOffsetCtx looks in the given context and returns `[]kafka.TopicPartition` if found and valid, otherwise nil.
-func CommitOffsetFrom(ctx context.Context) []kafka.TopicPartition {
+// TopicPartitionOffsetsFrom looks in the given context and returns []kafka.TopicPartition or nil if not set.
+func TopicPartitionOffsetsFrom(ctx context.Context) []kafka.TopicPartition {
 	c := ctx.Value(offsetKey)
 	if c != nil {
 		if s, ok := c.([]kafka.TopicPartition); ok {
@@ -111,39 +137,6 @@ func CommitOffsetFrom(ctx context.Context) []kafka.TopicPartition {
 	return nil
 }
 
-const (
-	OffsetEventSource = "io.cloudevents.kafka.confluent.consumer"
-	OffsetEventType   = "io.cloudevents.kafka.confluent.consumer.offsets"
-)
-
-func NewOffsetEvent() cloudevents.Event {
-	e := cloudevents.NewEvent()
-	e.SetSource(OffsetEventSource)
-	e.SetType(OffsetEventType)
-	return e
-}
-
-// Opaque key type used to store topic partition
-type topicPartitionKeyType struct{}
-
-var topicPartitionKey = topicPartitionKeyType{}
-
-// WithTopicPartition returns back a new context with the given partition.
-func WithTopicPartition(ctx context.Context, partition int32) context.Context {
-	return context.WithValue(ctx, topicPartitionKey, partition)
-}
-
-// TopicPartitionFrom looks in the given context and returns `partition` as a int64 if found and valid, otherwise -1.
-func TopicPartitionFrom(ctx context.Context) int32 {
-	c := ctx.Value(topicPartitionKey)
-	if c != nil {
-		if s, ok := c.(int32); ok {
-			return s
-		}
-	}
-	return -1
-}
-
 // Opaque key type used to store message key
 type messageKeyType struct{}
 
diff --git a/pkg/transport/kafka_confluent/protocol.go b/pkg/transport/kafka_confluent/protocol.go
index 6578f85a0..1f69a79e9 100644
--- a/pkg/transport/kafka_confluent/protocol.go
+++ b/pkg/transport/kafka_confluent/protocol.go
@@ -1,7 +1,13 @@
+/*
+ Copyright 2023 The CloudEvents Authors
+ SPDX-License-Identifier: Apache-2.0
+*/
+
 package kafka_confluent
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"sync"
@@ -22,50 +28,71 @@ var (
 type Protocol struct {
 	kafkaConfigMap *kafka.ConfigMap
 
-	consumer            *kafka.Consumer
-	consumerTopics      []string
-	consumerRebalanceCb kafka.RebalanceCb // optional
-	consumerPollTimeout int               // optional
-	consumerMux         sync.Mutex
+	consumer             *kafka.Consumer
+	consumerTopics       []string
+	consumerRebalanceCb  kafka.RebalanceCb // optional
+	consumerPollTimeout  int               // optional
+	consumerErrorHandler func(ctx context.Context, err kafka.Error) // optional
+	consumerMux          sync.Mutex
+	consumerIncoming     chan *kafka.Message
+	consumerCtx          context.Context
+	consumerCancel       context.CancelFunc
 
-	producer                 *kafka.Producer
-	producerDeliveryChan     chan kafka.Event // optional
-	producerDefaultTopic     string           // optional
-	producerDefaultPartition int32            // optional
+	producer             *kafka.Producer
+	producerDeliveryChan chan kafka.Event // optional
+	producerDefaultTopic string           // optional
+	producerCtx          context.Context
+	producerCancel       context.CancelFunc
+	producerEventHandler func(ctx context.Context, err kafka.Event) // optional
 
-	// receiver
-	incoming chan *kafka.Message
+	closerMux sync.Mutex
 }
 
 func New(opts ...Option) (*Protocol, error) {
 	p := &Protocol{
-		producerDefaultPartition: kafka.PartitionAny,
-		consumerPollTimeout:      100,
-		incoming:                 make(chan *kafka.Message),
+		consumerPollTimeout: 100,
+		consumerIncoming:    make(chan *kafka.Message),
 	}
 	if err := p.applyOptions(opts...); err != nil {
 		return nil, err
 	}
 
-	if p.consumerTopics != nil && p.consumer == nil && p.kafkaConfigMap != nil {
-		consumer, err := kafka.NewConsumer(p.kafkaConfigMap)
-		if err != nil {
-			return nil, err
+	if p.kafkaConfigMap != nil {
+		if p.consumerTopics != nil && p.consumer == nil {
+			consumer, err := kafka.NewConsumer(p.kafkaConfigMap)
+			if err != nil {
+				return nil, err
+			}
+			p.consumer = consumer
 		}
-		p.consumer = consumer
-	} else if p.producer == nil && p.kafkaConfigMap != nil {
-		producer, err := kafka.NewProducer(p.kafkaConfigMap)
-		if err != nil {
-			return nil, err
+		if p.producerDefaultTopic != "" && p.producer == nil {
+			producer, err := kafka.NewProducer(p.kafkaConfigMap)
+			if err != nil {
+				return nil, err
+			}
+			p.producer = producer
 		}
-		p.producer = producer
-		p.producerDeliveryChan = make(chan kafka.Event)
+		if p.producer == nil && p.consumer == nil {
+			return nil, errors.New("at least receiver or sender topic must be set")
+		}
+	}
+	if p.producerDefaultTopic != "" && p.producer == nil {
+		return nil, fmt.Errorf("at least configmap or producer must be set for the sender topic: %s", p.producerDefaultTopic)
 	}
-	if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {
-		return nil, fmt.Errorf("At least one of the following to initialize the protocol: config, producer, or consumer.")
+	if len(p.consumerTopics) > 0 && p.consumer == nil {
+		return nil, fmt.Errorf("at least configmap or consumer must be set for the receiver topics: %s", p.consumerTopics)
 	}
+	if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {
+		return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer")
+	}
+	if p.producer != nil {
+		p.producerDeliveryChan = make(chan kafka.Event)
+		if p.producerEventHandler == nil {
+			p.producerEventHandler = func(ctx context.Context, err kafka.Event) {}
+		}
+	}
 	return p, nil
 }
@@ -79,25 +106,49 @@ func (p *Protocol) applyOptions(opts ...Option) error {
 }
 
 func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) {
-	// support the commit offset from the context
-	offsets := CommitOffsetFrom(ctx)
-	if offsets != nil {
-		if p.consumer == nil {
-			return fmt.Errorf("the consumer client must not be nil")
-		}
-		_, err = p.consumer.CommitOffsets(offsets)
-		return err
+	if p.producer == nil {
+		return errors.New("producer client must be set")
 	}
-	if p.producer == nil {
-		return fmt.Errorf("the producer client must not be nil")
+	p.closerMux.Lock()
+	defer p.closerMux.Unlock()
+	if p.producer.IsClosed() {
+		return errors.New("producer is closed")
+	}
+
+	// Listen to all the client instance-level events in a separate goroutine.
+	// It's important to read these events, otherwise the events channel will eventually fill up.
+	if p.producerCtx == nil {
+		p.producerCtx, p.producerCancel = context.WithCancel(ctx)
+		go func() {
+			// clean up the resources
+			defer func() {
+				logger := cecontext.LoggerFrom(ctx)
+				if !p.producer.IsClosed() {
+					logger.Info("Closing producer")
+					p.producer.Close()
+					close(p.producerDeliveryChan)
+				}
+			}()
+
+			for {
+				select {
+				case <-p.producerCtx.Done():
+					return
+				// receive the events in the select so that cancellation is not blocked by Events()
+				case ev := <-p.producer.Events():
+					p.producerEventHandler(ctx, ev)
+				}
+			}
+		}()
 	}
+
 	defer in.Finish(err)
 
 	kafkaMsg := &kafka.Message{
 		TopicPartition: kafka.TopicPartition{
 			Topic:     &p.producerDefaultTopic,
-			Partition: p.producerDefaultPartition,
+			Partition: kafka.PartitionAny,
 		},
 	}
 
@@ -105,10 +156,6 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ..
 	if topic := TopicFrom(ctx); topic != "" {
 		kafkaMsg.TopicPartition.Topic = &topic
 	}
 
-	if partition := TopicPartitionFrom(ctx); partition != -1 {
-		kafkaMsg.TopicPartition.Partition = partition
-	}
-
 	if messageKey := MessageKeyFrom(ctx); messageKey != "" {
 		kafkaMsg.Key = []byte(messageKey)
 	}
@@ -132,10 +179,10 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ..
 func (p *Protocol) OpenInbound(ctx context.Context) error {
 	if p.consumer == nil {
-		return fmt.Errorf("the consumer client must not be nil")
+		return errors.New("the consumer client must be set")
 	}
 	if p.consumerTopics == nil {
-		return fmt.Errorf("the consumer topics must not be nil")
+		return errors.New("the consumer topics must be set")
 	}
 
 	p.consumerMux.Lock()
@@ -143,7 +190,7 @@ func (p *Protocol) OpenInbound(ctx context.Context) error {
 	logger := cecontext.LoggerFrom(ctx)
 
 	// Query committed offsets for each partition
-	if positions := CommitOffsetFrom(ctx); positions != nil {
+	if positions := TopicPartitionOffsetsFrom(ctx); positions != nil {
 		if err := p.consumer.Assign(positions); err != nil {
 			return err
 		}
@@ -155,11 +202,25 @@ func (p *Protocol) OpenInbound(ctx context.Context) error {
 		return err
 	}
 
-	run := true
-	for run {
+	p.closerMux.Lock()
+	p.consumerCtx, p.consumerCancel = context.WithCancel(ctx)
+	defer p.consumerCancel()
+	p.closerMux.Unlock()
+
+	defer func() {
+		if !p.consumer.IsClosed() {
+			logger.Infof("Closing consumer %v", p.consumerTopics)
+			if err := p.consumer.Close(); err != nil {
+				logger.Errorf("failed to close the consumer: %v", err)
+			}
+		}
+		close(p.consumerIncoming)
+	}()
+
+	for {
 		select {
-		case <-ctx.Done():
-			run = false
+		case <-p.consumerCtx.Done():
+			return p.consumerCtx.Err()
 		default:
 			ev := p.consumer.Poll(p.consumerPollTimeout)
 			if ev == nil {
@@ -167,25 +228,27 @@ func (p *Protocol) OpenInbound(ctx context.Context) error {
 			}
 			switch e := ev.(type) {
 			case *kafka.Message:
-				p.incoming <- e
+				p.consumerIncoming <- e
 			case kafka.Error:
-				// Errors should generally be considered informational, the client
-				// will try to automatically recover.
-				// logger.Warnf("Consumer get a kafka error %v: %v\n", e.Code(), e)
-			default:
-				// logger.Infof("Ignored %v\n", e)
+				// Errors should generally be considered informational; the client will try to automatically recover.
+				// But here we stop the consumer and return the error if all brokers are down.
+				logger.Infof("Error %v: %v", e.Code(), e)
+				if p.consumerErrorHandler != nil {
+					p.consumerErrorHandler(ctx, e)
+				}
+				if e.Code() == kafka.ErrAllBrokersDown {
+					logger.Error("All broker connections are down")
+					return e
+				}
 			}
 		}
 	}
-
-	logger.Infof("Closing consumer %v", p.consumerTopics)
-	return p.consumer.Close()
 }
 
 // Receive implements Receiver.Receive
 func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) {
 	select {
-	case m, ok := <-p.incoming:
+	case m, ok := <-p.consumerIncoming:
 		if !ok {
 			return nil, io.EOF
 		}
@@ -196,14 +259,17 @@ func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) {
 	}
 }
 
+// Close cleans up resources after use. It must be called to close the underlying Kafka clients and avoid resource leaks.
 func (p *Protocol) Close(ctx context.Context) error {
-	if p.consumer != nil {
-		return p.consumer.Close()
+	p.closerMux.Lock()
+	defer p.closerMux.Unlock()
+
+	if p.consumerCancel != nil {
+		p.consumerCancel()
 	}
-	if p.producer != nil {
-		p.producer.Close()
+
+	if p.producerCancel != nil {
+		p.producerCancel()
 	}
-	close(p.producerDeliveryChan)
-	close(p.incoming)
 	return nil
 }
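
A minimal usage sketch of the reworked API follows. It is not part of the patch; the module path, broker address, topic names, and group id are illustrative assumptions, and only options introduced or kept by this patch are used.

package main

import (
	"context"
	"log"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	// assumed module path for this repository's kafka_confluent package
	kafka_confluent "github.com/stolostron/multicluster-global-hub/pkg/transport/kafka_confluent"
)

func main() {
	ctx := context.Background()

	// Sender: the protocol now drains producer.Events() itself and closes the
	// producer when Close(ctx) cancels its internal context.
	sender, err := kafka_confluent.New(
		kafka_confluent.WithConfigMap(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}), // illustrative broker
		kafka_confluent.WithSenderTopic("demo-topic"),                                          // illustrative topic
	)
	if err != nil {
		log.Fatalf("failed to create sender protocol: %v", err)
	}
	defer sender.Close(ctx)

	sendClient, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
	if err != nil {
		log.Fatalf("failed to create send client: %v", err)
	}

	evt := cloudevents.NewEvent()
	evt.SetSource("example/demo")
	evt.SetType("example.demo.sent")
	_ = evt.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})
	if result := sendClient.Send(ctx, evt); cloudevents.IsUndelivered(result) {
		log.Printf("failed to send: %v", result)
	}

	// Receiver: WithErrorHandler observes the kafka.Error values polled by the consumer.
	receiver, err := kafka_confluent.New(
		kafka_confluent.WithConfigMap(&kafka.ConfigMap{
			"bootstrap.servers": "localhost:9092",
			"group.id":          "demo-group", // illustrative group id
			"auto.offset.reset": "earliest",
		}),
		kafka_confluent.WithReceiverTopics([]string{"demo-topic"}),
		kafka_confluent.WithErrorHandler(func(ctx context.Context, err kafka.Error) {
			log.Printf("consumer error: %v", err)
		}),
	)
	if err != nil {
		log.Fatalf("failed to create receiver protocol: %v", err)
	}
	defer receiver.Close(ctx)

	recvClient, err := cloudevents.NewClient(receiver)
	if err != nil {
		log.Fatalf("failed to create receive client: %v", err)
	}
	// StartReceiver blocks until ctx is cancelled or the consumer stops (for example, all brokers down).
	if err := recvClient.StartReceiver(ctx, func(ctx context.Context, e cloudevents.Event) {
		log.Printf("received: %s", e.Type())
	}); err != nil {
		log.Printf("receiver stopped: %v", err)
	}
}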