diff --git a/cdc/sink/mq/manager/kafka_manager.go b/cdc/sink/mq/manager/kafka_manager.go
index 99e78858f8f..3d6ec99b74f 100644
--- a/cdc/sink/mq/manager/kafka_manager.go
+++ b/cdc/sink/mq/manager/kafka_manager.go
@@ -44,6 +44,8 @@ const (
 type kafkaTopicManager struct {
 	changefeedID model.ChangeFeedID
 
+	defaultTopic string
+
 	admin kafka.ClusterAdminClient
 
 	cfg *kafkaconfig.AutoCreateTopicConfig
@@ -59,11 +61,13 @@ type kafkaTopicManager struct {
 // NewKafkaTopicManager creates a new topic manager.
 func NewKafkaTopicManager(
 	ctx context.Context,
+	defaultTopic string,
 	admin kafka.ClusterAdminClient,
 	cfg *kafkaconfig.AutoCreateTopicConfig,
 ) *kafkaTopicManager {
 	changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
 	mgr := &kafkaTopicManager{
+		defaultTopic: defaultTopic,
 		changefeedID: changefeedID,
 		admin:        admin,
 		cfg:          cfg,
@@ -113,8 +117,15 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
 				zap.Error(err))
 			}
 
+			// The following case may happen:
+			// 1. The user manually creates the default topic with 3 partitions.
+			// 2. partition-num is set to 2 in the sink-uri.
+			// In such a case, we should use 2 instead of 3 as the partition number.
 			for _, detail := range topicMetaList {
 				partitionNum := int32(len(detail.Partitions))
+				if detail.Name == m.defaultTopic {
+					partitionNum = m.cfg.PartitionNum
+				}
 				m.tryUpdatePartitionsAndLogging(detail.Name, partitionNum)
 			}
 
@@ -296,6 +307,9 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in
 			zap.String("topic", topicName),
 			zap.Int32("partitionNumber", int32(len(detail.Partitions))))
 		partitionNum := int32(len(detail.Partitions))
+		if topicName == m.defaultTopic {
+			partitionNum = m.cfg.PartitionNum
+		}
 		m.tryUpdatePartitionsAndLogging(topicName, partitionNum)
 		return partitionNum, nil
 	}
diff --git a/cdc/sink/mq/manager/kafka_manager_test.go b/cdc/sink/mq/manager/kafka_manager_test.go
index a1d61963770..6f817bc7613 100644
--- a/cdc/sink/mq/manager/kafka_manager_test.go
+++ b/cdc/sink/mq/manager/kafka_manager_test.go
@@ -35,11 +35,12 @@ func TestPartitions(t *testing.T) {
 		ReplicationFactor: 1,
 	}
 
-	manager := NewKafkaTopicManager(context.TODO(), adminClient, cfg)
+	ctx := context.Background()
+	manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, adminClient, cfg)
 	defer manager.Close()
 	partitionsNum, err := manager.GetPartitionNum(kafka.DefaultMockTopicName)
-	require.Nil(t, err)
-	require.Equal(t, int32(3), partitionsNum)
+	require.NoError(t, err)
+	require.Equal(t, int32(2), partitionsNum)
 }
 
 func TestCreateTopic(t *testing.T) {
@@ -57,22 +58,22 @@ func TestCreateTopic(t *testing.T) {
 		ReplicationFactor: 1,
 	}
 
-	manager := NewKafkaTopicManager(ctx, adminClient, cfg)
+	manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, adminClient, cfg)
 	defer manager.Close()
 	partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName)
-	require.Nil(t, err)
-	require.Equal(t, int32(3), partitionNum)
+	require.NoError(t, err)
+	require.Equal(t, int32(2), partitionNum)
 
 	partitionNum, err = manager.CreateTopicAndWaitUntilVisible("new-topic")
-	require.Nil(t, err)
+	require.NoError(t, err)
 	require.Equal(t, int32(2), partitionNum)
 	partitionsNum, err := manager.GetPartitionNum("new-topic")
