From 485aa85adbc9430129e125d3bf7838d5034392d7 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Mon, 30 Oct 2023 04:15:36 -0500 Subject: [PATCH 1/7] This is an automated cherry-pick of #9955 Signed-off-by: ti-chi-bot --- cdc/sink/mq/manager/kafka_manager.go | 27 ++++++++++++++ cdc/sink/mq/manager/kafka_manager_test.go | 44 +++++++++++++++++++++++ cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go | 32 +++++++++++++++-- cdc/sinkv2/util/helper.go | 8 +++++ cdc/sinkv2/util/helper_test.go | 44 +++++++++++++++++++++++ 5 files changed, 153 insertions(+), 2 deletions(-) diff --git a/cdc/sink/mq/manager/kafka_manager.go b/cdc/sink/mq/manager/kafka_manager.go index 99e78858f8f..93221a6c46c 100644 --- a/cdc/sink/mq/manager/kafka_manager.go +++ b/cdc/sink/mq/manager/kafka_manager.go @@ -44,6 +44,8 @@ const ( type kafkaTopicManager struct { changefeedID model.ChangeFeedID + defaultTopic string + admin kafka.ClusterAdminClient cfg *kafkaconfig.AutoCreateTopicConfig @@ -59,11 +61,17 @@ type kafkaTopicManager struct { // NewKafkaTopicManager creates a new topic manager. func NewKafkaTopicManager( ctx context.Context, +<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager.go +======= + defaultTopic string, + changefeedID model.ChangeFeedID, +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager.go admin kafka.ClusterAdminClient, cfg *kafkaconfig.AutoCreateTopicConfig, ) *kafkaTopicManager { changefeedID := contextutil.ChangefeedIDFromCtx(ctx) mgr := &kafkaTopicManager{ + defaultTopic: defaultTopic, changefeedID: changefeedID, admin: admin, cfg: cfg, @@ -172,6 +180,15 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro return nil, err } + // it may happen the following case: + // 1. user create the default topic with partition number set as 3 manually + // 2. set the partition-number as 2 in the sink-uri. + // in the such case, we should use 2 instead of 3 as the partition number. + _, ok := numPartitions[m.defaultTopic] + if ok { + numPartitions[m.defaultTopic] = m.cfg.PartitionNum + } + log.Info( "Kafka admin client describe topics success", zap.String("namespace", m.changefeedID.Namespace), @@ -287,6 +304,7 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in if err != nil { return 0, errors.Trace(err) } +<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager.go for _, detail := range topicDetails { if detail.Err == sarama.ErrNoError { if detail.Name == topicName { @@ -307,6 +325,15 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in zap.Error(detail.Err)) return 0, errors.Trace(detail.Err) } +======= + if detail, ok := topicDetails[topicName]; ok { + numPartition := detail.NumPartitions + if topicName == m.defaultTopic { + numPartition = m.cfg.PartitionNum + } + m.tryUpdatePartitionsAndLogging(topicName, numPartition) + return numPartition, nil +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager.go } partitionNum, err := m.createTopic(topicName) diff --git a/cdc/sink/mq/manager/kafka_manager_test.go b/cdc/sink/mq/manager/kafka_manager_test.go index a1d61963770..4bf405561ce 100644 --- a/cdc/sink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/mq/manager/kafka_manager_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" ) +<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go func TestPartitions(t *testing.T) { t.Parallel() @@ -42,6 +43,8 @@ func TestPartitions(t *testing.T) { require.Equal(t, int32(3), partitionsNum) } +======= +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go func TestCreateTopic(t *testing.T) { t.Parallel() @@ -57,6 +60,7 @@ func TestCreateTopic(t *testing.T) { ReplicationFactor: 1, } +<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go manager := NewKafkaTopicManager(ctx, adminClient, cfg) defer manager.Close() partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName) @@ -68,11 +72,30 @@ func TestCreateTopic(t *testing.T) { require.Equal(t, int32(2), partitionNum) partitionsNum, err := manager.GetPartitionNum("new-topic") require.Nil(t, err) +======= + changefeedID := model.DefaultChangeFeedID("test") + ctx := context.Background() + manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg) + defer manager.Close() + partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName) + require.NoError(t, err) + require.Equal(t, int32(2), partitionNum) + + partitionNum, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic") + require.NoError(t, err) + require.Equal(t, int32(2), partitionNum) + partitionsNum, err := manager.GetPartitionNum(ctx, "new-topic") + require.NoError(t, err) +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go require.Equal(t, int32(2), partitionsNum) // Try to create a topic without auto create. cfg.AutoCreate = false +<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go manager = NewKafkaTopicManager(ctx, adminClient, cfg) +======= + manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg) +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible("new-topic2") require.Regexp( @@ -81,6 +104,7 @@ func TestCreateTopic(t *testing.T) { err, ) + topic := "new-topic-failed" // Invalid replication factor. // It happens when replication-factor is greater than the number of brokers. cfg = &kafkaconfig.AutoCreateTopicConfig{ @@ -88,9 +112,15 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } +<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go manager = NewKafkaTopicManager(ctx, adminClient, cfg) defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible("new-topic-failed") +======= + manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) + defer manager.Close() + _, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic) +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go require.Regexp( t, "new sarama producer: kafka server: Replication-factor is invalid", @@ -111,6 +141,7 @@ func TestCreateTopicWithDelay(t *testing.T) { ReplicationFactor: 1, } +<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go manager := NewKafkaTopicManager(context.TODO(), adminClient, cfg) defer manager.Close() partitionNum, err := manager.createTopic("new_topic") @@ -119,5 +150,18 @@ func TestCreateTopicWithDelay(t *testing.T) { require.Nil(t, err) err = manager.waitUntilTopicVisible("new_topic") require.Nil(t, err) +======= + topic := "new_topic" + changefeedID := model.DefaultChangeFeedID("test") + ctx := context.Background() + manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) + defer manager.Close() + partitionNum, err := manager.createTopic(ctx, topic) + require.NoError(t, err) + err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3) + require.NoError(t, err) + err = manager.waitUntilTopicVisible(ctx, topic) + require.NoError(t, err) +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go require.Equal(t, int32(2), partitionNum) } diff --git a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go index d386fe39ddb..967d6594ba6 100644 --- a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go +++ b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go @@ -80,10 +80,14 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() +<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go leader, topic := initBroker(t, kafka.DefaultMockPartitionNum) defer leader.Close() +======= + // partition-number is 2, so only send DDL events to 2 partitions. +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + - "&max-message-bytes=1048576&partition-num=1" + + "&max-message-bytes=1048576&partition-num=2" + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol" uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic) @@ -110,6 +114,7 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) { err = s.WriteDDLEvent(ctx, ddl) require.Nil(t, err) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(), +<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go 3, "All partitions should be broadcast") require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "mock_topic", @@ -123,6 +128,11 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) { Topic: "mock_topic", Partition: 2, }), 1) +======= + 2, "All partitions should be broadcast") + require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1) + require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1) +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go } func TestWriteDDLEventToZeroPartition(t *testing.T) { @@ -182,10 +192,14 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() +<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go leader, topic := initBroker(t, kafka.DefaultMockPartitionNum) defer leader.Close() +======= + // partition-num is set to 2, so send checkpoint to 2 partitions. +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + - "&max-message-bytes=1048576&partition-num=1" + + "&max-message-bytes=1048576&partition-num=2" + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" + "&protocol=canal-json&enable-tidb-extension=true" uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic) @@ -206,6 +220,7 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { require.Nil(t, err) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(), +<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go 3, "All partitions should be broadcast") require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "mock_topic", @@ -219,6 +234,11 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { Topic: "mock_topic", Partition: 2, }), 1) +======= + 2, "All partitions should be broadcast") + require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1) + require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1) +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go } func TestWriteCheckpointTsToTableTopics(t *testing.T) { @@ -278,6 +298,7 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) { require.Nil(t, err) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(), +<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go 6, "All topics and partitions should be broadcast") require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "mock_topic", @@ -303,6 +324,13 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) { Topic: "cdc_person2", Partition: 0, }), 1) +======= + 4, "All topics and partitions should be broadcast") + require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1) + require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person", 0), 1) + require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person1", 0), 1) + require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person2", 0), 1) +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go } func TestWriteCheckpointTsWhenCanalJsonTiDBExtensionIsDisable(t *testing.T) { diff --git a/cdc/sinkv2/util/helper.go b/cdc/sinkv2/util/helper.go index aa49156a944..cfc7bb62940 100644 --- a/cdc/sinkv2/util/helper.go +++ b/cdc/sinkv2/util/helper.go @@ -94,8 +94,16 @@ func GetTopicManagerAndTryCreateTopic( topicCfg *kafka.AutoCreateTopicConfig, adminClient pkafka.ClusterAdminClient, ) (manager.TopicManager, error) { +<<<<<<< HEAD:cdc/sinkv2/util/helper.go topicManager := manager.NewKafkaTopicManager(ctx, adminClient, topicCfg) if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil { +======= + topicManager := manager.NewKafkaTopicManager( + ctx, topic, changefeedID, adminClient, topicCfg, + ) + + if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil { +>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/util/helper.go return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err) } diff --git a/cdc/sinkv2/util/helper_test.go b/cdc/sinkv2/util/helper_test.go index df7911a8595..6db38bb54d5 100644 --- a/cdc/sinkv2/util/helper_test.go +++ b/cdc/sinkv2/util/helper_test.go @@ -14,12 +14,56 @@ package util import ( + "context" "net/url" "testing" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/stretchr/testify/require" ) +func TestPartition(t *testing.T) { + t.Parallel() + + adminClient := kafka.NewClusterAdminClientMockImpl() + defer adminClient.Close() + + cfg := &kafka.AutoCreateTopicConfig{ + AutoCreate: true, + PartitionNum: 2, + ReplicationFactor: 1, + } + + changefeedID := model.DefaultChangeFeedID("test") + ctx := context.Background() + + manager, err := GetTopicManagerAndTryCreateTopic(ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient) + require.NoError(t, err) + defer manager.Close() + + // default topic, real partition is 3, but 2 is set in the sink-uri, so return 2. + partitionsNum, err := manager.GetPartitionNum(ctx, kafka.DefaultMockTopicName) + require.NoError(t, err) + require.Equal(t, int32(2), partitionsNum) + + // new topic, create it with partition number as 2. + partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic") + require.NoError(t, err) + require.Equal(t, int32(2), partitionsNum) + + // assume a topic already exist, the not default topic won't be affected by the default topic's partition number. + err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{ + Name: "new-topic-2", + NumPartitions: 3, + }, false) + require.NoError(t, err) + + partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2") + require.NoError(t, err) + require.Equal(t, int32(3), partitionsNum) +} + func TestGetTopic(t *testing.T) { t.Parallel() From 17e906b89f648e5fb81d023a8ee630647d1c2572 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 8 Nov 2023 15:37:20 +0800 Subject: [PATCH 2/7] fix kafka manager. --- cdc/sink/mq/manager/kafka_manager.go | 34 ++++--------- cdc/sink/mq/manager/kafka_manager_test.go | 60 +++++------------------ 2 files changed, 22 insertions(+), 72 deletions(-) diff --git a/cdc/sink/mq/manager/kafka_manager.go b/cdc/sink/mq/manager/kafka_manager.go index 93221a6c46c..18ede4cc745 100644 --- a/cdc/sink/mq/manager/kafka_manager.go +++ b/cdc/sink/mq/manager/kafka_manager.go @@ -23,7 +23,6 @@ import ( "github.com/Shopify/sarama" "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -61,15 +60,11 @@ type kafkaTopicManager struct { // NewKafkaTopicManager creates a new topic manager. func NewKafkaTopicManager( ctx context.Context, -<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager.go -======= defaultTopic string, changefeedID model.ChangeFeedID, ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager.go admin kafka.ClusterAdminClient, cfg *kafkaconfig.AutoCreateTopicConfig, ) *kafkaTopicManager { - changefeedID := contextutil.ChangefeedIDFromCtx(ctx) mgr := &kafkaTopicManager{ defaultTopic: defaultTopic, changefeedID: changefeedID, @@ -121,8 +116,15 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { zap.Error(err)) } + // it may happen the following case: + // 1. user create the default topic with partition number set as 3 manually + // 2. set the partition-number as 2 in the sink-uri. + // in the such case, we should use 2 instead of 3 as the partition number. for _, detail := range topicMetaList { partitionNum := int32(len(detail.Partitions)) + if detail.Name == m.defaultTopic { + partitionNum = m.cfg.PartitionNum + } m.tryUpdatePartitionsAndLogging(detail.Name, partitionNum) } @@ -180,15 +182,6 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro return nil, err } - // it may happen the following case: - // 1. user create the default topic with partition number set as 3 manually - // 2. set the partition-number as 2 in the sink-uri. - // in the such case, we should use 2 instead of 3 as the partition number. - _, ok := numPartitions[m.defaultTopic] - if ok { - numPartitions[m.defaultTopic] = m.cfg.PartitionNum - } - log.Info( "Kafka admin client describe topics success", zap.String("namespace", m.changefeedID.Namespace), @@ -304,7 +297,6 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in if err != nil { return 0, errors.Trace(err) } -<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager.go for _, detail := range topicDetails { if detail.Err == sarama.ErrNoError { if detail.Name == topicName { @@ -314,6 +306,9 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in zap.String("topic", topicName), zap.Int32("partitionNumber", int32(len(detail.Partitions)))) partitionNum := int32(len(detail.Partitions)) + if topicName == m.defaultTopic { + partitionNum = m.cfg.PartitionNum + } m.tryUpdatePartitionsAndLogging(topicName, partitionNum) return partitionNum, nil } @@ -325,15 +320,6 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in zap.Error(detail.Err)) return 0, errors.Trace(detail.Err) } -======= - if detail, ok := topicDetails[topicName]; ok { - numPartition := detail.NumPartitions - if topicName == m.defaultTopic { - numPartition = m.cfg.PartitionNum - } - m.tryUpdatePartitionsAndLogging(topicName, numPartition) - return numPartition, nil ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager.go } partitionNum, err := m.createTopic(topicName) diff --git a/cdc/sink/mq/manager/kafka_manager_test.go b/cdc/sink/mq/manager/kafka_manager_test.go index 4bf405561ce..4e87085eaac 100644 --- a/cdc/sink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/mq/manager/kafka_manager_test.go @@ -17,12 +17,12 @@ import ( "context" "testing" + "github.com/pingcap/tiflow/cdc/model" kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/stretchr/testify/require" ) -<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go func TestPartitions(t *testing.T) { t.Parallel() @@ -36,15 +36,15 @@ func TestPartitions(t *testing.T) { ReplicationFactor: 1, } - manager := NewKafkaTopicManager(context.TODO(), adminClient, cfg) + ctx := context.Background() + changefeedID := model.DefaultChangeFeedID("test") + manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg) defer manager.Close() partitionsNum, err := manager.GetPartitionNum(kafka.DefaultMockTopicName) - require.Nil(t, err) - require.Equal(t, int32(3), partitionsNum) + require.NoError(t, err) + require.Equal(t, int32(2), partitionsNum) } -======= ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go func TestCreateTopic(t *testing.T) { t.Parallel() @@ -60,42 +60,23 @@ func TestCreateTopic(t *testing.T) { ReplicationFactor: 1, } -<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go - manager := NewKafkaTopicManager(ctx, adminClient, cfg) - defer manager.Close() - partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName) - require.Nil(t, err) - require.Equal(t, int32(3), partitionNum) - - partitionNum, err = manager.CreateTopicAndWaitUntilVisible("new-topic") - require.Nil(t, err) - require.Equal(t, int32(2), partitionNum) - partitionsNum, err := manager.GetPartitionNum("new-topic") - require.Nil(t, err) -======= changefeedID := model.DefaultChangeFeedID("test") - ctx := context.Background() manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg) defer manager.Close() - partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName) + partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName) require.NoError(t, err) require.Equal(t, int32(2), partitionNum) - partitionNum, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic") + partitionNum, err = manager.CreateTopicAndWaitUntilVisible("new-topic") require.NoError(t, err) require.Equal(t, int32(2), partitionNum) - partitionsNum, err := manager.GetPartitionNum(ctx, "new-topic") + partitionsNum, err := manager.GetPartitionNum("new-topic") require.NoError(t, err) ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go require.Equal(t, int32(2), partitionsNum) // Try to create a topic without auto create. cfg.AutoCreate = false -<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go - manager = NewKafkaTopicManager(ctx, adminClient, cfg) -======= manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg) ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible("new-topic2") require.Regexp( @@ -112,15 +93,9 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } -<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go - manager = NewKafkaTopicManager(ctx, adminClient, cfg) - defer manager.Close() - _, err = manager.CreateTopicAndWaitUntilVisible("new-topic-failed") -======= manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) defer manager.Close() - _, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic) ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go + _, err = manager.CreateTopicAndWaitUntilVisible(topic) require.Regexp( t, "new sarama producer: kafka server: Replication-factor is invalid", @@ -141,27 +116,16 @@ func TestCreateTopicWithDelay(t *testing.T) { ReplicationFactor: 1, } -<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go - manager := NewKafkaTopicManager(context.TODO(), adminClient, cfg) - defer manager.Close() - partitionNum, err := manager.createTopic("new_topic") - require.Nil(t, err) - err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3) - require.Nil(t, err) - err = manager.waitUntilTopicVisible("new_topic") - require.Nil(t, err) -======= topic := "new_topic" changefeedID := model.DefaultChangeFeedID("test") ctx := context.Background() manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) defer manager.Close() - partitionNum, err := manager.createTopic(ctx, topic) + partitionNum, err := manager.createTopic(topic) require.NoError(t, err) err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3) require.NoError(t, err) - err = manager.waitUntilTopicVisible(ctx, topic) + err = manager.waitUntilTopicVisible(topic) require.NoError(t, err) ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go require.Equal(t, int32(2), partitionNum) } From bce6762bfbc163a0ec171f17fccc31f9b1cf4dbe Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 8 Nov 2023 15:48:01 +0800 Subject: [PATCH 3/7] fix test. --- cdc/sink/mq/manager/kafka_manager.go | 3 +- cdc/sink/mq/mq.go | 2 +- cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go | 49 ++--------------------- cdc/sinkv2/util/helper.go | 10 +---- 4 files changed, 8 insertions(+), 56 deletions(-) diff --git a/cdc/sink/mq/manager/kafka_manager.go b/cdc/sink/mq/manager/kafka_manager.go index 18ede4cc745..3d6ec99b74f 100644 --- a/cdc/sink/mq/manager/kafka_manager.go +++ b/cdc/sink/mq/manager/kafka_manager.go @@ -23,6 +23,7 @@ import ( "github.com/Shopify/sarama" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -61,10 +62,10 @@ type kafkaTopicManager struct { func NewKafkaTopicManager( ctx context.Context, defaultTopic string, - changefeedID model.ChangeFeedID, admin kafka.ClusterAdminClient, cfg *kafkaconfig.AutoCreateTopicConfig, ) *kafkaTopicManager { + changefeedID := contextutil.ChangefeedIDFromCtx(ctx) mgr := &kafkaTopicManager{ defaultTopic: defaultTopic, changefeedID: changefeedID, diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index e7210e43726..ef9f98352ca 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -432,7 +432,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - topicManager := manager.NewKafkaTopicManager(ctx, adminClient, baseConfig.DeriveTopicConfig()) + topicManager := manager.NewKafkaTopicManager(ctx, topic, adminClient, baseConfig.DeriveTopicConfig()) if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err) } diff --git a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go index 967d6594ba6..86ccef864ba 100644 --- a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go +++ b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go @@ -80,12 +80,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() -<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go leader, topic := initBroker(t, kafka.DefaultMockPartitionNum) defer leader.Close() -======= // partition-number is 2, so only send DDL events to 2 partitions. ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + "&max-message-bytes=1048576&partition-num=2" + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol" @@ -114,8 +111,7 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) { err = s.WriteDDLEvent(ctx, ddl) require.Nil(t, err) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(), -<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go - 3, "All partitions should be broadcast") + 2, "All partitions should be broadcast") require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "mock_topic", Partition: 0, @@ -124,15 +120,6 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) { Topic: "mock_topic", Partition: 1, }), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ - Topic: "mock_topic", - Partition: 2, - }), 1) -======= - 2, "All partitions should be broadcast") - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1) ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go } func TestWriteDDLEventToZeroPartition(t *testing.T) { @@ -192,12 +179,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() -<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go leader, topic := initBroker(t, kafka.DefaultMockPartitionNum) defer leader.Close() -======= // partition-num is set to 2, so send checkpoint to 2 partitions. ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + "&max-message-bytes=1048576&partition-num=2" + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" + @@ -220,8 +204,7 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { require.Nil(t, err) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(), -<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go - 3, "All partitions should be broadcast") + 2, "All partitions should be broadcast") require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "mock_topic", Partition: 0, @@ -230,15 +213,7 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { Topic: "mock_topic", Partition: 1, }), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ - Topic: "mock_topic", - Partition: 2, - }), 1) -======= - 2, "All partitions should be broadcast") - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1) ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go + } func TestWriteCheckpointTsToTableTopics(t *testing.T) { @@ -298,20 +273,11 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) { require.Nil(t, err) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(), -<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go - 6, "All topics and partitions should be broadcast") + 4, "All topics and partitions should be broadcast") require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "mock_topic", Partition: 0, }), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ - Topic: "mock_topic", - Partition: 1, - }), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ - Topic: "mock_topic", - Partition: 2, - }), 1) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "cdc_person", Partition: 0, @@ -324,13 +290,6 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) { Topic: "cdc_person2", Partition: 0, }), 1) -======= - 4, "All topics and partitions should be broadcast") - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person", 0), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person1", 0), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person2", 0), 1) ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go } func TestWriteCheckpointTsWhenCanalJsonTiDBExtensionIsDisable(t *testing.T) { diff --git a/cdc/sinkv2/util/helper.go b/cdc/sinkv2/util/helper.go index cfc7bb62940..818e4681dfd 100644 --- a/cdc/sinkv2/util/helper.go +++ b/cdc/sinkv2/util/helper.go @@ -94,16 +94,8 @@ func GetTopicManagerAndTryCreateTopic( topicCfg *kafka.AutoCreateTopicConfig, adminClient pkafka.ClusterAdminClient, ) (manager.TopicManager, error) { -<<<<<<< HEAD:cdc/sinkv2/util/helper.go - topicManager := manager.NewKafkaTopicManager(ctx, adminClient, topicCfg) + topicManager := manager.NewKafkaTopicManager(ctx, topic, adminClient, topicCfg) if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil { -======= - topicManager := manager.NewKafkaTopicManager( - ctx, topic, changefeedID, adminClient, topicCfg, - ) - - if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil { ->>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/util/helper.go return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err) } From 9e2c25df04a6ea7cf611c4a0aee9c6914e872f22 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 8 Nov 2023 16:29:05 +0800 Subject: [PATCH 4/7] fix make fmt --- cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go index 86ccef864ba..d6d4008f8db 100644 --- a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go +++ b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go @@ -213,7 +213,6 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { Topic: "mock_topic", Partition: 1, }), 1) - } func TestWriteCheckpointTsToTableTopics(t *testing.T) { From 0ab2464c70a4ae53c7882d5977ce77501f354983 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 8 Nov 2023 18:03:29 +0800 Subject: [PATCH 5/7] fix test. --- cdc/sink/mq/manager/kafka_manager_test.go | 14 +++++--------- cdc/sinkv2/util/helper_test.go | 22 +++++++++++----------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/cdc/sink/mq/manager/kafka_manager_test.go b/cdc/sink/mq/manager/kafka_manager_test.go index 4e87085eaac..6f817bc7613 100644 --- a/cdc/sink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/mq/manager/kafka_manager_test.go @@ -17,7 +17,6 @@ import ( "context" "testing" - "github.com/pingcap/tiflow/cdc/model" kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/stretchr/testify/require" @@ -37,8 +36,7 @@ func TestPartitions(t *testing.T) { } ctx := context.Background() - changefeedID := model.DefaultChangeFeedID("test") - manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg) + manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, adminClient, cfg) defer manager.Close() partitionsNum, err := manager.GetPartitionNum(kafka.DefaultMockTopicName) require.NoError(t, err) @@ -60,8 +58,7 @@ func TestCreateTopic(t *testing.T) { ReplicationFactor: 1, } - changefeedID := model.DefaultChangeFeedID("test") - manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg) + manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, adminClient, cfg) defer manager.Close() partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName) require.NoError(t, err) @@ -76,7 +73,7 @@ func TestCreateTopic(t *testing.T) { // Try to create a topic without auto create. cfg.AutoCreate = false - manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg) + manager = NewKafkaTopicManager(ctx, "new-topic2", adminClient, cfg) defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible("new-topic2") require.Regexp( @@ -93,7 +90,7 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } - manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) + manager = NewKafkaTopicManager(ctx, topic, adminClient, cfg) defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible(topic) require.Regexp( @@ -117,9 +114,8 @@ func TestCreateTopicWithDelay(t *testing.T) { } topic := "new_topic" - changefeedID := model.DefaultChangeFeedID("test") ctx := context.Background() - manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) + manager := NewKafkaTopicManager(ctx, topic, adminClient, cfg) defer manager.Close() partitionNum, err := manager.createTopic(topic) require.NoError(t, err) diff --git a/cdc/sinkv2/util/helper_test.go b/cdc/sinkv2/util/helper_test.go index 6db38bb54d5..10424197baf 100644 --- a/cdc/sinkv2/util/helper_test.go +++ b/cdc/sinkv2/util/helper_test.go @@ -18,15 +18,17 @@ import ( "net/url" "testing" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/sink/kafka" + "github.com/Shopify/sarama" + pkafka "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/stretchr/testify/require" + + "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" ) func TestPartition(t *testing.T) { t.Parallel() - adminClient := kafka.NewClusterAdminClientMockImpl() + adminClient := pkafka.NewClusterAdminClientMockImpl() defer adminClient.Close() cfg := &kafka.AutoCreateTopicConfig{ @@ -35,31 +37,29 @@ func TestPartition(t *testing.T) { ReplicationFactor: 1, } - changefeedID := model.DefaultChangeFeedID("test") ctx := context.Background() - - manager, err := GetTopicManagerAndTryCreateTopic(ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient) + manager, err := GetTopicManagerAndTryCreateTopic(ctx, pkafka.DefaultMockTopicName, cfg, adminClient) require.NoError(t, err) defer manager.Close() // default topic, real partition is 3, but 2 is set in the sink-uri, so return 2. - partitionsNum, err := manager.GetPartitionNum(ctx, kafka.DefaultMockTopicName) + partitionsNum, err := manager.GetPartitionNum(pkafka.DefaultMockTopicName) require.NoError(t, err) require.Equal(t, int32(2), partitionsNum) // new topic, create it with partition number as 2. - partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic") + partitionsNum, err = manager.GetPartitionNum("new-topic") require.NoError(t, err) require.Equal(t, int32(2), partitionsNum) // assume a topic already exist, the not default topic won't be affected by the default topic's partition number. - err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{ - Name: "new-topic-2", + err = adminClient.CreateTopic("new-topic-2", &sarama.TopicDetail{ + NumPartitions: 3, }, false) require.NoError(t, err) - partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2") + partitionsNum, err = manager.GetPartitionNum("new-topic-2") require.NoError(t, err) require.Equal(t, int32(3), partitionsNum) } From 368b0398f56e2eb247c2acd62d831912bb1ce275 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 10 Nov 2023 14:15:56 +0800 Subject: [PATCH 6/7] fix make fmt --- cdc/sinkv2/util/helper_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cdc/sinkv2/util/helper_test.go b/cdc/sinkv2/util/helper_test.go index 10424197baf..2dffde7c557 100644 --- a/cdc/sinkv2/util/helper_test.go +++ b/cdc/sinkv2/util/helper_test.go @@ -19,10 +19,9 @@ import ( "testing" "github.com/Shopify/sarama" + "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" pkafka "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/stretchr/testify/require" - - "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" ) func TestPartition(t *testing.T) { @@ -54,7 +53,7 @@ func TestPartition(t *testing.T) { // assume a topic already exist, the not default topic won't be affected by the default topic's partition number. err = adminClient.CreateTopic("new-topic-2", &sarama.TopicDetail{ - + NumPartitions: 3, }, false) require.NoError(t, err) From 891daea399ef175c114e6d81e69c4951e20e2543 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 10 Nov 2023 16:22:51 +0800 Subject: [PATCH 7/7] fix make fmt --- cdc/sinkv2/util/helper_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cdc/sinkv2/util/helper_test.go b/cdc/sinkv2/util/helper_test.go index 2dffde7c557..635737de221 100644 --- a/cdc/sinkv2/util/helper_test.go +++ b/cdc/sinkv2/util/helper_test.go @@ -53,7 +53,6 @@ func TestPartition(t *testing.T) { // assume a topic already exist, the not default topic won't be affected by the default topic's partition number. err = adminClient.CreateTopic("new-topic-2", &sarama.TopicDetail{ - NumPartitions: 3, }, false) require.NoError(t, err)