diff --git a/ext/transport/kafka/writer.go b/ext/transport/kafka/writer.go index fd40da31cf..fc863c5505 100644 --- a/ext/transport/kafka/writer.go +++ b/ext/transport/kafka/writer.go @@ -8,6 +8,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/segmentio/kafka-go" + + "github.com/goto/optimus/internal/errors" ) const ( @@ -20,6 +22,8 @@ var kafkaQueueCounter = promauto.NewCounter(prometheus.CounterOpts{ }) type Writer struct { + logger log.Logger + kafkaWriter *kafka.Writer } @@ -36,7 +40,7 @@ func NewWriter(kafkaBrokerUrls []string, topic string, logger log.Logger) *Write ErrorLogger: kafka.LoggerFunc(logger.Error), } - return &Writer{kafkaWriter: writer} + return &Writer{kafkaWriter: writer, logger: logger} } func (w *Writer) Close() error { @@ -51,10 +55,23 @@ func (w *Writer) Write(messages [][]byte) error { } } - err := w.kafkaWriter.WriteMessages(context.Background(), kafkaMessages...) - if err == nil { - kafkaQueueCounter.Add(float64(len(messages))) - return nil + return w.send(kafkaMessages) +} + +func (w *Writer) send(messages []kafka.Message) error { + err := w.kafkaWriter.WriteMessages(context.Background(), messages...) + if err != nil { + var messageSizeError kafka.MessageTooLargeError + if errors.As(err, &messageSizeError) { + w.logger.Error("Received too large message error for a message, trying remaining") + w.logger.Error("Discarded message: %s", string(messageSizeError.Message.Value)) + + return w.send(messageSizeError.Remaining) + } + + return err } - return err + + kafkaQueueCounter.Add(float64(len(messages))) + return nil }