-	require.Nil(t, err)
+	require.NoError(t, err)
 	require.Equal(t, int32(2), partitionsNum)
 
 	// Try to create a topic without auto create.
 	cfg.AutoCreate = false
-	manager = NewKafkaTopicManager(ctx, adminClient, cfg)
+	manager = NewKafkaTopicManager(ctx, "new-topic2", adminClient, cfg)
 	defer manager.Close()
 	_, err = manager.CreateTopicAndWaitUntilVisible("new-topic2")
 	require.Regexp(
@@ -81,6 +82,7 @@ func TestCreateTopic(t *testing.T) {
 		err,
 	)
 
+	topic := "new-topic-failed"
 	// Invalid replication factor.
 	// It happens when replication-factor is greater than the number of brokers.
 	cfg = &kafkaconfig.AutoCreateTopicConfig{
@@ -88,9 +90,9 @@ func TestCreateTopic(t *testing.T) {
 		PartitionNum:      2,
 		ReplicationFactor: 4,
 	}
-	manager = NewKafkaTopicManager(ctx, adminClient, cfg)
+	manager = NewKafkaTopicManager(ctx, topic, adminClient, cfg)
 	defer manager.Close()
-	_, err = manager.CreateTopicAndWaitUntilVisible("new-topic-failed")
+	_, err = manager.CreateTopicAndWaitUntilVisible(topic)
 	require.Regexp(
 		t,
 		"new sarama producer: kafka server: Replication-factor is invalid",
@@ -111,13 +113,15 @@ func TestCreateTopicWithDelay(t *testing.T) {
 		ReplicationFactor: 1,
 	}
 
-	manager := NewKafkaTopicManager(context.TODO(), adminClient, cfg)
+	topic := "new_topic"
+	ctx := context.Background()
+	manager := NewKafkaTopicManager(ctx, topic, adminClient, cfg)
 	defer manager.Close()
-	partitionNum, err := manager.createTopic("new_topic")
-	require.Nil(t, err)
-	err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3)
-	require.Nil(t, err)
-	err = manager.waitUntilTopicVisible("new_topic")
-	require.Nil(t, err)
+	partitionNum, err := manager.createTopic(topic)
+	require.NoError(t, err)
+	err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3)
+	require.NoError(t, err)
+	err = manager.waitUntilTopicVisible(topic)
+	require.NoError(t, err)
 	require.Equal(t, int32(2), partitionNum)
 }
diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go
index e7210e43726..ef9f98352ca 100644
--- a/cdc/sink/mq/mq.go
+++ b/cdc/sink/mq/mq.go
@@ -432,7 +432,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
 		return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
 	}
 
-	topicManager := manager.NewKafkaTopicManager(ctx, adminClient, baseConfig.DeriveTopicConfig())
+	topicManager := manager.NewKafkaTopicManager(ctx, topic, adminClient, baseConfig.DeriveTopicConfig())
 	if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil {
 		return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
 	}
diff --git a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
index d386fe39ddb..d6d4008f8db 100644
--- a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
+++ b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
@@ -82,8 +82,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
 	leader, topic := initBroker(t, kafka.DefaultMockPartitionNum)
 	defer leader.Close()
 
