Skip to content

Commit

Permalink
add one unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Oct 27, 2023
1 parent 97998aa commit 2ec2ef4
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/mq/manager/kafka_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (m *kafkaTopicManager) fetchAllTopicsPartitionsNum(
}

// it may happen the following case:
// 1. use create the default topic with partition number set as 3 manually
// 1. user create the default topic with partition number set as 3 manually
// 2. set the partition-number as 2 in the sink-uri.
// in the such case, we should use 2 instead of 3 as the partition number.
_, ok := numPartitions[m.defaultTopic]
Expand Down
11 changes: 11 additions & 0 deletions cdc/sink/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ func TestPartition(t *testing.T) {
partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic")
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// assume a topic already exist, the not default topic won't be affected by the default topic's partition number.
err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{
Name: "new-topic-2",
NumPartitions: 3,
}, false)
require.NoError(t, err)

partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2")
require.NoError(t, err)
require.Equal(t, int32(3), partitionsNum)
}

func TestGetTopic(t *testing.T) {
Expand Down

0 comments on commit 2ec2ef4

Please sign in to comment.