Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9955
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Oct 30, 2023
1 parent 4439ae4 commit 1497f48
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 22 deletions.
16 changes: 7 additions & 9 deletions cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// partition-num is 2, so only send DDL events to 2 partitions.
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&max-message-bytes=1048576&partition-num=2" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)

Expand Down Expand Up @@ -87,10 +88,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
err = s.WriteDDLEvent(ctx, ddl)
require.NoError(t, err)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
3, "All partitions should be broadcast")
2, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
}

func TestWriteDDLEventToZeroPartition(t *testing.T) {
Expand Down Expand Up @@ -142,8 +142,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// partition-num is set to 2, so send checkpoint to 2 partitions.
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&max-message-bytes=1048576&partition-num=2" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" +
"&protocol=canal-json&enable-tidb-extension=true"
uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)
Expand All @@ -167,10 +168,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
require.Nil(t, err)

require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
3, "All partitions should be broadcast")
2, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
}

func TestWriteCheckpointTsToTableTopics(t *testing.T) {
Expand Down Expand Up @@ -231,10 +231,8 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
require.NoError(t, err)

require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
6, "All topics and partitions should be broadcast")
4, "All topics and partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 2), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person1", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person2", 0), 1)
Expand Down
25 changes: 23 additions & 2 deletions cdc/sink/dmlsink/mq/manager/kafka_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
type kafkaTopicManager struct {
changefeedID model.ChangeFeedID

defaultTopic string

admin kafka.ClusterAdminClient

cfg *kafka.AutoCreateTopicConfig
Expand All @@ -56,11 +58,17 @@ type kafkaTopicManager struct {
// NewKafkaTopicManager creates a new topic manager.
func NewKafkaTopicManager(
ctx context.Context,
<<<<<<< HEAD
=======
defaultTopic string,
changefeedID model.ChangeFeedID,
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955))
admin kafka.ClusterAdminClient,
cfg *kafka.AutoCreateTopicConfig,
) *kafkaTopicManager {
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
mgr := &kafkaTopicManager{
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
Expand Down Expand Up @@ -166,6 +174,15 @@ func (m *kafkaTopicManager) fetchAllTopicsPartitionsNum(
return nil, err
}

// The following case may happen:
// 1. the user creates the default topic manually with the partition number set to 3.
// 2. partition-num is set to 2 in the sink-uri.
// In such a case, we should use 2 instead of 3 as the partition number.
_, ok := numPartitions[m.defaultTopic]
if ok {
numPartitions[m.defaultTopic] = m.cfg.PartitionNum
}

log.Info(
"Kafka admin client describe topics success",
zap.String("namespace", m.changefeedID.Namespace),
Expand Down Expand Up @@ -272,8 +289,12 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(
return 0, errors.Trace(err)
}
if detail, ok := topicDetails[topicName]; ok {
m.tryUpdatePartitionsAndLogging(topicName, detail.NumPartitions)
return detail.NumPartitions, nil
numPartition := detail.NumPartitions
if topicName == m.defaultTopic {
numPartition = m.cfg.PartitionNum
}
m.tryUpdatePartitionsAndLogging(topicName, numPartition)
return numPartition, nil
}

partitionNum, err := m.createTopic(ctx, topicName)
Expand Down
45 changes: 34 additions & 11 deletions cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/require"
)

<<<<<<< HEAD
func TestPartitions(t *testing.T) {
t.Parallel()

Expand All @@ -43,6 +44,8 @@ func TestPartitions(t *testing.T) {
require.Equal(t, int32(3), partitionsNum)
}

=======
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955))
func TestCreateTopic(t *testing.T) {
t.Parallel()

Expand All @@ -54,23 +57,32 @@ func TestCreateTopic(t *testing.T) {
ReplicationFactor: 1,
}

changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()
<<<<<<< HEAD
manager := NewKafkaTopicManager(ctx, adminClient, cfg)
=======
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955))
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
require.Nil(t, err)
require.Equal(t, int32(3), partitionNum)
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)

partitionNum, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic")
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)
partitionsNum, err := manager.GetPartitionNum(ctx, "new-topic")
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// Try to create a topic without auto create.
cfg.AutoCreate = false
<<<<<<< HEAD
manager = NewKafkaTopicManager(ctx, adminClient, cfg)
=======
manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955))
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2")
require.Regexp(
Expand All @@ -79,16 +91,21 @@ func TestCreateTopic(t *testing.T) {
err,
)

topic := "new-topic-failed"
// Invalid replication factor.
// It happens when replication-factor is greater than the number of brokers.
cfg = &kafka.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 4,
}
<<<<<<< HEAD
manager = NewKafkaTopicManager(ctx, adminClient, cfg)
=======
manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955))
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic-failed")
_, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic)
require.Regexp(
t,
"kafka create topic failed: kafka server: Replication-factor is invalid",
Expand All @@ -107,14 +124,20 @@ func TestCreateTopicWithDelay(t *testing.T) {
ReplicationFactor: 1,
}

topic := "new_topic"
changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()
<<<<<<< HEAD
manager := NewKafkaTopicManager(ctx, adminClient, cfg)
=======
manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955))
defer manager.Close()
partitionNum, err := manager.createTopic(ctx, "new_topic")
require.Nil(t, err)
err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3)
require.Nil(t, err)
err = manager.waitUntilTopicVisible(ctx, "new_topic")
require.Nil(t, err)
partitionNum, err := manager.createTopic(ctx, topic)
require.NoError(t, err)
err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3)
require.NoError(t, err)
err = manager.waitUntilTopicVisible(ctx, topic)
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)
}
4 changes: 4 additions & 0 deletions cdc/sink/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ func GetTopicManagerAndTryCreateTopic(
adminClient kafka.ClusterAdminClient,
) (manager.TopicManager, error) {
topicManager := manager.NewKafkaTopicManager(
<<<<<<< HEAD
ctx, adminClient, topicCfg,
=======
ctx, topic, changefeedID, adminClient, topicCfg,
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955))
)

if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
Expand Down
44 changes: 44 additions & 0 deletions cdc/sink/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,56 @@
package util

import (
"context"
"net/url"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/stretchr/testify/require"
)

func TestPartition(t *testing.T) {
t.Parallel()

adminClient := kafka.NewClusterAdminClientMockImpl()
defer adminClient.Close()

cfg := &kafka.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 1,
}

changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()

manager, err := GetTopicManagerAndTryCreateTopic(ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient)
require.NoError(t, err)
defer manager.Close()

// default topic: the real partition number is 3, but 2 is set in the sink-uri, so return 2.
partitionsNum, err := manager.GetPartitionNum(ctx, kafka.DefaultMockTopicName)
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// new topic: create it with a partition number of 2.
partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic")
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// assume a topic already exists; a non-default topic won't be affected by the default topic's partition number.
err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{
Name: "new-topic-2",
NumPartitions: 3,
}, false)
require.NoError(t, err)

partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2")
require.NoError(t, err)
require.Equal(t, int32(3), partitionsNum)
}

func TestGetTopic(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 1497f48

Please sign in to comment.