+	// partition-num is set to 2, so DDL events are only sent to 2 partitions.
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + - "&max-message-bytes=1048576&partition-num=1" + + "&max-message-bytes=1048576&partition-num=2" + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol" uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic) @@ -110,7 +111,7 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) { err = s.WriteDDLEvent(ctx, ddl) require.Nil(t, err) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(), - 3, "All partitions should be broadcast") + 2, "All partitions should be broadcast") require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "mock_topic", Partition: 0, @@ -119,10 +120,6 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) { Topic: "mock_topic", Partition: 1, }), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ - Topic: "mock_topic", - Partition: 2, - }), 1) } func TestWriteDDLEventToZeroPartition(t *testing.T) { @@ -184,8 +181,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { leader, topic := initBroker(t, kafka.DefaultMockPartitionNum) defer leader.Close() + // partition-num is set to 2, so send checkpoint to 2 partitions. uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" + - "&max-message-bytes=1048576&partition-num=1" + + "&max-message-bytes=1048576&partition-num=2" + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" + "&protocol=canal-json&enable-tidb-extension=true" uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic) @@ -206,7 +204,7 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { require.Nil(t, err) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(), - 3, "All partitions should be broadcast") + 2, "All partitions should be broadcast") require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "mock_topic", Partition: 0, @@ -215,10 +213,6 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) { Topic: "mock_topic", Partition: 1, }), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ - Topic: "mock_topic", - Partition: 2, - }), 1) } func TestWriteCheckpointTsToTableTopics(t *testing.T) { @@ -278,19 +272,11 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) { require.Nil(t, err) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(), - 6, "All topics and partitions should be broadcast") + 4, "All topics and partitions should be broadcast") require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "mock_topic", Partition: 0, }), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ - Topic: "mock_topic", - Partition: 1, - }), 1) - require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ - Topic: "mock_topic", - Partition: 2, - }), 1) require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{ Topic: "cdc_person", Partition: 0, diff --git a/cdc/sinkv2/util/helper.go b/cdc/sinkv2/util/helper.go index aa49156a944..818e4681dfd 100644 --- a/cdc/sinkv2/util/helper.go +++ b/cdc/sinkv2/util/helper.go @@ -94,7 +94,7 @@ func GetTopicManagerAndTryCreateTopic( topicCfg *kafka.AutoCreateTopicConfig, adminClient pkafka.ClusterAdminClient, ) (manager.TopicManager, error) { - topicManager := manager.NewKafkaTopicManager(ctx, adminClient, topicCfg) + 
+	topicManager := manager.NewKafkaTopicManager(ctx, topic, adminClient, topicCfg)
 	if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil {
 		return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
 	}
diff --git a/cdc/sinkv2/util/helper_test.go b/cdc/sinkv2/util/helper_test.go
index df7911a8595..635737de221 100644
--- a/cdc/sinkv2/util/helper_test.go
+++ b/cdc/sinkv2/util/helper_test.go
@@ -14,12 +14,54 @@ package util
 
 import (
+	"context"
 	"net/url"
 	"testing"
 
+	"github.com/Shopify/sarama"
+	"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
+	pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
 	"github.com/stretchr/testify/require"
 )
 
+func TestPartition(t *testing.T) {
+	t.Parallel()
+
+	adminClient := pkafka.NewClusterAdminClientMockImpl()
+	defer adminClient.Close()
+
+	cfg := &kafka.AutoCreateTopicConfig{
+		AutoCreate:        true,
+		PartitionNum:      2,
+		ReplicationFactor: 1,
+	}
+
+	ctx := context.Background()
+	manager, err := GetTopicManagerAndTryCreateTopic(ctx, pkafka.DefaultMockTopicName, cfg, adminClient)
+	require.NoError(t, err)
+	defer manager.Close()
+
+	// The default topic actually has 3 partitions, but 2 is set in the sink-uri, so 2 is returned.
+	partitionsNum, err := manager.GetPartitionNum(pkafka.DefaultMockTopicName)
+	require.NoError(t, err)
+	require.Equal(t, int32(2), partitionsNum)
+
+	// A new topic is created with partition number 2.
+	partitionsNum, err = manager.GetPartitionNum("new-topic")
+	require.NoError(t, err)
+	require.Equal(t, int32(2), partitionsNum)
+
+	// A topic that already exists and is not the default topic is not affected by the default topic's partition number.
+	err = adminClient.CreateTopic("new-topic-2", &sarama.TopicDetail{
+		NumPartitions: 3,
+	}, false)
+	require.NoError(t, err)
+
+	partitionsNum, err = manager.GetPartitionNum("new-topic-2")
+	require.NoError(t, err)
+	require.Equal(t, int32(3), partitionsNum)
+}
+
 func TestGetTopic(t *testing.T) {
 	t.Parallel()