Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Oct 26, 2023
1 parent 8950e9b commit 600384c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 32 deletions.
8 changes: 6 additions & 2 deletions cdc/sink/dmlsink/mq/manager/kafka_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,12 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(
return 0, errors.Trace(err)
}
if detail, ok := topicDetails[topicName]; ok {
m.tryUpdatePartitionsAndLogging(topicName, detail.NumPartitions)
return detail.NumPartitions, nil
numPartition := detail.NumPartitions
if topicName == m.defaultTopic {
numPartition = m.cfg.PartitionNum
}
m.tryUpdatePartitionsAndLogging(topicName, numPartition)
return numPartition, nil
}

partitionNum, err := m.createTopic(ctx, topicName)
Expand Down
37 changes: 7 additions & 30 deletions cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestPartitions(t *testing.T) {
t.Parallel()

adminClient := kafka.NewClusterAdminClientMockImpl()
defer adminClient.Close()
cfg := &kafka.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 1,
}

changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
defer manager.Close()

partitionsNum, err := manager.GetPartitionNum(
ctx,
kafka.DefaultMockTopicName)
require.Nil(t, err)
require.Equal(t, int32(3), partitionsNum)
}

func TestCreateTopic(t *testing.T) {
t.Parallel()

Expand All @@ -61,14 +38,14 @@ func TestCreateTopic(t *testing.T) {
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
require.Nil(t, err)
require.Equal(t, int32(3), partitionNum)
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)

partitionNum, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic")
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)
partitionsNum, err := manager.GetPartitionNum(ctx, "new-topic")
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// Try to create a topic without auto create.
Expand Down Expand Up @@ -117,10 +94,10 @@ func TestCreateTopicWithDelay(t *testing.T) {
manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.createTopic(ctx, topic)
require.Nil(t, err)
require.NoError(t, err)
err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3)
require.Nil(t, err)
require.NoError(t, err)
err = manager.waitUntilTopicVisible(ctx, topic)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)
}
33 changes: 33 additions & 0 deletions cdc/sink/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,45 @@
package util

import (
"context"
"net/url"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/stretchr/testify/require"
)

func TestPartition(t *testing.T) {
t.Parallel()

adminClient := kafka.NewClusterAdminClientMockImpl()
defer adminClient.Close()

cfg := &kafka.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 1,
}

changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()

manager, err := GetTopicManagerAndTryCreateTopic(ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient)
require.NoError(t, err)
defer manager.Close()

// default topic, real partition is 3, but 2 is set in the sink-uri, so return 2.
partitionsNum, err := manager.GetPartitionNum(ctx, kafka.DefaultMockTopicName)
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// new topic, create it with partition number as 2.
partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic")
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)
}

func TestGetTopic(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 600384c

Please sign in to comment.