From 0243014fe1a956d7df8571bc4b1a86bf8939e860 Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Wed, 25 Oct 2023 18:02:55 +0800
Subject: [PATCH 1/4] fix mq query partition.

---
 cdc/sink/dmlsink/mq/manager/kafka_manager.go  | 13 +++++++++++
 .../dmlsink/mq/manager/kafka_manager_test.go  | 23 +++++++++++--------
 cdc/sink/util/helper.go                       |  2 +-
 3 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go
index ea8c3991d3b..f5de129ddc4 100644
--- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go
+++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go
@@ -40,6 +40,8 @@ const (
 type kafkaTopicManager struct {
     changefeedID model.ChangeFeedID
 
+    defaultTopic string
+
     admin kafka.ClusterAdminClient
 
     cfg *kafka.AutoCreateTopicConfig
@@ -55,11 +57,13 @@
 // NewKafkaTopicManager creates a new topic manager.
 func NewKafkaTopicManager(
     ctx context.Context,
+    defaultTopic string,
     changefeedID model.ChangeFeedID,
     admin kafka.ClusterAdminClient,
     cfg *kafka.AutoCreateTopicConfig,
 ) *kafkaTopicManager {
     mgr := &kafkaTopicManager{
+        defaultTopic: defaultTopic,
         changefeedID: changefeedID,
         admin:        admin,
         cfg:          cfg,
@@ -165,6 +169,15 @@ func (m *kafkaTopicManager) fetchAllTopicsPartitionsNum(
         return nil, err
     }
 
+    // it may happen the following case:
+    // 1. use create the default topic with partition number set as 3 manually
+    // 2. set the partition-number as 2 in the sink-uri.
+    // in the such case, we should use 2 instead of 3 as the partition number.
+    _, ok := numPartitions[m.defaultTopic]
+    if ok {
+        numPartitions[m.defaultTopic] = m.cfg.PartitionNum
+    }
+
     log.Info(
         "Kafka admin client describe topics success",
         zap.String("namespace", m.changefeedID.Namespace),
diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
index 837ca1139d2..66f01896e54 100644
--- a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
+++ b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
@@ -33,8 +33,9 @@ func TestPartitions(t *testing.T) {
         ReplicationFactor: 1,
     }
 
+    changefeedID := model.DefaultChangeFeedID("test")
     ctx := context.Background()
-    manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
+    manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
     defer manager.Close()
 
     partitionsNum, err := manager.GetPartitionNum(
@@ -55,8 +56,9 @@ func TestCreateTopic(t *testing.T) {
         ReplicationFactor: 1,
     }
 
+    changefeedID := model.DefaultChangeFeedID("test")
     ctx := context.Background()
-    manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
+    manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
     defer manager.Close()
     partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
     require.Nil(t, err)
@@ -71,7 +73,7 @@ func TestCreateTopic(t *testing.T) {
     // Try to create a topic without auto create.
     cfg.AutoCreate = false
-    manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
+    manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg)
     defer manager.Close()
     _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2")
     require.Regexp(
@@ -80,6 +82,7 @@ func TestCreateTopic(t *testing.T) {
+    topic := "new-topic-failed"
     // Invalid replication factor.
     // It happens when replication-factor is greater than the number of brokers.
     cfg = &kafka.AutoCreateTopicConfig{
@@ -87,9 +90,9 @@ func TestCreateTopic(t *testing.T) {
         PartitionNum:      2,
         ReplicationFactor: 4,
     }
-    manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
+    manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
     defer manager.Close()
-    _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic-failed")
+    _, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic)
     require.Regexp(
         t,
         "kafka create topic failed: kafka server: Replication-factor is invalid",
         err,
@@ -108,14 +111,16 @@ func TestCreateTopicWithDelay(t *testing.T) {
         ReplicationFactor: 1,
     }
 
+    topic := "new_topic"
+    changefeedID := model.DefaultChangeFeedID("test")
     ctx := context.Background()
-    manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
+    manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
     defer manager.Close()
-    partitionNum, err := manager.createTopic(ctx, "new_topic")
+    partitionNum, err := manager.createTopic(ctx, topic)
     require.Nil(t, err)
-    err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3)
+    err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3)
     require.Nil(t, err)
-    err = manager.waitUntilTopicVisible(ctx, "new_topic")
+    err = manager.waitUntilTopicVisible(ctx, topic)
     require.Nil(t, err)
     require.Equal(t, int32(2), partitionNum)
 }
diff --git a/cdc/sink/util/helper.go b/cdc/sink/util/helper.go
index 4ff6db3e6d0..cec952cbe5c 100644
--- a/cdc/sink/util/helper.go
+++ b/cdc/sink/util/helper.go
@@ -97,7 +97,7 @@ func GetTopicManagerAndTryCreateTopic(
     adminClient kafka.ClusterAdminClient,
 ) (manager.TopicManager, error) {
     topicManager := manager.NewKafkaTopicManager(
-        ctx, changefeedID, adminClient, topicCfg,
+        ctx, topic, changefeedID, adminClient, topicCfg,
     )
 
     if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
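The hunk in fetchAllTopicsPartitionsNum above is the heart of the series: if the user created the default topic manually with more partitions than the partition-num given in the sink-uri, the manager must report the configured count, otherwise the sink would route events to partitions the user never asked for. Below is a minimal, self-contained Go sketch of that override; topicManager and autoCreateTopicConfig are simplified stand-ins for the real tiflow types, not the actual API.

package main

import "fmt"

type autoCreateTopicConfig struct {
	// PartitionNum mirrors partition-num from the sink-uri.
	PartitionNum int32
}

type topicManager struct {
	defaultTopic string
	cfg          autoCreateTopicConfig
}

// applyDefaultTopicOverride mirrors the added hunk: if the broker reports
// the default topic, the sink-uri's partition-num wins over the broker's
// actual partition count; all other topics are left untouched.
func (m *topicManager) applyDefaultTopicOverride(numPartitions map[string]int32) {
	if _, ok := numPartitions[m.defaultTopic]; ok {
		numPartitions[m.defaultTopic] = m.cfg.PartitionNum
	}
}

func main() {
	m := &topicManager{defaultTopic: "mock_topic", cfg: autoCreateTopicConfig{PartitionNum: 2}}
	// The broker says mock_topic has 3 partitions (created manually),
	// but the changefeed was configured with partition-num=2.
	numPartitions := map[string]int32{"mock_topic": 3, "other_topic": 6}
	m.applyDefaultTopicOverride(numPartitions)
	fmt.Println(numPartitions["mock_topic"])  // 2: the configured number wins
	fmt.Println(numPartitions["other_topic"]) // 6: non-default topics untouched
}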
cfg = &kafka.AutoCreateTopicConfig{ @@ -87,9 +90,9 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } - manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg) + manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) defer manager.Close() - _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic-failed") + _, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic) require.Regexp( t, "kafka create topic failed: kafka server: Replication-factor is invalid", @@ -108,14 +111,16 @@ func TestCreateTopicWithDelay(t *testing.T) { ReplicationFactor: 1, } + topic := "new_topic" + changefeedID := model.DefaultChangeFeedID("test") ctx := context.Background() - manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg) + manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) defer manager.Close() - partitionNum, err := manager.createTopic(ctx, "new_topic") + partitionNum, err := manager.createTopic(ctx, topic) require.Nil(t, err) - err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3) + err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3) require.Nil(t, err) - err = manager.waitUntilTopicVisible(ctx, "new_topic") + err = manager.waitUntilTopicVisible(ctx, topic) require.Nil(t, err) require.Equal(t, int32(2), partitionNum) } diff --git a/cdc/sink/util/helper.go b/cdc/sink/util/helper.go index 4ff6db3e6d0..cec952cbe5c 100644 --- a/cdc/sink/util/helper.go +++ b/cdc/sink/util/helper.go @@ -97,7 +97,7 @@ func GetTopicManagerAndTryCreateTopic( adminClient kafka.ClusterAdminClient, ) (manager.TopicManager, error) { topicManager := manager.NewKafkaTopicManager( - ctx, changefeedID, adminClient, topicCfg, + ctx, topic, changefeedID, adminClient, topicCfg, ) if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil { From b123944a6fa7be3d7c1edbac53be3029f48d9509 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 26 Oct 2023 15:12:51 +0800 Subject: [PATCH 2/4] add unit test --- cdc/sink/dmlsink/mq/manager/kafka_manager.go | 8 +++- .../dmlsink/mq/manager/kafka_manager_test.go | 37 ++++--------------- cdc/sink/util/helper_test.go | 33 +++++++++++++++++ 3 files changed, 46 insertions(+), 32 deletions(-) diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go index f5de129ddc4..a9ba9b0ccb4 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go @@ -284,8 +284,12 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible( return 0, errors.Trace(err) } if detail, ok := topicDetails[topicName]; ok { - m.tryUpdatePartitionsAndLogging(topicName, detail.NumPartitions) - return detail.NumPartitions, nil + numPartition := detail.NumPartitions + if topicName == m.defaultTopic { + numPartition = m.cfg.PartitionNum + } + m.tryUpdatePartitionsAndLogging(topicName, numPartition) + return numPartition, nil } partitionNum, err := m.createTopic(ctx, topicName) diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go index 66f01896e54..d4fb885502d 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go @@ -22,29 +22,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestPartitions(t *testing.T) { - t.Parallel() - - adminClient := kafka.NewClusterAdminClientMockImpl() - defer adminClient.Close() - cfg := 
From 2fd31d0fd6af8e94c113787f0382e90a9506ce53 Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Thu, 26 Oct 2023 18:33:49 +0800
Subject: [PATCH 3/4] fix unit test.
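The test updates in this patch follow directly from the broadcast semantics: a DDL event or checkpoint ts is written once to every partition the topic manager reports for a topic, so once the manager honours partition-num=2 the mock producer must record exactly two events per topic. A hedged sketch of that fan-out is below; send stands in for the real per-partition producer call and is not the tiflow API.

package main

import "fmt"

// broadcast writes one copy of an event to each partition in [0, partitionNum).
// This mirrors the broadcast behaviour the tests assert against.
func broadcast(partitionNum int32, send func(partition int32)) {
	for p := int32(0); p < partitionNum; p++ {
		send(p)
	}
}

func main() {
	events := 0
	broadcast(2, func(partition int32) {
		events++
		fmt.Printf("DDL event sent to partition %d\n", partition)
	})
	fmt.Println(events) // 2, matching the updated require.Len expectations
}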
---
 cdc/sink/ddlsink/mq/mq_ddl_sink_test.go | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go b/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
index 4403128cdd9..6408a1a7ecb 100644
--- a/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
+++ b/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
@@ -59,8 +59,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
 
+    // partition-number is 2, so only send DDL events to 2 partitions.
     uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
-        "&max-message-bytes=1048576&partition-num=1" +
+        "&max-message-bytes=1048576&partition-num=2" +
         "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
     uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)
 
@@ -89,10 +90,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
     err = s.WriteDDLEvent(ctx, ddl)
     require.NoError(t, err)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
-        3, "All partitions should be broadcast")
+        2, "All partitions should be broadcast")
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
-    require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
 }
 
 func TestWriteDDLEventToZeroPartition(t *testing.T) {
@@ -144,8 +144,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
 
+    // partition-num is set to 2, so send checkpoint to 2 partitions.
     uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
-        "&max-message-bytes=1048576&partition-num=1" +
+        "&max-message-bytes=1048576&partition-num=2" +
         "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" +
         "&protocol=canal-json&enable-tidb-extension=true"
     uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)
@@ -169,10 +170,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
     require.Nil(t, err)
 
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
-        3, "All partitions should be broadcast")
+        2, "All partitions should be broadcast")
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
-    require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
 }
 
 func TestWriteCheckpointTsToTableTopics(t *testing.T) {
@@ -233,10 +233,8 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
     require.NoError(t, err)
 
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
-        6, "All topics and partitions should be broadcast")
+        4, "All topics and partitions should be broadcast")
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
-    require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
-    require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person", 0), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person1", 0), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person2", 0), 1)

From a96269a141e3cf553ac855b5b3a6214f61e83f09 Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Fri, 27 Oct 2023 14:46:40 +0800
Subject: [PATCH 4/4] add one unit test.

---
 cdc/sink/dmlsink/mq/manager/kafka_manager.go |  2 +-
 cdc/sink/util/helper_test.go                 | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go
index a9ba9b0ccb4..bd0af80fdc5 100644
--- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go
+++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go
@@ -170,7 +170,7 @@ func (m *kafkaTopicManager) fetchAllTopicsPartitionsNum(
     }
 
     // it may happen the following case:
-    // 1. use create the default topic with partition number set as 3 manually
+    // 1. user create the default topic with partition number set as 3 manually
     // 2. set the partition-number as 2 in the sink-uri.
     // in the such case, we should use 2 instead of 3 as the partition number.
     _, ok := numPartitions[m.defaultTopic]
diff --git a/cdc/sink/util/helper_test.go b/cdc/sink/util/helper_test.go
index 913ca3253b7..31f9f7cc838 100644
--- a/cdc/sink/util/helper_test.go
+++ b/cdc/sink/util/helper_test.go
@@ -51,6 +51,17 @@ func TestPartition(t *testing.T) {
     partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic")
     require.NoError(t, err)
     require.Equal(t, int32(2), partitionsNum)
+
+    // assume a topic already exist, the not default topic won't be affected by the default topic's partition number.
+    err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{
+        Name:          "new-topic-2",
+        NumPartitions: 3,
+    }, false)
+    require.NoError(t, err)
+
+    partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2")
+    require.NoError(t, err)
+    require.Equal(t, int32(3), partitionsNum)
 }
 
 func TestGetTopic(t *testing.T) {
     t.Parallel()
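Taken together, the new tests pin down one precedence rule, summarised by the stand-in sketch below (a simplified recap, not the tiflow API): the default topic always reports the partition-num from the sink-uri, while any other topic, whether auto-created or pre-existing like new-topic-2 above, keeps the count the broker reports.

package main

import "fmt"

// effectivePartitionNum summarises the rule this series implements: the
// sink-uri's partition-num overrides the broker-side count only for the
// default topic; every other topic keeps the count the broker reports.
func effectivePartitionNum(topic, defaultTopic string, brokerPartitions, configured int32) int32 {
	if topic == defaultTopic {
		return configured
	}
	return brokerPartitions
}

func main() {
	// mock_topic exists with 3 partitions, but the sink-uri says partition-num=2.
	fmt.Println(effectivePartitionNum("mock_topic", "mock_topic", 3, 2)) // 2
	// new-topic-2 was created manually with 3 partitions and is unaffected.
	fmt.Println(effectivePartitionNum("new-topic-2", "mock_topic", 3, 2)) // 3
}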