mq (ticdc): make some code more readable. #10347

Merged
25 changes: 16 additions & 9 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -146,24 +146,27 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
if s.alive.isDead {
return errors.Trace(errors.New("dead dmlSink"))
}
// merge the split row callback into one callback
mergedCallback := func(outCallback func(), totalCount uint64) func() {
var acked atomic.Uint64
// Because we split the txn into rows when sending to the MQ,
// we need to convert the txn-level callback into a row-level callback.
toRowCallback := func(txnCallback func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
return func() {
if acked.Add(1) == totalCount {
outCallback()
if calledCount.Inc() == totalCount {
txnCallback()
}
}
}

for _, txn := range txns {
if txn.GetTableSinkState() != state.TableSinkSinking {
// The table where the event comes from is stopping, so it's safe
// to drop the event directly.
txn.Callback()
continue
}
callback := mergedCallback(txn.Callback, uint64(len(txn.Event.Rows)))

rowCallback := toRowCallback(txn.Callback, uint64(len(txn.Event.Rows)))
for _, row := range txn.Event.Rows {
topic := s.alive.eventRouter.GetTopicForRowChange(row)
partitionNum, err := s.alive.topicManager.GetPartitionNum(s.ctx, topic)
@@ -181,20 +184,24 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa
s.cancel(err)
return errors.Trace(err)
}

// Note: Calculate the partition index after the transformer is applied.
// Because the transformer may change the row of the event.
index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum)
if err != nil {
s.cancel(err)
return errors.Trace(err)
}

// This never blocks because this is an unbounded channel.
// We already limit the memory usage by MemoryQuota at the SinkManager level.
// So it is safe to send the event to an unbounded channel here.
s.alive.worker.msgChan.In() <- mqEvent{
key: TopicPartitionKey{
key: codec.TopicPartitionKey{
Topic: topic, Partition: index, PartitionKey: key,
},
rowEvent: &dmlsink.RowChangeCallbackableEvent{
Event: row,
Callback: callback,
Callback: rowCallback,
SinkState: txn.SinkState,
},
}
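The core of the mq_dml_sink.go change is the txn-to-row callback conversion: every row gets a callback that bumps an atomic counter, and only the last row's callback fires the original txn-level callback. Below is a minimal standalone sketch of that fan-in pattern using the standard library's sync/atomic (the exact atomic package used by the PR is not visible in this diff); the types and the simulated producer acknowledgements are illustrative, not the actual TiCDC sink code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toRowCallback fans one txn-level callback out to per-row callbacks:
// each row callback increments an atomic counter, and only the last one
// (counter == totalCount) fires the txn callback.
func toRowCallback(txnCallback func(), totalCount uint64) func() {
	var calledCount atomic.Uint64
	return func() {
		if calledCount.Add(1) == totalCount {
			txnCallback()
		}
	}
}

func main() {
	const rows = 4
	var wg sync.WaitGroup
	wg.Add(1)

	rowCallback := toRowCallback(func() {
		fmt.Println("txn fully acknowledged")
		wg.Done()
	}, rows)

	// Simulate the producer acknowledging each row asynchronously.
	for i := 0; i < rows; i++ {
		go rowCallback()
	}
	wg.Wait()
}

The same pattern applies anywhere one completion signal must wait for N asynchronous acknowledgements.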
121 changes: 58 additions & 63 deletions cdc/sink/dmlsink/mq/worker.go
@@ -34,24 +34,17 @@ import (
)

const (
// flushBatchSize is the batch size of the flush worker.
flushBatchSize = 2048
// flushInterval is the interval of the flush worker.
// We should not set it too big, otherwise we will wait too long to send the message.
flushInterval = 15 * time.Millisecond
// batchSize is the maximum size of the number of messages in a batch.
batchSize = 2048
// batchInterval is the interval of the worker to collect a batch of messages.
// It shouldn't be too large, otherwise it will lead to a high latency.
batchInterval = 15 * time.Millisecond
)

// TopicPartitionKey contains the topic and partition key of the message.
type TopicPartitionKey struct {
Topic string
Partition int32
PartitionKey string
}

// mqEvent is the event of the mq worker.
// It carries the topic and partition information of the message.
type mqEvent struct {
key TopicPartitionKey
key codec.TopicPartitionKey
rowEvent *dmlsink.RowChangeCallbackableEvent
}

@@ -64,7 +57,7 @@ type worker struct {
// msgChan caches the messages to be sent.
// It is an unbounded channel.
msgChan *chann.DrainableChann[mqEvent]
// ticker used to force flush the messages when the interval is reached.
// ticker used to force flush the batched messages when the interval is reached.
ticker *time.Ticker

encoderGroup codec.EncoderGroup
@@ -94,7 +87,7 @@ func newWorker(
changeFeedID: id,
protocol: protocol,
msgChan: chann.NewAutoDrainChann[mqEvent](),
ticker: time.NewTicker(flushInterval),
ticker: time.NewTicker(batchInterval),
encoderGroup: encoderGroup,
producer: producer,
metricMQWorkerSendMessageDuration: mq.WorkerSendMessageDuration.WithLabelValues(id.Namespace, id.ID),
@@ -162,9 +155,7 @@ func (w *worker) nonBatchEncodeRun(ctx context.Context) error {
}
if err := w.encoderGroup.AddEvents(
ctx,
event.key.Topic,
event.key.Partition,
event.key.PartitionKey,
event.key,
event.rowEvent); err != nil {
return errors.Trace(err)
}
@@ -179,99 +170,103 @@ func (w *worker) batchEncodeRun(ctx context.Context) (retErr error) {
zap.String("changefeed", w.changeFeedID.ID),
zap.String("protocol", w.protocol.String()),
)
// Fixed size of the batch.
eventsBuf := make([]mqEvent, flushBatchSize)

msgsBuf := make([]mqEvent, batchSize)
for {
start := time.Now()
endIndex, err := w.batch(ctx, eventsBuf, flushInterval)
msgCount, err := w.batch(ctx, msgsBuf, batchInterval)
if err != nil {
return errors.Trace(err)
}
if endIndex == 0 {
if msgCount == 0 {
continue
}

w.metricMQWorkerBatchSize.Observe(float64(endIndex))
w.metricMQWorkerBatchSize.Observe(float64(msgCount))
w.metricMQWorkerBatchDuration.Observe(time.Since(start).Seconds())
msgs := eventsBuf[:endIndex]
partitionedRows := w.group(msgs)
for key, events := range partitionedRows {
if err := w.encoderGroup.
AddEvents(ctx, key.Topic, key.Partition, key.PartitionKey, events...); err != nil {

msgs := msgsBuf[:msgCount]
// Group messages by its TopicPartitionKey before adding them to the encoder group.
groupedMsgs := w.group(msgs)
for key, msg := range groupedMsgs {
if err := w.encoderGroup.AddEvents(ctx, key, msg...); err != nil {
return errors.Trace(err)
}
}
}
}

// batch collects a batch of messages to be sent to the DML producer.
// batch collects a batch of messages from w.msgChan into buffer.
// It returns the number of messages collected.
// Note: It will block until at least one message is received.
func (w *worker) batch(
ctx context.Context, events []mqEvent, flushInterval time.Duration,
ctx context.Context, buffer []mqEvent, flushInterval time.Duration,
) (int, error) {
index := 0
maxBatchSize := len(events)
msgCount := 0
maxBatchSize := len(buffer)
// We need to receive at least one message or be interrupted,
// otherwise it will lead to idling.
select {
case <-ctx.Done():
return index, ctx.Err()
return msgCount, ctx.Err()
case msg, ok := <-w.msgChan.Out():
if !ok {
log.Warn("MQ sink flush worker channel closed")
return index, nil
return msgCount, nil
}
if msg.rowEvent != nil {
w.statistics.ObserveRows(msg.rowEvent.Event)
events[index] = msg
index++
buffer[msgCount] = msg
msgCount++
}
}

// Start a new tick to flush the batch.
// Reset the ticker to start a new batching.
// We need to stop batching when the interval is reached.
w.ticker.Reset(flushInterval)
for {
select {
case <-ctx.Done():
return index, ctx.Err()
return msgCount, ctx.Err()
case msg, ok := <-w.msgChan.Out():
if !ok {
log.Warn("MQ sink flush worker channel closed")
return index, nil
return msgCount, nil
}

if msg.rowEvent != nil {
w.statistics.ObserveRows(msg.rowEvent.Event)
events[index] = msg
index++
buffer[msgCount] = msg
msgCount++
}

if index >= maxBatchSize {
return index, nil
if msgCount >= maxBatchSize {
return msgCount, nil
}
case <-w.ticker.C:
return index, nil
return msgCount, nil
}
}
}
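worker.batch above implements a common collect-up-to-N-or-until-timeout loop: block for the first message so the worker never spins on an empty channel, then keep draining until the buffer is full or the (freshly reset) ticker fires. The following is a simplified sketch of the same loop with stand-in types instead of mqEvent and the drainable channel; names like batchCollect are illustrative only.

package main

import (
	"context"
	"fmt"
	"time"
)

// event stands in for mqEvent; the real type carries a TopicPartitionKey
// and a row-change event.
type event struct{ payload string }

// batchCollect blocks for the first message, then keeps draining the channel
// until the buffer is full or the ticker fires, and returns how many
// messages it collected.
func batchCollect(ctx context.Context, in <-chan event, buf []event,
	interval time.Duration, ticker *time.Ticker) (int, error) {
	n := 0
	// Receive at least one message or be interrupted, so the caller
	// never busy-loops on an empty channel.
	select {
	case <-ctx.Done():
		return n, ctx.Err()
	case e, ok := <-in:
		if !ok {
			return n, nil
		}
		buf[n] = e
		n++
	}

	// Restart the batching window now that the first message has arrived.
	ticker.Reset(interval)
	for {
		select {
		case <-ctx.Done():
			return n, ctx.Err()
		case e, ok := <-in:
			if !ok {
				return n, nil
			}
			buf[n] = e
			n++
			if n >= len(buf) {
				return n, nil
			}
		case <-ticker.C:
			return n, nil
		}
	}
}

func main() {
	in := make(chan event, 8)
	for i := 0; i < 3; i++ {
		in <- event{payload: "row"}
	}

	ticker := time.NewTicker(15 * time.Millisecond)
	defer ticker.Stop()

	buf := make([]event, 2048)
	n, _ := batchCollect(context.Background(), in, buf, 15*time.Millisecond, ticker)
	fmt.Println("collected", n) // usually 3: the buffer is not full, so the ticker ends the batch
}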

// group is responsible for grouping messages by the partition.
// group groups messages by its key.
func (w *worker) group(
events []mqEvent,
) map[TopicPartitionKey][]*dmlsink.RowChangeCallbackableEvent {
partitionedRows := make(map[TopicPartitionKey][]*dmlsink.RowChangeCallbackableEvent)
for _, event := range events {
msgs []mqEvent,
) map[codec.TopicPartitionKey][]*dmlsink.RowChangeCallbackableEvent {
groupedMsgs := make(map[codec.TopicPartitionKey][]*dmlsink.RowChangeCallbackableEvent)
for _, msg := range msgs {
// Skip this event when the table is stopping.
if event.rowEvent.GetTableSinkState() != state.TableSinkSinking {
event.rowEvent.Callback()
log.Debug("Skip event of stopped table", zap.Any("event", event.rowEvent))
if msg.rowEvent.GetTableSinkState() != state.TableSinkSinking {
msg.rowEvent.Callback()
log.Debug("Skip event of stopped table", zap.Any("event", msg.rowEvent))
continue
}
if _, ok := partitionedRows[event.key]; !ok {
partitionedRows[event.key] = make([]*dmlsink.RowChangeCallbackableEvent, 0)
if _, ok := groupedMsgs[msg.key]; !ok {
groupedMsgs[msg.key] = make([]*dmlsink.RowChangeCallbackableEvent, 0)
}
partitionedRows[event.key] = append(partitionedRows[event.key], event.rowEvent)
groupedMsgs[msg.key] = append(groupedMsgs[msg.key], msg.rowEvent)
}
return partitionedRows
return groupedMsgs
}

func (w *worker) sendMessages(ctx context.Context) error {
@@ -285,16 +280,16 @@ func (w *worker) sendMessages(ctx context.Context) error {
}()

var err error
inputCh := w.encoderGroup.Output()
outCh := w.encoderGroup.Output()
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
metric.Set(float64(len(inputCh)))
case future, ok := <-inputCh:
metric.Set(float64(len(outCh)))
case future, ok := <-outCh:
if !ok {
log.Warn("MQ sink encode output channel closed",
log.Warn("MQ sink encoder's output channel closed",
zap.String("namespace", w.changeFeedID.Namespace),
zap.String("changefeed", w.changeFeedID.ID))
return nil
Expand All @@ -305,11 +300,11 @@ func (w *worker) sendMessages(ctx context.Context) error {
for _, message := range future.Messages {
start := time.Now()
if err = w.statistics.RecordBatchExecution(func() (int, int64, error) {
message.SetPartitionKey(future.PartitionKey)
message.SetPartitionKey(future.Key.PartitionKey)
if err := w.producer.AsyncSendMessage(
ctx,
future.Topic,
future.Partition,
future.Key.Topic,
future.Key.Partition,
message); err != nil {
return 0, 0, err
}
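Before handing a batch to the encoder group, the worker buckets events by codec.TopicPartitionKey. Because the struct has only comparable fields (two strings and an int32), it can be used directly as a Go map key. The sketch below shows that grouping step with simplified stand-in event types (keyedEvent and rowEvent are illustrative, not the real mqEvent and dmlsink types).

package main

import "fmt"

// TopicPartitionKey mirrors the struct the PR moves into pkg/sink/codec.
// All of its fields are comparable, so it works directly as a map key.
type TopicPartitionKey struct {
	Topic        string
	Partition    int32
	PartitionKey string
}

// rowEvent stands in for *dmlsink.RowChangeCallbackableEvent.
type rowEvent struct{ value string }

// keyedEvent stands in for mqEvent: a key plus the event it routes.
type keyedEvent struct {
	key   TopicPartitionKey
	event rowEvent
}

// group buckets events by their TopicPartitionKey, like worker.group above.
func group(msgs []keyedEvent) map[TopicPartitionKey][]rowEvent {
	grouped := make(map[TopicPartitionKey][]rowEvent)
	for _, m := range msgs {
		grouped[m.key] = append(grouped[m.key], m.event)
	}
	return grouped
}

func main() {
	msgs := []keyedEvent{
		{key: TopicPartitionKey{Topic: "t1", Partition: 0}, event: rowEvent{"a"}},
		{key: TopicPartitionKey{Topic: "t1", Partition: 0}, event: rowEvent{"b"}},
		{key: TopicPartitionKey{Topic: "t1", Partition: 1}, event: rowEvent{"c"}},
	}
	for key, events := range group(msgs) {
		fmt.Printf("%s/%d -> %d event(s)\n", key.Topic, key.Partition, len(events))
	}
}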
24 changes: 12 additions & 12 deletions cdc/sink/dmlsink/mq/worker_test.go
@@ -77,7 +77,7 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
worker, p := newNonBatchEncodeWorker(ctx, t)
defer worker.close()

key := TopicPartitionKey{
key := codec.TopicPartitionKey{
Topic: "test",
Partition: 1,
}
@@ -136,7 +136,7 @@ func TestBatchEncode_Batch(t *testing.T) {
defer cancel()
worker, _ := newBatchEncodeWorker(ctx, t)
defer worker.close()
key := TopicPartitionKey{
key := codec.TopicPartitionKey{
Topic: "test",
Partition: 1,
}
@@ -168,15 +168,15 @@ func TestBatchEncode_Group(t *testing.T) {
func TestBatchEncode_Group(t *testing.T) {
t.Parallel()

key1 := TopicPartitionKey{
key1 := codec.TopicPartitionKey{
Topic: "test",
Partition: 1,
}
key2 := TopicPartitionKey{
key2 := codec.TopicPartitionKey{
Topic: "test",
Partition: 2,
}
key3 := TopicPartitionKey{
key3 := codec.TopicPartitionKey{
Topic: "test1",
Partition: 2,
}
@@ -272,11 +272,11 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
tableInfo := model.WrapTableInfo(0, "test", 1, job.BinlogInfo.TableInfo)
_, _, colInfo := tableInfo.GetRowColInfos()

key1 := TopicPartitionKey{
key1 := codec.TopicPartitionKey{
Topic: "test",
Partition: 1,
}
key2 := TopicPartitionKey{
key2 := codec.TopicPartitionKey{
Topic: "test",
Partition: 2,
}
Expand Down Expand Up @@ -339,15 +339,15 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
}

func TestBatchEncode_SendMessages(t *testing.T) {
key1 := TopicPartitionKey{
key1 := codec.TopicPartitionKey{
Topic: "test",
Partition: 1,
}
key2 := TopicPartitionKey{
key2 := codec.TopicPartitionKey{
Topic: "test",
Partition: 2,
}
key3 := TopicPartitionKey{
key3 := codec.TopicPartitionKey{
Topic: "test1",
Partition: 2,
}
@@ -502,11 +502,11 @@ func TestBatchEncodeWorker_Abort(t *testing.T) {
}

func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
key1 := TopicPartitionKey{
key1 := codec.TopicPartitionKey{
Topic: "test",
Partition: 1,
}
key2 := TopicPartitionKey{
key2 := codec.TopicPartitionKey{
Topic: "test",
Partition: 2,
}
7 changes: 7 additions & 0 deletions pkg/sink/codec/encoder.go
@@ -83,3 +83,10 @@ func IsColumnValueEqual(preValue, updatedValue interface{}) bool {
// the value type should be the same
return preValue == updatedValue
}

// TopicPartitionKey contains the topic and partition key of the message.
type TopicPartitionKey struct {
Topic string
Partition int32
PartitionKey string
}

Contributor: It looks like this should not be a part of the codec.

Contributor (Author): Putting it here makes the import graph happier, and actually the encoderGroup in the codec package already relies on the topic and partition information.
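Beyond where the struct lives, the visible effect at the call sites is that AddEvents now takes the whole key instead of three loose routing parameters, which is also the key the worker groups events by. The sketch below illustrates that signature change; the real EncoderGroup interface is only partially visible in this diff, so both interface shapes and the rowEvent stand-in are assumptions based on the call sites.

package sketch

import "context"

// TopicPartitionKey as defined above in pkg/sink/codec.
type TopicPartitionKey struct {
	Topic        string
	Partition    int32
	PartitionKey string
}

// rowEvent stands in for *dmlsink.RowChangeCallbackableEvent.
type rowEvent struct{}

// Hypothetical shape of the encoder group's AddEvents before this PR:
// the routing information travels as three loose parameters.
type encoderGroupBefore interface {
	AddEvents(ctx context.Context, topic string, partition int32, partitionKey string, events ...*rowEvent) error
}

// Hypothetical shape after this PR: the routing information travels as one
// codec-level key, the same value the worker uses to group events.
type encoderGroupAfter interface {
	AddEvents(ctx context.Context, key TopicPartitionKey, events ...*rowEvent) error
}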