From 52b40ad49fa681f7bc91a41a2d754cf717c9f5d3 Mon Sep 17 00:00:00 2001
From: dongmen <414110582@qq.com>
Date: Fri, 22 Dec 2023 17:46:57 +0800
Subject: [PATCH] refine some code

---
 cdc/sink/dmlsink/mq/mq_dml_sink.go |  25 +++---
 cdc/sink/dmlsink/mq/worker.go      | 121 ++++++++++++++---------------
 cdc/sink/dmlsink/mq/worker_test.go |  24 +++---
 pkg/sink/codec/encoder.go          |   7 ++
 pkg/sink/codec/encoder_group.go    |  67 ++++++++--------
 5 files changed, 124 insertions(+), 120 deletions(-)

diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go
index 14ed6226255..abbe1325305 100644
--- a/cdc/sink/dmlsink/mq/mq_dml_sink.go
+++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -146,15 +146,18 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
 	if s.alive.isDead {
 		return errors.Trace(errors.New("dead dmlSink"))
 	}
-	// merge the split row callback into one callback
-	mergedCallback := func(outCallback func(), totalCount uint64) func() {
-		var acked atomic.Uint64
+	// Because we split a txn into rows when sending it to the MQ,
+	// we need to convert the txn-level callback into a row-level callback.
+	toRowCallback := func(txnCallback func(), totalCount uint64) func() {
+		var calledCount atomic.Uint64
+		// The callback of the last row will trigger the callback of the txn.
 		return func() {
-			if acked.Add(1) == totalCount {
-				outCallback()
+			if calledCount.Inc() == totalCount {
+				txnCallback()
 			}
 		}
 	}
+
 	for _, txn := range txns {
 		if txn.GetTableSinkState() != state.TableSinkSinking {
 			// The table where the event comes from is in stopping, so it's safe
@@ -162,8 +165,8 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
 			txn.Callback()
 			continue
 		}
-		callback := mergedCallback(txn.Callback, uint64(len(txn.Event.Rows)))
 
+		rowCallback := toRowCallback(txn.Callback, uint64(len(txn.Event.Rows)))
 		for _, row := range txn.Event.Rows {
 			topic := s.alive.eventRouter.GetTopicForRowChange(row)
 			partitionNum, err := s.alive.topicManager.GetPartitionNum(s.ctx, topic)
@@ -181,20 +184,24 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
 				s.cancel(err)
 				return errors.Trace(err)
 			}
-
+			// Note: Calculate the partition index after the transformer is applied,
+			// because the transformer may change the row of the event.
 			index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
 			if err != nil {
 				s.cancel(err)
 				return errors.Trace(err)
 			}
+			// This will never block because it is an unbounded channel.
+			// We already limit the memory usage by MemoryQuota at the SinkManager level,
+			// so it is safe to send the event to an unbounded channel here.
 			s.alive.worker.msgChan.In() <- mqEvent{
-				key: TopicPartitionKey{
+				key: codec.TopicPartitionKey{
 					Topic: topic, Partition: index, PartitionKey: key,
 				},
 				rowEvent: &dmlsink.RowChangeCallbackableEvent{
 					Event:     row,
-					Callback:  callback,
+					Callback:  rowCallback,
 					SinkState: txn.SinkState,
 				},
 			}
diff --git a/cdc/sink/dmlsink/mq/worker.go b/cdc/sink/dmlsink/mq/worker.go
index 8f4ca155de7..93f22f211dc 100644
--- a/cdc/sink/dmlsink/mq/worker.go
+++ b/cdc/sink/dmlsink/mq/worker.go
@@ -34,24 +34,17 @@ import (
 )
 
 const (
-	// flushBatchSize is the batch size of the flush worker.
-	flushBatchSize = 2048
-	// flushInterval is the interval of the flush worker.
-	// We should not set it too big, otherwise it will cause we wait too long to send the message.
-	flushInterval = 15 * time.Millisecond
+	// batchSize is the maximum number of messages in a batch.
+	batchSize = 2048
+	// batchInterval is the interval of the worker to collect a batch of messages.
+	// It shouldn't be too large, otherwise it will lead to high latency.
+	batchInterval = 15 * time.Millisecond
 )
 
-// TopicPartitionKey contains the topic and partition key of the message.
-type TopicPartitionKey struct {
-	Topic        string
-	Partition    int32
-	PartitionKey string
-}
-
 // mqEvent is the event of the mq worker.
 // It carries the topic and partition information of the message.
 type mqEvent struct {
-	key      TopicPartitionKey
+	key      codec.TopicPartitionKey
 	rowEvent *dmlsink.RowChangeCallbackableEvent
 }
 
@@ -64,7 +57,7 @@ type worker struct {
 	// msgChan caches the messages to be sent.
 	// It is an unbounded channel.
 	msgChan *chann.DrainableChann[mqEvent]
-	// ticker used to force flush the messages when the interval is reached.
+	// ticker used to force flush the batched messages when the interval is reached.
 	ticker *time.Ticker
 
 	encoderGroup codec.EncoderGroup
@@ -94,7 +87,7 @@ func newWorker(
 		changeFeedID:                      id,
 		protocol:                          protocol,
 		msgChan:                           chann.NewAutoDrainChann[mqEvent](),
-		ticker:                            time.NewTicker(flushInterval),
+		ticker:                            time.NewTicker(batchInterval),
 		encoderGroup:                      encoderGroup,
 		producer:                          producer,
 		metricMQWorkerSendMessageDuration: mq.WorkerSendMessageDuration.WithLabelValues(id.Namespace, id.ID),
@@ -162,9 +155,7 @@ func (w *worker) nonBatchEncodeRun(ctx context.Context) error {
 			}
 			if err := w.encoderGroup.AddEvents(
 				ctx,
-				event.key.Topic,
-				event.key.Partition,
-				event.key.PartitionKey,
+				event.key,
 				event.rowEvent); err != nil {
 				return errors.Trace(err)
 			}
@@ -179,99 +170,103 @@ func (w *worker) batchEncodeRun(ctx context.Context) (retErr error) {
 		zap.String("changefeed", w.changeFeedID.ID),
 		zap.String("protocol", w.protocol.String()),
 	)
-	// Fixed size of the batch.
-	eventsBuf := make([]mqEvent, flushBatchSize)
+
+	msgsBuf := make([]mqEvent, batchSize)
 	for {
 		start := time.Now()
-		endIndex, err := w.batch(ctx, eventsBuf, flushInterval)
+		msgCount, err := w.batch(ctx, msgsBuf, batchInterval)
 		if err != nil {
 			return errors.Trace(err)
 		}
-		if endIndex == 0 {
+		if msgCount == 0 {
 			continue
 		}
-		w.metricMQWorkerBatchSize.Observe(float64(endIndex))
+		w.metricMQWorkerBatchSize.Observe(float64(msgCount))
 		w.metricMQWorkerBatchDuration.Observe(time.Since(start).Seconds())
-		msgs := eventsBuf[:endIndex]
-		partitionedRows := w.group(msgs)
-		for key, events := range partitionedRows {
-			if err := w.encoderGroup.
-				AddEvents(ctx, key.Topic, key.Partition, key.PartitionKey, events...); err != nil {
+
+		msgs := msgsBuf[:msgCount]
+		// Group messages by their TopicPartitionKey before adding them to the encoder group.
+		groupedMsgs := w.group(msgs)
+		for key, msg := range groupedMsgs {
+			if err := w.encoderGroup.AddEvents(ctx, key, msg...); err != nil {
 				return errors.Trace(err)
 			}
 		}
 	}
 }
 
-// batch collects a batch of messages to be sent to the DML producer.
+// batch collects a batch of messages from w.msgChan into buffer.
+// It returns the number of messages collected.
+// Note: It will block until at least one message is received.
 func (w *worker) batch(
-	ctx context.Context, events []mqEvent, flushInterval time.Duration,
+	ctx context.Context, buffer []mqEvent, flushInterval time.Duration,
 ) (int, error) {
-	index := 0
-	maxBatchSize := len(events)
+	msgCount := 0
+	maxBatchSize := len(buffer)
 	// We need to receive at least one message or be interrupted,
 	// otherwise it will lead to idling.
 	select {
 	case <-ctx.Done():
-		return index, ctx.Err()
+		return msgCount, ctx.Err()
 	case msg, ok := <-w.msgChan.Out():
 		if !ok {
 			log.Warn("MQ sink flush worker channel closed")
-			return index, nil
+			return msgCount, nil
 		}
 		if msg.rowEvent != nil {
 			w.statistics.ObserveRows(msg.rowEvent.Event)
-			events[index] = msg
-			index++
+			buffer[msgCount] = msg
+			msgCount++
 		}
 	}
 
-	// Start a new tick to flush the batch.
+	// Reset the ticker to start a new batch.
+	// We need to stop batching when the interval is reached.
 	w.ticker.Reset(flushInterval)
 	for {
 		select {
 		case <-ctx.Done():
-			return index, ctx.Err()
+			return msgCount, ctx.Err()
 		case msg, ok := <-w.msgChan.Out():
 			if !ok {
 				log.Warn("MQ sink flush worker channel closed")
-				return index, nil
+				return msgCount, nil
 			}
 			if msg.rowEvent != nil {
 				w.statistics.ObserveRows(msg.rowEvent.Event)
-				events[index] = msg
-				index++
+				buffer[msgCount] = msg
+				msgCount++
 			}
-			if index >= maxBatchSize {
-				return index, nil
+			if msgCount >= maxBatchSize {
+				return msgCount, nil
 			}
 		case <-w.ticker.C:
-			return index, nil
+			return msgCount, nil
 		}
 	}
 }
 
-// group is responsible for grouping messages by the partition.
+// group groups messages by their keys.
 func (w *worker) group(
-	events []mqEvent,
-) map[TopicPartitionKey][]*dmlsink.RowChangeCallbackableEvent {
-	partitionedRows := make(map[TopicPartitionKey][]*dmlsink.RowChangeCallbackableEvent)
-	for _, event := range events {
+	msgs []mqEvent,
+) map[codec.TopicPartitionKey][]*dmlsink.RowChangeCallbackableEvent {
+	groupedMsgs := make(map[codec.TopicPartitionKey][]*dmlsink.RowChangeCallbackableEvent)
+	for _, msg := range msgs {
 		// Skip this event when the table is stopping.
-		if event.rowEvent.GetTableSinkState() != state.TableSinkSinking {
-			event.rowEvent.Callback()
-			log.Debug("Skip event of stopped table", zap.Any("event", event.rowEvent))
+		if msg.rowEvent.GetTableSinkState() != state.TableSinkSinking {
+			msg.rowEvent.Callback()
+			log.Debug("Skip event of stopped table", zap.Any("event", msg.rowEvent))
 			continue
 		}
-		if _, ok := partitionedRows[event.key]; !ok {
-			partitionedRows[event.key] = make([]*dmlsink.RowChangeCallbackableEvent, 0)
+		if _, ok := groupedMsgs[msg.key]; !ok {
+			groupedMsgs[msg.key] = make([]*dmlsink.RowChangeCallbackableEvent, 0)
 		}
-		partitionedRows[event.key] = append(partitionedRows[event.key], event.rowEvent)
+		groupedMsgs[msg.key] = append(groupedMsgs[msg.key], msg.rowEvent)
 	}
-	return partitionedRows
+	return groupedMsgs
 }
 
 func (w *worker) sendMessages(ctx context.Context) error {
@@ -285,16 +280,16 @@ func (w *worker) sendMessages(ctx context.Context) error {
 	}()
 
 	var err error
-	inputCh := w.encoderGroup.Output()
+	outCh := w.encoderGroup.Output()
 	for {
 		select {
 		case <-ctx.Done():
 			return errors.Trace(ctx.Err())
 		case <-ticker.C:
-			metric.Set(float64(len(inputCh)))
-		case future, ok := <-inputCh:
+			metric.Set(float64(len(outCh)))
+		case future, ok := <-outCh:
 			if !ok {
-				log.Warn("MQ sink encode output channel closed",
+				log.Warn("MQ sink encoder's output channel closed",
 					zap.String("namespace", w.changeFeedID.Namespace),
 					zap.String("changefeed", w.changeFeedID.ID))
 				return nil
@@ -305,11 +300,11 @@ func (w *worker) sendMessages(ctx context.Context) error {
 			for _, message := range future.Messages {
 				start := time.Now()
 				if err = w.statistics.RecordBatchExecution(func() (int, int64, error) {
-					message.SetPartitionKey(future.PartitionKey)
+					message.SetPartitionKey(future.Key.PartitionKey)
 					if err := w.producer.AsyncSendMessage(
 						ctx,
-						future.Topic,
-						future.Partition,
+						future.Key.Topic,
+						future.Key.Partition,
 						message); err != nil {
 						return 0, 0, err
 					}
diff --git a/cdc/sink/dmlsink/mq/worker_test.go b/cdc/sink/dmlsink/mq/worker_test.go
index fd35995a2f8..f4140c20c28 100644
--- a/cdc/sink/dmlsink/mq/worker_test.go
+++ b/cdc/sink/dmlsink/mq/worker_test.go
@@ -77,7 +77,7 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
 
 	worker, p := newNonBatchEncodeWorker(ctx, t)
 	defer worker.close()
-	key := TopicPartitionKey{
+	key := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 1,
 	}
@@ -136,7 +136,7 @@ func TestBatchEncode_Batch(t *testing.T) {
 	defer cancel()
 	worker, _ := newBatchEncodeWorker(ctx, t)
 	defer worker.close()
-	key := TopicPartitionKey{
+	key := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 1,
 	}
@@ -168,15 +168,15 @@ func TestBatchEncode_Group(t *testing.T) {
 	t.Parallel()
 
-	key1 := TopicPartitionKey{
+	key1 := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 1,
 	}
-	key2 := TopicPartitionKey{
+	key2 := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 2,
 	}
-	key3 := TopicPartitionKey{
+	key3 := codec.TopicPartitionKey{
 		Topic:     "test1",
 		Partition: 2,
 	}
@@ -272,11 +272,11 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
 	tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
 	_, _, colInfo := tableInfo.GetRowColInfos()
 
-	key1 := TopicPartitionKey{
+	key1 := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 1,
 	}
-	key2 := TopicPartitionKey{
+	key2 := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 2,
 	}
@@ -339,15 +339,15 @@
 }
 
 func TestBatchEncode_SendMessages(t *testing.T) {
-	key1 := TopicPartitionKey{
+	key1 := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 1,
 	}
-	key2 := TopicPartitionKey{
+	key2 := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 2,
 	}
-	key3 := TopicPartitionKey{
+	key3 := codec.TopicPartitionKey{
 		Topic:     "test1",
 		Partition: 2,
 	}
@@ -502,11 +502,11 @@ func TestBatchEncodeWorker_Abort(t *testing.T) {
 }
 
 func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
-	key1 := TopicPartitionKey{
+	key1 := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 1,
 	}
-	key2 := TopicPartitionKey{
+	key2 := codec.TopicPartitionKey{
 		Topic:     "test",
 		Partition: 2,
 	}
diff --git a/pkg/sink/codec/encoder.go b/pkg/sink/codec/encoder.go
index 3ddc345559e..c6250c08610 100644
--- a/pkg/sink/codec/encoder.go
+++ b/pkg/sink/codec/encoder.go
@@ -83,3 +83,10 @@ func IsColumnValueEqual(preValue, updatedValue interface{}) bool {
 	// the value type should be the same
 	return preValue == updatedValue
 }
+
+// TopicPartitionKey contains the topic and partition key of the message.
+type TopicPartitionKey struct {
+	Topic        string
+	Partition    int32
+	PartitionKey string
+}
diff --git a/pkg/sink/codec/encoder_group.go b/pkg/sink/codec/encoder_group.go
index 82f818a7521..6989c1d696f 100644
--- a/pkg/sink/codec/encoder_group.go
+++ b/pkg/sink/codec/encoder_group.go
@@ -38,10 +38,9 @@ const (
 type EncoderGroup interface {
 	// Run start the group
 	Run(ctx context.Context) error
-	// AddEvents add events into the group, handled by one of the encoders
-	// all input events should belong to the same topic and partition, this should be guaranteed by the caller
-	AddEvents(ctx context.Context, topic string, partition int32,
-		partitionKey string, events ...*dmlsink.RowChangeCallbackableEvent) error
+	// AddEvents adds events into the group and encodes them by one of the encoders in the group.
+	// Note: The caller should make sure all events belong to the same topic and partition.
+	AddEvents(ctx context.Context, key TopicPartitionKey, events ...*dmlsink.RowChangeCallbackableEvent) error
 	// Output returns a channel produce futures
 	Output() <-chan *future
 }
@@ -50,7 +49,9 @@ type encoderGroup struct {
 	changefeedID model.ChangeFeedID
 
 	builder RowEventEncoderBuilder
-	count   int
+	// concurrency is the number of encoder pipelines to run
+	concurrency int
+	// inputCh is the input channel for each encoder pipeline
 	inputCh []chan *future
 	index   uint64
@@ -59,25 +60,25 @@ type encoderGroup struct {
 
 // NewEncoderGroup creates a new EncoderGroup instance
 func NewEncoderGroup(builder RowEventEncoderBuilder,
-	count int, changefeedID model.ChangeFeedID,
+	concurrency int, changefeedID model.ChangeFeedID,
 ) *encoderGroup {
-	if count <= 0 {
-		count = config.DefaultEncoderGroupConcurrency
+	if concurrency <= 0 {
+		concurrency = config.DefaultEncoderGroupConcurrency
 	}
 
-	inputCh := make([]chan *future, count)
-	for i := 0; i < count; i++ {
+	inputCh := make([]chan *future, concurrency)
+	for i := 0; i < concurrency; i++ {
 		inputCh[i] = make(chan *future, defaultInputChanSize)
 	}
 
 	return &encoderGroup{
 		changefeedID: changefeedID,
-		builder:      builder,
-		count:        count,
-		inputCh:      inputCh,
-		index:        0,
-		outputCh:     make(chan *future, defaultInputChanSize*count),
+		builder:      builder,
+		concurrency:  concurrency,
+		inputCh:      inputCh,
+		index:        0,
+		outputCh:     make(chan *future, defaultInputChanSize*concurrency),
 	}
 }
@@ -89,7 +90,7 @@ func (g *encoderGroup) Run(ctx context.Context) error {
 			zap.String("changefeed", g.changefeedID.ID))
 	}()
 	eg, ctx := errgroup.WithContext(ctx)
-	for i := 0; i < g.count; i++ {
+	for i := 0; i < g.concurrency; i++ {
 		idx := i
 		eg.Go(func() error {
 			return g.runEncoder(ctx, idx)
@@ -113,7 +114,7 @@ func (g *encoderGroup) runEncoder(ctx context.Context, idx int) error {
 			metric.Set(float64(len(inputCh)))
 		case future := <-inputCh:
 			for _, event := range future.events {
-				err := encoder.AppendRowChangedEvent(ctx, future.Topic, event.Event, event.Callback)
+				err := encoder.AppendRowChangedEvent(ctx, future.Key.Topic, event.Event, event.Callback)
 				if err != nil {
 					return errors.Trace(err)
 				}
@@ -126,13 +127,11 @@ func (g *encoderGroup) runEncoder(ctx context.Context, idx int) error {
 
 func (g *encoderGroup) AddEvents(
 	ctx context.Context,
-	topic string,
-	partition int32,
-	partitionKey string,
+	key TopicPartitionKey,
 	events ...*dmlsink.RowChangeCallbackableEvent,
 ) error {
-	future := newFuture(topic, partition, partitionKey, events...)
-	index := atomic.AddUint64(&g.index, 1) % uint64(g.count)
+	future := newFuture(key, events...)
+	index := atomic.AddUint64(&g.index, 1) % uint64(g.concurrency)
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
@@ -158,26 +157,22 @@ func (g *encoderGroup) cleanMetrics() {
 	common.CleanMetrics(g.changefeedID)
 }
 
+// future is a wrapper of the result of encoding events.
+// It's used to notify the caller that the result is ready.
 type future struct {
-	Topic        string
-	Partition    int32
-	PartitionKey string
-	events       []*dmlsink.RowChangeCallbackableEvent
-	Messages     []*common.Message
-
-	done chan struct{}
+	Key      TopicPartitionKey
+	events   []*dmlsink.RowChangeCallbackableEvent
+	Messages []*common.Message
+	done     chan struct{}
 }
 
-func newFuture(topic string, partition int32, partitionKey string,
+func newFuture(key TopicPartitionKey,
 	events ...*dmlsink.RowChangeCallbackableEvent,
 ) *future {
 	return &future{
-		Topic:        topic,
-		Partition:    partition,
-		PartitionKey: partitionKey,
-		events:       events,
-
-		done: make(chan struct{}),
+		Key:    key,
+		events: events,
+		done:   make(chan struct{}),
 	}
 }
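
Note (reviewer sketch, not part of the patch to apply): the txn-to-row callback conversion in WriteEvents is the behavioral core of this change, so a minimal standalone sketch of the pattern follows. It is written against the standard library's sync/atomic (Go 1.19+); the repo itself uses go.uber.org/atomic, whose Inc() is equivalent to Add(1) here. The name toRowCallback mirrors the patch, while the rest (package main, the demo in main) is illustrative only.

package main

import (
	"fmt"
	"sync/atomic"
)

// toRowCallback converts a txn-level callback into a row-level callback:
// each row acknowledgement bumps an atomic counter, and only the call that
// acknowledges the last row fires txnCallback, so it runs exactly once.
func toRowCallback(txnCallback func(), totalCount uint64) func() {
	var calledCount atomic.Uint64
	return func() {
		if calledCount.Add(1) == totalCount {
			txnCallback()
		}
	}
}

func main() {
	const rows = 3
	rowCallback := toRowCallback(func() { fmt.Println("txn acked") }, rows)
	for i := 0; i < rows; i++ {
		rowCallback() // "txn acked" is printed only on the third call
	}
}

Because the per-row callbacks are invoked from the producer's goroutines, the counter must be atomic; a plain int would race.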