This is an automated cherry-pick of pingcap#9955
Signed-off-by: ti-chi-bot <[email protected]>
3AceShowHand authored and ti-chi-bot committed Oct 30, 2023
1 parent 4febedd commit 485aa85
Showing 5 changed files with 153 additions and 2 deletions.
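At a glance, the cherry-picked change makes the topic manager prefer the partition count given in the sink-uri over the broker-reported count, but only for the changefeed's default topic. A minimal sketch of that rule, with illustrative names rather than the actual TiCDC symbols:

```go
package main

import "fmt"

// resolvePartitionNum mirrors the rule this commit introduces: for the
// changefeed's default topic, the partition count configured in the
// sink-uri wins over the count reported by the broker. All names here
// are illustrative, not the actual TiCDC symbols.
func resolvePartitionNum(topic, defaultTopic string, brokerPartitions, sinkURIPartitions int32) int32 {
	if topic == defaultTopic {
		return sinkURIPartitions
	}
	return brokerPartitions
}

func main() {
	// Default topic pre-created with 3 partitions, sink-uri says 2: use 2.
	fmt.Println(resolvePartitionNum("ticdc-default", "ticdc-default", 3, 2)) // 2
	// Any other topic keeps the broker-reported count.
	fmt.Println(resolvePartitionNum("other-topic", "ticdc-default", 3, 2)) // 3
}
```

This matters when a user pre-creates the default topic with more partitions than the changefeed is configured to write to.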
27 changes: 27 additions & 0 deletions cdc/sink/mq/manager/kafka_manager.go
@@ -44,6 +44,8 @@ const (
type kafkaTopicManager struct {
changefeedID model.ChangeFeedID

defaultTopic string

admin kafka.ClusterAdminClient

cfg *kafkaconfig.AutoCreateTopicConfig
@@ -59,11 +61,17 @@ type kafkaTopicManager struct {
// NewKafkaTopicManager creates a new topic manager.
func NewKafkaTopicManager(
ctx context.Context,
<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager.go
=======
defaultTopic string,
changefeedID model.ChangeFeedID,
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager.go
admin kafka.ClusterAdminClient,
cfg *kafkaconfig.AutoCreateTopicConfig,
) *kafkaTopicManager {
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
mgr := &kafkaTopicManager{
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
@@ -172,6 +180,15 @@ func (m *kafkaTopicManager) getMetadataOfTopics() ([]*sarama.TopicMetadata, erro
return nil, err
}

// The following case may happen:
// 1. the user manually creates the default topic with the partition number set to 3;
// 2. the partition-number is set to 2 in the sink-uri.
// In such a case, we should use 2 instead of 3 as the partition number.
_, ok := numPartitions[m.defaultTopic]
if ok {
numPartitions[m.defaultTopic] = m.cfg.PartitionNum
}

log.Info(
"Kafka admin client describe topics success",
zap.String("namespace", m.changefeedID.Namespace),
@@ -287,6 +304,7 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in
if err != nil {
return 0, errors.Trace(err)
}
<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager.go
for _, detail := range topicDetails {
if detail.Err == sarama.ErrNoError {
if detail.Name == topicName {
@@ -307,6 +325,15 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(topicName string) (in
zap.Error(detail.Err))
return 0, errors.Trace(detail.Err)
}
=======
if detail, ok := topicDetails[topicName]; ok {
numPartition := detail.NumPartitions
if topicName == m.defaultTopic {
numPartition = m.cfg.PartitionNum
}
m.tryUpdatePartitionsAndLogging(topicName, numPartition)
return numPartition, nil
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager.go
}

partitionNum, err := m.createTopic(topicName)
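To make the getMetadataOfTopics hunk above easier to follow in isolation: the override only rewrites the default topic's entry when the broker actually reports that topic. A minimal, runnable sketch under that assumption, with a plain map standing in for sarama's topic metadata and cfgPartitionNum standing in for m.cfg.PartitionNum:

```go
package main

import "fmt"

// overrideDefaultTopic is a stand-alone version of the numPartitions
// fix-up shown in the hunk above.
func overrideDefaultTopic(numPartitions map[string]int32, defaultTopic string, cfgPartitionNum int32) {
	// Only rewrite the entry if the broker actually reports the default topic.
	if _, ok := numPartitions[defaultTopic]; ok {
		numPartitions[defaultTopic] = cfgPartitionNum
	}
}

func main() {
	partitions := map[string]int32{"ticdc-default": 3, "other": 6}
	overrideDefaultTopic(partitions, "ticdc-default", 2)
	fmt.Println(partitions) // map[other:6 ticdc-default:2]
}
```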
44 changes: 44 additions & 0 deletions cdc/sink/mq/manager/kafka_manager_test.go
@@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
)

<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
func TestPartitions(t *testing.T) {
t.Parallel()

@@ -42,6 +43,8 @@ func TestPartitions(t *testing.T) {
require.Equal(t, int32(3), partitionsNum)
}

=======
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
func TestCreateTopic(t *testing.T) {
t.Parallel()

@@ -57,6 +60,7 @@ func TestCreateTopic(t *testing.T) {
ReplicationFactor: 1,
}

<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
manager := NewKafkaTopicManager(ctx, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(kafka.DefaultMockTopicName)
@@ -68,11 +72,30 @@ func TestCreateTopic(t *testing.T) {
require.Equal(t, int32(2), partitionNum)
partitionsNum, err := manager.GetPartitionNum("new-topic")
require.Nil(t, err)
=======
changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)

partitionNum, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic")
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)
partitionsNum, err := manager.GetPartitionNum(ctx, "new-topic")
require.NoError(t, err)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
require.Equal(t, int32(2), partitionsNum)

// Try to create a topic without auto create.
cfg.AutoCreate = false
<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
manager = NewKafkaTopicManager(ctx, adminClient, cfg)
=======
manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible("new-topic2")
require.Regexp(
@@ -81,16 +104,23 @@ func TestCreateTopic(t *testing.T) {
err,
)

topic := "new-topic-failed"
// Invalid replication factor.
// It happens when replication-factor is greater than the number of brokers.
cfg = &kafkaconfig.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 4,
}
<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
manager = NewKafkaTopicManager(ctx, adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible("new-topic-failed")
=======
manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
require.Regexp(
t,
"new sarama producer: kafka server: Replication-factor is invalid",
@@ -111,6 +141,7 @@ func TestCreateTopicWithDelay(t *testing.T) {
ReplicationFactor: 1,
}

<<<<<<< HEAD:cdc/sink/mq/manager/kafka_manager_test.go
manager := NewKafkaTopicManager(context.TODO(), adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.createTopic("new_topic")
@@ -119,5 +150,18 @@ func TestCreateTopicWithDelay(t *testing.T) {
require.Nil(t, err)
err = manager.waitUntilTopicVisible("new_topic")
require.Nil(t, err)
=======
topic := "new_topic"
changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()
manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.createTopic(ctx, topic)
require.NoError(t, err)
err = adminClient.SetRemainingFetchesUntilTopicVisible(topic, 3)
require.NoError(t, err)
err = manager.waitUntilTopicVisible(ctx, topic)
require.NoError(t, err)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
require.Equal(t, int32(2), partitionNum)
}
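The delayed-visibility test above relies on the manager polling broker metadata until a newly created topic shows up. A sketch of that polling idea under stated assumptions — the real waitUntilTopicVisible signature and retry cadence live in kafka_manager.go and may differ:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntilVisible sketches the loop TestCreateTopicWithDelay exercises:
// a freshly created topic can take a few metadata fetches to appear, so
// the caller retries until it shows up or the context expires. This is
// an illustration of the idea, not the TiCDC implementation.
func waitUntilVisible(ctx context.Context, topicExists func() bool) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if topicExists() {
				return nil
			}
		}
	}
}

func main() {
	// Simulate a topic that becomes visible after 3 fetches, like
	// SetRemainingFetchesUntilTopicVisible(topic, 3) does in the mock.
	remaining := 3
	exists := func() bool { remaining--; return remaining <= 0 }
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := waitUntilVisible(ctx, exists); err != nil {
		fmt.Println("topic never became visible:", err)
		return
	}
	fmt.Println("topic visible")
}
```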
32 changes: 30 additions & 2 deletions cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
@@ -80,10 +80,14 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
leader, topic := initBroker(t, kafka.DefaultMockPartitionNum)
defer leader.Close()
=======
// partition-num is 2, so DDL events are sent to only 2 partitions.
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&max-message-bytes=1048576&partition-num=2" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)

@@ -110,6 +114,7 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
err = s.WriteDDLEvent(ctx, ddl)
require.Nil(t, err)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
3, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
@@ -123,6 +128,11 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
Topic: "mock_topic",
Partition: 2,
}), 1)
=======
2, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
}

func TestWriteDDLEventToZeroPartition(t *testing.T) {
@@ -182,10 +192,14 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
leader, topic := initBroker(t, kafka.DefaultMockPartitionNum)
defer leader.Close()
=======
// partition-num is set to 2, so the checkpoint is sent to 2 partitions.
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&max-message-bytes=1048576&partition-num=2" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" +
"&protocol=canal-json&enable-tidb-extension=true"
uri := fmt.Sprintf(uriTemplate, leader.Addr(), topic)
@@ -206,6 +220,7 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
require.Nil(t, err)

require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
3, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
@@ -219,6 +234,11 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
Topic: "mock_topic",
Partition: 2,
}), 1)
=======
2, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
}

func TestWriteCheckpointTsToTableTopics(t *testing.T) {
@@ -278,6 +298,7 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
require.Nil(t, err)

require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
6, "All topics and partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
@@ -303,6 +324,13 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
Topic: "cdc_person2",
Partition: 0,
}), 1)
=======
4, "All topics and partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person1", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person2", 0), 1)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
}

func TestWriteCheckpointTsWhenCanalJsonTiDBExtensionIsDisable(t *testing.T) {
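The updated assertions in the DDL-sink tests above follow directly from the sink-uri: with partition-num=2, the broadcast fan-out is two partitions per topic. A small sketch of how such a sink-uri can be assembled; the host, topic, and parameter set are placeholders, not the full set of options TiCDC accepts:

```go
package main

import (
	"fmt"
	"net/url"
)

// buildSinkURI assembles a test-style Kafka sink-uri carrying an explicit
// partition-num, the parameter whose value now takes precedence for the
// default topic.
func buildSinkURI(addr, topic string, partitionNum int) string {
	q := url.Values{}
	q.Set("partition-num", fmt.Sprint(partitionNum))
	q.Set("protocol", "open-protocol")
	q.Set("auto-create-topic", "false")
	return fmt.Sprintf("kafka://%s/%s?%s", addr, topic, q.Encode())
}

func main() {
	fmt.Println(buildSinkURI("127.0.0.1:9092", "mock_topic", 2))
	// kafka://127.0.0.1:9092/mock_topic?auto-create-topic=false&partition-num=2&protocol=open-protocol
}
```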
8 changes: 8 additions & 0 deletions cdc/sinkv2/util/helper.go
@@ -94,8 +94,16 @@ func GetTopicManagerAndTryCreateTopic(
topicCfg *kafka.AutoCreateTopicConfig,
adminClient pkafka.ClusterAdminClient,
) (manager.TopicManager, error) {
<<<<<<< HEAD:cdc/sinkv2/util/helper.go
topicManager := manager.NewKafkaTopicManager(ctx, adminClient, topicCfg)
if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil {
=======
topicManager := manager.NewKafkaTopicManager(
ctx, topic, changefeedID, adminClient, topicCfg,
)

if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/util/helper.go
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
}

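The helper's post-change flow is: build the manager with the default topic baked in, then eagerly create (or look up) that topic and fail fast if it never becomes visible, wrapping the cause in ErrKafkaCreateTopic. A self-contained sketch of that control flow with stand-in types, not the TiCDC interfaces:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// topicManager is a stand-in: it knows its default topic and the
// partition counts it has already discovered.
type topicManager struct {
	defaultTopic string
	partitions   map[string]int32
}

func (m *topicManager) CreateTopicAndWaitUntilVisible(ctx context.Context, topic string) (int32, error) {
	if n, ok := m.partitions[topic]; ok {
		return n, nil
	}
	return 0, errors.New("topic not found and auto-create is disabled")
}

// getTopicManagerAndTryCreateTopic mirrors the helper's flow after this
// change: create (or look up) the topic first, and surface any failure
// before the manager is handed to callers.
func getTopicManagerAndTryCreateTopic(ctx context.Context, topic string, m *topicManager) (*topicManager, error) {
	if _, err := m.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
		return nil, fmt.Errorf("ErrKafkaCreateTopic: %w", err)
	}
	return m, nil
}

func main() {
	m := &topicManager{defaultTopic: "t", partitions: map[string]int32{"t": 2}}
	if _, err := getTopicManagerAndTryCreateTopic(context.Background(), "t", m); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("topic manager ready")
}
```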
44 changes: 44 additions & 0 deletions cdc/sinkv2/util/helper_test.go
@@ -14,12 +14,56 @@
package util

import (
"context"
"net/url"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/stretchr/testify/require"
)

func TestPartition(t *testing.T) {
t.Parallel()

adminClient := kafka.NewClusterAdminClientMockImpl()
defer adminClient.Close()

cfg := &kafka.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 1,
}

changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()

manager, err := GetTopicManagerAndTryCreateTopic(ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient)
require.NoError(t, err)
defer manager.Close()

// default topic: the real partition number is 3, but 2 is set in the sink-uri, so return 2.
partitionsNum, err := manager.GetPartitionNum(ctx, kafka.DefaultMockTopicName)
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// new topic: create it with the partition number set to 2.
partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic")
require.NoError(t, err)
require.Equal(t, int32(2), partitionsNum)

// assume a topic already exists; a non-default topic won't be affected by the default topic's partition number.
err = adminClient.CreateTopic(ctx, &kafka.TopicDetail{
Name: "new-topic-2",
NumPartitions: 3,
}, false)
require.NoError(t, err)

partitionsNum, err = manager.GetPartitionNum(ctx, "new-topic-2")
require.NoError(t, err)
require.Equal(t, int32(3), partitionsNum)
}

func TestGetTopic(t *testing.T) {
t.Parallel()

