diff --git a/cdc/sink/mq/manager/kafka_manager.go b/cdc/sink/mq/manager/kafka_manager.go
index 99e78858f8f..93221a6c46c 100644
--- a/cdc/sink/mq/manager/kafka_manager.go
+++ b/cdc/sink/mq/manager/kafka_manager.go
@@ -44,6 +44,8 @@ const (
 type kafkaTopicManager struct {
 	changefeedID model.ChangeFeedID
 
+	defaultTopic string
+
 	admin kafka.ClusterAdminClient
 
 	cfg *kafkaconfig.AutoCreateTopicConfig
@@ -59,11 +61,17 @@ type kafkaTopicManager struct {
 // NewKafkaTopicManager creates a new topic manager.
 func NewKafkaTopicManager(
 	ctx context.Context,
+<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager.go
+=======
+	defaultTopic string,
+	changefeedID model.ChangeFeedID,
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager.go
 	admin kafka.ClusterAdminClient,
 	cfg *kafkaconfig.AutoCreateTopicConfig,
 ) *kafkaTopicManager {
 	changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
 	mgr := &kafkaTopicManager{
+		defaultTopic: defaultTopic,
 		changefeedID: changefeedID,
 		admin:        admin,
 		cfg:          cfg,
@@ -172,6 +180,15 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro
 		return nil, err
 	}
 
+	// it may happen the following case:
+	// 1. user create the default topic with partition number set as 3 manually
+	// 2. set the partition-number as 2 in the sink-uri.
+	// in the such case, we should use 2 instead of 3 as the partition number.
+	_, ok := numPartitions[m.defaultTopic]
+	if ok {
+		numPartitions[m.defaultTopic] = m.cfg.PartitionNum
+	}
+
 	log.Info(
 		"Kafka admin client describe topics success",
 		zap.String("namespace", m.changefeedID.Namespace),
@@ -287,6 +304,7 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
+<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager.go
 	for _, detail := range topicDetails {
 		if detail.Err == sarama.ErrNoError {
 			if detail.Name == topicName {
@@ -307,6 +325,15 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in
 				zap.Error(detail.Err))
 			return 0, errors.Trace(detail.Err)
 		}
+=======
+	if detail, ok := topicDetails[topicName]; ok {
+		numPartition := detail.NumPartitions
+		if topicName == m.defaultTopic {
+			numPartition = m.cfg.PartitionNum
+		}
+		m.tryUpdatePartitionsAndLogging(topicName, numPartition)
+		return numPartition, nil
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager.go
 	}
 
 	partitionNum, err := m.createTopic(topicName)
diff --git a/cdc/sink/mq/manager/kafka_manager_test.go b/cdc/sink/mq/manager/kafka_manager_test.go
index a1d61963770..4bf405561ce 100644
--- a/cdc/sink/mq/manager/kafka_manager_test.go
+++ b/cdc/sink/mq/manager/kafka_manager_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
 func TestPartitions(t *testing.T) {
 	t.Parallel()
 
@@ -42,6 +43,8 @@ func TestPartitions(t *testing.T) {
 	require.Equal(t, int32(3), partitionsNum)
 }
 
+=======
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
 func TestCreateTopic(t *testing.T) {
 	t.Parallel()
 
@@ -57,6 +60,7 @@ func TestCreateTopic(t *testing.T) {
 		ReplicationFactor: 1,
 	}
 
+<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
 	manager := NewKafkaTopicManager(ctx, adminClient, cfg)
 	defer manager.Close()
 	partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName)
@@ -68,11 +72,30 @@ func TestCreateTopic(t *testing.T) {
 	require.Equal(t, int32(2), partitionNum)
 	partitionsNum, err := manager.GetPartitionNum("new-topic")
 	require.Nil(t, err)
+=======
+	changefeedID := model.DefaultChangeFeedID("test")
+	ctx := context.Background()
+	manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
+	defer manager.Close()
+	partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
+	require.NoError(t, err)
+	require.Equal(t, int32(2), partitionNum)
+
+	partitionNum, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic")
+	require.NoError(t, err)
+	require.Equal(t, int32(2), partitionNum)
+	partitionsNum, err := manager.GetPartitionNum(ctx, "new-topic")
+	require.NoError(t, err)
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
 	require.Equal(t, int32(2), partitionsNum)
 
 	// Try to create a topic without auto create.
 	cfg.AutoCreate = false
+<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
 	manager = NewKafkaTopicManager(ctx, adminClient, cfg)
+=======
+	manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg)
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
 	defer manager.Close()
 	_, err = manager.CreateTopicAndWaitUntilVisible("new-topic2")
 	require.Regexp(
@@ -81,6 +104,7 @@ func TestCreateTopic(t *testing.T) {
 		err,
 	)
 
+	topic := "new-topic-failed"
 	// Invalid replication factor.
 	// It happens when replication-factor is greater than the number of brokers.
 	cfg = &kafkaconfig.AutoCreateTopicConfig{
@@ -88,9 +112,15 @@ func TestCreateTopic(t *testing.T) {
 		PartitionNum:      2,
 		ReplicationFactor: 4,
 	}
+<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
 	manager = NewKafkaTopicManager(ctx, adminClient, cfg)
 	defer manager.Close()
 	_, err = manager.CreateTopicAndWaitUntilVisible("new-topic-failed")
+=======
+	manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
+	defer manager.Close()
+	_, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic)
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
 	require.Regexp(
 		t,
 		"new sarama producer: kafka server: Replication-factor is invalid",
@@ -111,6 +141,7 @@ func TestCreateTopicWithDelay(t *testing.T) {
 		ReplicationFactor: 1,
 	}
 
+<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
 	manager := NewKafkaTopicManager(context.TODO(), adminClient, cfg)
 	defer manager.Close()
 	partitionNum, err := manager.createTopic("new_topic")
@@ -119,5 +150,18 @@ func TestCreateTopicWithDelay(t *testing.T) {
 	require.Nil(t, err)
 	err = manager.waitUntilTopicVisible("new_topic")
 	require.Nil(t, err)
+=======
+	topic := "new_topic"
+	changefeedID := model.DefaultChangeFeedID("test")
+	ctx := context.Background()
+	manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
+	defer manager.Close()
+	partitionNum, err := manager.createTopic(ctx, topic)
+	require.NoError(t, err)
+	err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3)
+	require.NoError(t, err)
+	err = manager.waitUntilTopicVisible(ctx, topic)
+	require.NoError(t, err)
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
 	require.Equal(t, int32(2), partitionNum)
 }
diff --git a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
index d386fe39ddb..967d6594ba6 100644
--- a/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
+++ b/cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
@@ -80,10 +80,14 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
 	leader, topic := initBroker(t, kafka.DefaultMockPartitionNum)
 	defer leader.Close()
 
+=======
+	// partition-number is 2, so only send DDL events to 2 partitions.
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
 	uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
-		"&max-message-bytes=1048576&partition-num=1" +
+		"&max-message-bytes=1048576&partition-num=2" +
 		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
 	uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
@@ -110,6 +114,7 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
 	err = s.WriteDDLEvent(ctx, ddl)
 	require.Nil(t, err)
 	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
+<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
 		3, "All partitions should be broadcast")
 	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
 		Topic:     "mock_topic",
@@ -123,6 +128,11 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
 		Topic:     "mock_topic",
 		Partition: 2,
 	}), 1)
+=======
+		2, "All partitions should be broadcast")
+	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
+	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
 }
 
 func TestWriteDDLEventToZeroPartition(t *testing.T) {
@@ -182,10 +192,14 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
 	leader, topic := initBroker(t, kafka.DefaultMockPartitionNum)
 	defer leader.Close()
 
+=======
+	// partition-num is set to 2, so send checkpoint to 2 partitions.
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
 	uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
-		"&max-message-bytes=1048576&partition-num=1" +
+		"&max-message-bytes=1048576&partition-num=2" +
 		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" +
 		"&protocol=canal-json&enable-tidb-extension=true"
 	uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
@@ -206,6 +220,7 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
 	require.Nil(t, err)
 
 	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
+<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
 		3, "All partitions should be broadcast")
 	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
 		Topic:     "mock_topic",
@@ -219,6 +234,11 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
 		Topic:     "mock_topic",
 		Partition: 2,
 	}), 1)
