diff --git a/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go b/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
index b0aa7e49ec8..434cf80ac96 100644
--- a/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
+++ b/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
@@ -57,8 +57,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
 
+    // partition-number is 2, so only send DDL events to 2 partitions.
     uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
-        "&max-message-bytes=1048576&partition-num=1" +
+        "&max-message-bytes=1048576&partition-num=2" +
         "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
     uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)
@@ -87,10 +88,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
     err = s.WriteDDLEvent(ctx, ddl)
     require.NoError(t, err)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
-        3, "All partitions should be broadcast")
+        2, "All partitions should be broadcast")
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
-    require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
 }
 
 func TestWriteDDLEventToZeroPartition(t *testing.T) {
@@ -142,8 +142,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
 
+    // partition-num is set to 2, so send checkpoint to 2 partitions.
     uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
-        "&max-message-bytes=1048576&partition-num=1" +
+        "&max-message-bytes=1048576&partition-num=2" +
         "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" +
         "&protocol=canal-json&enable-tidb-extension=true"
     uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)
@@ -167,10 +168,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
     require.Nil(t, err)
 
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
-        3, "All partitions should be broadcast")
+        2, "All partitions should be broadcast")
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
-    require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
 }
 
 func TestWriteCheckpointTsToTableTopics(t *testing.T) {
@@ -231,10 +231,8 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
     require.NoError(t, err)
 
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
-        6, "All topics and partitions should be broadcast")
+        4, "All topics and partitions should be broadcast")
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
-    require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
-    require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person", 0), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person1", 0), 1)
     require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person2", 0), 1)
diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go
index 6de54617b95..c716a60305c 100644
--- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go
+++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go
@@ -41,6 +41,8 @@ const (
 type kafkaTopicManager struct {
     changefeedID model.ChangeFeedID
 
+    defaultTopic string
+
     admin kafka.ClusterAdminClient
 
     cfg *kafka.AutoCreateTopicConfig
@@ -56,11 +58,13 @@ type kafkaTopicManager struct {
 // NewKafkaTopicManager creates a new topic manager.
 func NewKafkaTopicManager(
     ctx context.Context,
+    defaultTopic string,
     admin kafka.ClusterAdminClient,
     cfg *kafka.AutoCreateTopicConfig,
 ) *kafkaTopicManager {
     changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
     mgr := &kafkaTopicManager{
+        defaultTopic: defaultTopic,
         changefeedID: changefeedID,
         admin:        admin,
         cfg:          cfg,
@@ -166,6 +170,15 @@ func (m *kafkaTopicManager) fetchAllTopicsPartitionsNum(
         return nil, err
     }
 
+    // The following case may happen:
+    // 1. the user creates the default topic with 3 partitions manually;
+    // 2. partition-num is set to 2 in the sink-uri.
+    // In such a case, we should use 2 instead of 3 as the partition number.
+    _, ok := numPartitions[m.defaultTopic]
+    if ok {
+        numPartitions[m.defaultTopic] = m.cfg.PartitionNum
+    }
+
     log.Info(
         "Kafka admin client describe topics success",
         zap.String("namespace", m.changefeedID.Namespace),
@@ -272,8 +285,12 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(
         return 0, errors.Trace(err)
     }
     if detail, ok := topicDetails[topicName]; ok {
-        m.tryUpdatePartitionsAndLogging(topicName, detail.NumPartitions)
-        return detail.NumPartitions, nil
+        numPartition := detail.NumPartitions
+        if topicName == m.defaultTopic {
+            numPartition = m.cfg.PartitionNum
+        }
+        m.tryUpdatePartitionsAndLogging(topicName, numPartition)
+        return numPartition, nil
     }
 
     partitionNum, err := m.createTopic(ctx, topicName)
diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
index 174e14652d2..b747f89a0e1 100644
--- a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
+++ b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
@@ -21,28 +21,6 @@ import (
     "github.com/stretchr/testify/require"
 )
 
-func TestPartitions(t *testing.T) {
-    t.Parallel()
-
-    adminClient := kafka.NewClusterAdminClientMockImpl()
-    defer adminClient.Close()
-    cfg := &kafka.AutoCreateTopicConfig{
-        AutoCreate:        true,
-        PartitionNum:      2,
-        ReplicationFactor: 1,
-    }
-
-    ctx := context.Background()
-    manager := NewKafkaTopicManager(ctx, adminClient, cfg)
-    defer manager.Close()
-
-    partitionsNum, err := manager.GetPartitionNum(
-        ctx,
-        kafka.DefaultMockTopicName)
-    require.Nil(t, err)
-    require.Equal(t, int32(3), partitionsNum)
-}
-
 func TestCreateTopic(t *testing.T) {
     t.Parallel()
 
     adminClient := kafka.NewClusterAdminClientMockImpl()
     defer adminClient.Close()
     cfg := &kafka.AutoCreateTopicConfig{
         AutoCreate:        true,
         PartitionNum:      2,
         ReplicationFactor: 1,
     }
 
     ctx := context.Background()
-    manager := NewKafkaTopicManager(ctx, adminClient, cfg)
+    manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, adminClient, cfg)
     defer manager.Close()
     partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
-    require.Nil(t, err)
-    require.Equal(t, int32(3), partitionNum)
+    require.NoError(t, err)
+    require.Equal(t, int32(2), partitionNum)
 
     partitionNum, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic")
-    require.Nil(t, err)
+    require.NoError(t, err)
     require.Equal(t, int32(2), partitionNum)
 
     partitionsNum, err := manager.GetPartitionNum(ctx, "new-topic")
-    require.Nil(t, err)
+    require.NoError(t, err)
     require.Equal(t, int32(2), partitionsNum)
 
     // Try to create a topic without auto create.
     cfg.AutoCreate = false
-    manager = NewKafkaTopicManager(ctx, adminClient, cfg)
+    manager = NewKafkaTopicManager(ctx, "new-topic2", adminClient, cfg)
     defer manager.Close()
     _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2")
     require.Regexp(
@@ -79,6 +57,7 @@
         err,
     )
 
+    topic := "new-topic-failed"
     // Invalid replication factor.
     // It happens when replication-factor is greater than the number of brokers.
     cfg = &kafka.AutoCreateTopicConfig{
         AutoCreate:        true,
         PartitionNum:      2,
         ReplicationFactor: 4,
     }
-    manager = NewKafkaTopicManager(ctx, adminClient, cfg)
+    manager = NewKafkaTopicManager(ctx, topic, adminClient, cfg)
     defer manager.Close()
-    _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic-failed")
+    _, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic)
     require.Regexp(
         t,
         "kafka create topic failed: kafka server: Replication-factor is invalid",
         err,
     )
@@ -107,14 +86,15 @@ func TestCreateTopicWithDelay(t *testing.T) {
         ReplicationFactor: 1,
     }
 
+    topic := "new_topic"
     ctx := context.Background()
-    manager := NewKafkaTopicManager(ctx, adminClient, cfg)
+    manager := NewKafkaTopicManager(ctx, topic, adminClient, cfg)
     defer manager.Close()
-    partitionNum, err := manager.createTopic(ctx, "new_topic")
-    require.Nil(t, err)
-    err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3)
-    require.Nil(t, err)
-    err = manager.waitUntilTopicVisible(ctx, "new_topic")
-    require.Nil(t, err)
+    partitionNum, err := manager.createTopic(ctx, topic)
+    require.NoError(t, err)
+    err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3)
+    require.NoError(t, err)
+    err = manager.waitUntilTopicVisible(ctx, topic)
+    require.NoError(t, err)
     require.Equal(t, int32(2), partitionNum)
 }
diff --git a/cdc/sink/util/helper.go b/cdc/sink/util/helper.go
index 37528b8e9b1..b895b8fac78 100644
--- a/cdc/sink/util/helper.go
+++ b/cdc/sink/util/helper.go
@@ -94,7 +94,7 @@ func GetTopicManagerAndTryCreateTopic(
     adminClient kafka.ClusterAdminClient,
 ) (manager.TopicManager, error) {
     topicManager := manager.NewKafkaTopicManager(
-        ctx, adminClient, topicCfg,
+        ctx, topic, adminClient, topicCfg,
     )
 
     if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
diff --git a/cdc/sink/util/helper_test.go b/cdc/sink/util/helper_test.go
index a1a4af4d8c3..8d62df9eb32 100644
--- a/cdc/sink/util/helper_test.go
+++ b/cdc/sink/util/helper_test.go
@@ -14,12 +14,53 @@
 package util
 
 import (
+    "context"
     "net/url"
     "testing"
 
+    "github.com/pingcap/tiflow/pkg/sink/kafka"
     "github.com/stretchr/testify/require"
 )
 
+func TestPartition(t *testing.T) {
+    t.Parallel()
+
+    adminClient := kafka.NewClusterAdminClientMockImpl()
+    defer adminClient.Close()
+
+    cfg := &kafka.AutoCreateTopicConfig{
+        AutoCreate:        true,
+        PartitionNum:      2,
+        ReplicationFactor: 1,
+    }
+
+    ctx := context.Background()
+    manager, err := GetTopicManagerAndTryCreateTopic(ctx, kafka.DefaultMockTopicName, cfg, adminClient)
+    require.NoError(t, err)
+    defer manager.Close()
+
+    // The default topic actually has 3 partitions, but partition-num is set to 2 in the sink-uri, so 2 is returned.
+    partitionsNum, err := manager.GetPartitionNum(ctx, kafka.DefaultMockTopicName)
+    require.NoError(t, err)
+    require.Equal(t, int32(2), partitionsNum)
+
+    // A new topic is created with 2 partitions.
+    partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic")
+    require.NoError(t, err)
+    require.Equal(t, int32(2), partitionsNum)
+
+    // A topic that already exists and is not the default topic is not affected by the default topic's partition number.
+    err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{
+        Name:          "new-topic-2",
+        NumPartitions: 3,
+    }, false)
+    require.NoError(t, err)
+
+    partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2")
+    require.NoError(t, err)
+    require.Equal(t, int32(3), partitionsNum)
+}
+
 func TestGetTopic(t *testing.T) {
     t.Parallel()
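The rule introduced by the kafka_manager.go hunks above can be stated simply: when the Kafka admin client reports partition metadata for the changefeed's default topic, the partition count configured via partition-num in the sink-uri takes precedence, while every other topic keeps the broker-reported value. The standalone Go sketch below only illustrates that rule under that reading of the patch; the names effectivePartitionNum, reported, and configured are hypothetical and are not part of the repository code.

package main

import "fmt"

// effectivePartitionNum illustrates the rule added by the patch:
// the default topic uses the partition number configured in the sink-uri,
// while any other topic keeps the partition number reported by the broker.
func effectivePartitionNum(topic, defaultTopic string, reported, configured int32) int32 {
    if topic == defaultTopic {
        return configured
    }
    return reported
}

func main() {
    // Default topic created manually with 3 partitions, partition-num=2 in the sink-uri: 2 wins.
    fmt.Println(effectivePartitionNum("mock_topic", "mock_topic", 3, 2)) // 2
    // A non-default topic keeps its broker-reported partition number.
    fmt.Println(effectivePartitionNum("new-topic-2", "mock_topic", 3, 2)) // 3
}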