Skip to content

Commit

Permalink
fix test.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Nov 8, 2023
1 parent 17e906b commit bce6762
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 56 deletions.
3 changes: 2 additions & 1 deletion cdc/sink/mq/manager/kafka_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/Shopify/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
kafkaconfig "github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -61,10 +62,10 @@ type kafkaTopicManager struct {
func NewKafkaTopicManager(
ctx context.Context,
defaultTopic string,
changefeedID model.ChangeFeedID,
admin kafka.ClusterAdminClient,
cfg *kafkaconfig.AutoCreateTopicConfig,
) *kafkaTopicManager {
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
mgr := &kafkaTopicManager{
defaultTopic: defaultTopic,
changefeedID: changefeedID,
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

topicManager := manager.NewKafkaTopicManager(ctx, adminClient, baseConfig.DeriveTopicConfig())
topicManager := manager.NewKafkaTopicManager(ctx, topic, adminClient, baseConfig.DeriveTopicConfig())
if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
}
Expand Down
49 changes: 4 additions & 45 deletions cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,9 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
leader, topic := initBroker(t, kafka.DefaultMockPartitionNum)
defer leader.Close()
=======
// partition-number is 2, so only send DDL events to 2 partitions.
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=2" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
Expand Down Expand Up @@ -114,8 +111,7 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
err = s.WriteDDLEvent(ctx, ddl)
require.Nil(t, err)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
3, "All partitions should be broadcast")
2, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
Partition: 0,
Expand All @@ -124,15 +120,6 @@ func TestWriteDDLEventToAllPartitions(t *testing.T) {
Topic: "mock_topic",
Partition: 1,
}), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
Partition: 2,
}), 1)
=======
2, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
}

func TestWriteDDLEventToZeroPartition(t *testing.T) {
Expand Down Expand Up @@ -192,12 +179,9 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
leader, topic := initBroker(t, kafka.DefaultMockPartitionNum)
defer leader.Close()
=======
// partition-num is set to 2, so send checkpoint to 2 partitions.
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=2" +
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" +
Expand All @@ -220,8 +204,7 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
require.Nil(t, err)

require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
3, "All partitions should be broadcast")
2, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
Partition: 0,
Expand All @@ -230,15 +213,7 @@ func TestWriteCheckpointTsToDefaultTopic(t *testing.T) {
Topic: "mock_topic",
Partition: 1,
}), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
Partition: 2,
}), 1)
=======
2, "All partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 1), 1)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go

}

func TestWriteCheckpointTsToTableTopics(t *testing.T) {
Expand Down Expand Up @@ -298,20 +273,11 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
require.Nil(t, err)

require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetAllEvents(),
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/mq_ddl_sink_test.go
6, "All topics and partitions should be broadcast")
4, "All topics and partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
Partition: 0,
}), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
Partition: 1,
}), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "mock_topic",
Partition: 2,
}), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents(mqv1.TopicPartitionKey{
Topic: "cdc_person",
Partition: 0,
Expand All @@ -324,13 +290,6 @@ func TestWriteCheckpointTsToTableTopics(t *testing.T) {
Topic: "cdc_person2",
Partition: 0,
}), 1)
=======
4, "All topics and partitions should be broadcast")
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("mock_topic", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person1", 0), 1)
require.Len(t, s.producer.(*ddlproducer.MockDDLProducer).GetEvents("cdc_person2", 0), 1)
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
}

func TestWriteCheckpointTsWhenCanalJsonTiDBExtensionIsDisable(t *testing.T) {
Expand Down
10 changes: 1 addition & 9 deletions cdc/sinkv2/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,8 @@ func GetTopicManagerAndTryCreateTopic(
topicCfg *kafka.AutoCreateTopicConfig,
adminClient pkafka.ClusterAdminClient,
) (manager.TopicManager, error) {
<<<<<<< HEAD:cdc/sinkv2/util/helper.go
topicManager := manager.NewKafkaTopicManager(ctx, adminClient, topicCfg)
topicManager := manager.NewKafkaTopicManager(ctx, topic, adminClient, topicCfg)
if _, err := topicManager.CreateTopicAndWaitUntilVisible(topic); err != nil {
=======
topicManager := manager.NewKafkaTopicManager(
ctx, topic, changefeedID, adminClient, topicCfg,
)

if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
>>>>>>> d30b4c3793 (kafka(ticdc): topic manager return the partition number specified in the sink-uri (#9955)):cdc/sink/util/helper.go
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
}

Expand Down

0 comments on commit bce6762

Please sign in to comment.