+=======
+		2, "All partitions should be broadcast")
+	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
+	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
 }
 
 func TestWriteCheckpointTsToTableTopics(t *testing.T) {
@@ -278,6 +298,7 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
 	require.Nil(t, err)
 
 	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
+<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
 		6, "All topics and partitions should be broadcast")
 	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
 		Topic:     "mock_topic",
@@ -303,6 +324,13 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
 		Topic:     "cdc_person2",
 		Partition: 0,
 	}), 1)
+=======
+		4, "All topics and partitions should be broadcast")
+	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
+	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person", 0), 1)
+	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person1", 0), 1)
+	require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person2", 0), 1)
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
 }
 
 func TestWriteCheckpointTsWhenCanalJsonTiDBExtensionIsDisable(t *testing.T) {
diff --git a/cdc/sinkv2/util/helper.go b/cdc/sinkv2/util/helper.go
index aa49156a944..cfc7bb62940 100644
--- a/cdc/sinkv2/util/helper.go
+++ b/cdc/sinkv2/util/helper.go
@@ -94,8 +94,16 @@ func GetTopicManagerAndTryCreateTopic(
 	topicCfg *kafka.AutoCreateTopicConfig,
 	adminClient pkafka.ClusterAdminClient,
 ) (manager.TopicManager, error) {
+<<<<<<< HEAD:cdc/sinkv2/util/helper.go
 	topicManager := manager.NewKafkaTopicManager(ctx, adminClient, topicCfg)
 	if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil {
+=======
+	topicManager := manager.NewKafkaTopicManager(
+		ctx, topic, changefeedID, adminClient, topicCfg,
+	)
+
+	if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
+>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/util/helper.go
 		return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
 	}
 
diff --git a/cdc/sinkv2/util/helper_test.go b/cdc/sinkv2/util/helper_test.go
index df7911a8595..6db38bb54d5 100644
--- a/cdc/sinkv2/util/helper_test.go
+++ b/cdc/sinkv2/util/helper_test.go
@@ -14,12 +14,56 @@
 package util
 
 import (
+	"context"
 	"net/url"
 	"testing"
 
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/sink/kafka"
 	"github.com/stretchr/testify/require"
 )
 
+func TestPartition(t *testing.T) {
+	t.Parallel()
+
+	adminClient := kafka.NewClusterAdminClientMockImpl()
+	defer adminClient.Close()
+
+	cfg := &kafka.AutoCreateTopicConfig{
+		AutoCreate:        true,
+		PartitionNum:      2,
+		ReplicationFactor: 1,
+	}
+
+	changefeedID := model.DefaultChangeFeedID("test")
+	ctx := context.Background()
+
+	manager, err := GetTopicManagerAndTryCreateTopic(ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient)
+	require.NoError(t, err)
+	defer manager.Close()
+
+	// default topic, real partition is 3, but 2 is set in the sink-uri, so return 2.
+	partitionsNum, err := manager.GetPartitionNum(ctx, kafka.DefaultMockTopicName)
+	require.NoError(t, err)
+	require.Equal(t, int32(2), partitionsNum)
+
+	// new topic, create it with partition number as 2.
+	partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic")
+	require.NoError(t, err)
+	require.Equal(t, int32(2), partitionsNum)
+
+	// assume a topic already exist, the not default topic won't be affected by the default topic's partition number.
+	err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{
+		Name:          "new-topic-2",
+		NumPartitions: 3,
+	}, false)
+	require.NoError(t, err)
+
+	partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2")
+	require.NoError(t, err)
+	require.Equal(t, int32(3), partitionsNum)
+}
+
 func TestGetTopic(t *testing.T) {
 	t.Parallel()