Skip to content

Commit

Permalink
fix test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 8, 2023
1 parent 9e2c25d commit 0ab2464
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 20 deletions.
14 changes: 5 additions & 9 deletions cdc/sink/mq/manager/kafka_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"testing"

"github.com/pingcap/tiflow/cdc/model"
kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/stretchr/testify/require"
Expand All @@ -37,8 +36,7 @@ func TestPartitions(t *testing.T) {
}

ctx := context.Background()
changefeedID := model.DefaultChangeFeedID("test")
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, adminClient, cfg)
defer manager.Close()
partitionsNum, err := manager.GetPartitionNum(kafka.DefaultMockTopicName)
require.NoError(t, err)
Expand All @@ -60,8 +58,7 @@ func TestCreateTopic(t *testing.T) {
ReplicationFactor: 1,
}

changefeedID := model.DefaultChangeFeedID("test")
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName)
require.NoError(t, err)
Expand All @@ -76,7 +73,7 @@ func TestCreateTopic(t *testing.T) {

// Try to create a topic without auto create.
cfg.AutoCreate = false
manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg)
manager = NewKafkaTopicManager(ctx, "new-topic2", adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible("new-topic2")
require.Regexp(
Expand All @@ -93,7 +90,7 @@ func TestCreateTopic(t *testing.T) {
PartitionNum: 2,
ReplicationFactor: 4,
}
manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
manager = NewKafkaTopicManager(ctx, topic, adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible(topic)
require.Regexp(
Expand All @@ -117,9 +114,8 @@ func TestCreateTopicWithDelay(t *testing.T) {
}

topic := "new_topic"
changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()
manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
manager := NewKafkaTopicManager(ctx, topic, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.createTopic(topic)
require.NoError(t, err)
Expand Down
22 changes: 11 additions & 11 deletions cdc/sinkv2/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ import (
"net/url"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/Shopify/sarama"
pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/stretchr/testify/require"

"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
)

func TestPartition(t *testing.T) {
t.Parallel()

adminClient := kafka.NewClusterAdminClientMockImpl()
adminClient := pkafka.NewClusterAdminClientMockImpl()
defer adminClient.Close()

cfg := &kafka.AutoCreateTopicConfig{
Expand All @@ -35,31 +37,29 @@ func TestPartition(t *testing.T) {
ReplicationFactor: 1,
}

changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()

manager, err := GetTopicManagerAndTryCreateTopic(ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient)
manager, err := GetTopicManagerAndTryCreateTopic(ctx, pkafka.DefaultMockTopicName, cfg, adminClient)
require.NoError(t, err)
defer manager.Close()

// default topic, real partition is 3, but 2 is set in the sink-uri, so return 2.
partitionsNum, err := manager.GetPartitionNum(ctx, kafka.DefaultMockTopicName)
partitionsNum, err := manager.GetPartitionNum(pkafka.DefaultMockTopicName)
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// new topic, create it with partition number as 2.
partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic")
partitionsNum, err = manager.GetPartitionNum("new-topic")
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// assume a topic already exist, the not default topic won't be affected by the default topic's partition number.
err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{
Name: "new-topic-2",
err = adminClient.CreateTopic("new-topic-2", &sarama.TopicDetail{

NumPartitions: 3,
}, false)
require.NoError(t, err)

partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2")
partitionsNum, err = manager.GetPartitionNum("new-topic-2")
require.NoError(t, err)
require.Equal(t, int32(3), partitionsNum)
}
Expand Down

0 comments on commit 0ab2464

Please sign in to comment.