diff --git a/cdc/sink/mq/manager/kafka_manager_test.go b/cdc/sink/mq/manager/kafka_manager_test.go index 4e87085eaac..6f817bc7613 100644 --- a/cdc/sink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/mq/manager/kafka_manager_test.go @@ -17,7 +17,6 @@ import ( "context" "testing" - "github.com/pingcap/tiflow/cdc/model" kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/stretchr/testify/require" @@ -37,8 +36,7 @@ func TestPartitions(t *testing.T) { } ctx := context.Background() - changefeedID := model.DefaultChangeFeedID("test") - manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg) + manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, adminClient, cfg) defer manager.Close() partitionsNum, err := manager.GetPartitionNum(kafka.DefaultMockTopicName) require.NoError(t, err) @@ -60,8 +58,7 @@ func TestCreateTopic(t *testing.T) { ReplicationFactor: 1, } - changefeedID := model.DefaultChangeFeedID("test") - manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg) + manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, adminClient, cfg) defer manager.Close() partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName) require.NoError(t, err) @@ -76,7 +73,7 @@ func TestCreateTopic(t *testing.T) { // Try to create a topic without auto create. cfg.AutoCreate = false - manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg) + manager = NewKafkaTopicManager(ctx, "new-topic2", adminClient, cfg) defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible("new-topic2") require.Regexp( @@ -93,7 +90,7 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } - manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) + manager = NewKafkaTopicManager(ctx, topic, adminClient, cfg) defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible(topic) require.Regexp( @@ -117,9 +114,8 @@ func TestCreateTopicWithDelay(t *testing.T) { } topic := "new_topic" - changefeedID := model.DefaultChangeFeedID("test") ctx := context.Background() - manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) + manager := NewKafkaTopicManager(ctx, topic, adminClient, cfg) defer manager.Close() partitionNum, err := manager.createTopic(topic) require.NoError(t, err) diff --git a/cdc/sinkv2/util/helper_test.go b/cdc/sinkv2/util/helper_test.go index 6db38bb54d5..10424197baf 100644 --- a/cdc/sinkv2/util/helper_test.go +++ b/cdc/sinkv2/util/helper_test.go @@ -18,15 +18,17 @@ import ( "net/url" "testing" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/sink/kafka" + "github.com/Shopify/sarama" + pkafka "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/stretchr/testify/require" + + "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka" ) func TestPartition(t *testing.T) { t.Parallel() - adminClient := kafka.NewClusterAdminClientMockImpl() + adminClient := pkafka.NewClusterAdminClientMockImpl() defer adminClient.Close() cfg := &kafka.AutoCreateTopicConfig{ @@ -35,31 +37,29 @@ func TestPartition(t *testing.T) { ReplicationFactor: 1, } - changefeedID := model.DefaultChangeFeedID("test") ctx := context.Background() - - manager, err := GetTopicManagerAndTryCreateTopic(ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient) + manager, err := GetTopicManagerAndTryCreateTopic(ctx, pkafka.DefaultMockTopicName, cfg, adminClient) require.NoError(t, err) defer manager.Close() // default topic, real partition is 3, but 2 is set in the sink-uri, so return 2. - partitionsNum, err := manager.GetPartitionNum(ctx, kafka.DefaultMockTopicName) + partitionsNum, err := manager.GetPartitionNum(pkafka.DefaultMockTopicName) require.NoError(t, err) require.Equal(t, int32(2), partitionsNum) // new topic, create it with partition number as 2. - partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic") + partitionsNum, err = manager.GetPartitionNum("new-topic") require.NoError(t, err) require.Equal(t, int32(2), partitionsNum) // assume a topic already exist, the not default topic won't be affected by the default topic's partition number. - err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{ - Name: "new-topic-2", + err = adminClient.CreateTopic("new-topic-2", &sarama.TopicDetail{ + NumPartitions: 3, }, false) require.NoError(t, err) - partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2") + partitionsNum, err = manager.GetPartitionNum("new-topic-2") require.NoError(t, err) require.Equal(t, int32(3), partitionsNum) }