From 596aa002fe782ce8a9186ee4565f13aa88252f46 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 20 Sep 2023 15:47:37 +0800 Subject: [PATCH 1/6] add more logs to sarama admin client to help debug close the client issue. --- cdc/sink/ddlsink/mq/kafka_ddl_sink.go | 1 + cdc/sink/dmlsink/mq/kafka_dml_sink.go | 1 + cdc/sink/dmlsink/mq/manager/kafka_manager.go | 4 ++++ cdc/sink/dmlsink/mq/manager/kafka_manager_test.go | 11 ++++++----- cdc/sink/util/helper.go | 4 +++- pkg/sink/kafka/admin.go | 4 ++++ 6 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go index f77571db39d..70041e85f2e 100644 --- a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go @@ -83,6 +83,7 @@ func NewKafkaDDLSink( topic, options.DeriveTopicConfig(), adminClient, + tiflowutil.RoleOwner, ) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index c4c6bfc0f39..646f5eede62 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -86,6 +86,7 @@ func NewKafkaDMLSink( topic, options.DeriveTopicConfig(), adminClient, + tiflowutil.RoleProcessor, ) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go index ea8c3991d3b..4727e0e684b 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go @@ -25,6 +25,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/sink/kafka" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -39,6 +40,7 @@ const ( // kafkaTopicManager is a manager for kafka topics. type kafkaTopicManager struct { changefeedID model.ChangeFeedID + role util.Role admin kafka.ClusterAdminClient @@ -58,9 +60,11 @@ func NewKafkaTopicManager( changefeedID model.ChangeFeedID, admin kafka.ClusterAdminClient, cfg *kafka.AutoCreateTopicConfig, + role util.Role, ) *kafkaTopicManager { mgr := &kafkaTopicManager{ changefeedID: changefeedID, + role: role, admin: admin, cfg: cfg, metaRefreshTicker: time.NewTicker(metaRefreshInterval), diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go index 837ca1139d2..728230a06b2 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/sink/kafka" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -34,7 +35,7 @@ func TestPartitions(t *testing.T) { } ctx := context.Background() - manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg) + manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) defer manager.Close() partitionsNum, err := manager.GetPartitionNum( @@ -56,7 +57,7 @@ func TestCreateTopic(t *testing.T) { } ctx := context.Background() - manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg) + manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) defer manager.Close() partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName) require.Nil(t, err) @@ -71,7 +72,7 @@ func TestCreateTopic(t *testing.T) { // Try to create a topic without auto create. cfg.AutoCreate = false - manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg) + manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2") require.Regexp( @@ -87,7 +88,7 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } - manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg) + manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic-failed") require.Regexp( @@ -109,7 +110,7 @@ func TestCreateTopicWithDelay(t *testing.T) { } ctx := context.Background() - manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg) + manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) defer manager.Close() partitionNum, err := manager.createTopic(ctx, "new_topic") require.Nil(t, err) diff --git a/cdc/sink/util/helper.go b/cdc/sink/util/helper.go index 4ff6db3e6d0..e784651d028 100644 --- a/cdc/sink/util/helper.go +++ b/cdc/sink/util/helper.go @@ -24,6 +24,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/kafka" + "github.com/pingcap/tiflow/pkg/util" ) // GetTopic returns the topic name from the sink URI. @@ -95,9 +96,10 @@ func GetTopicManagerAndTryCreateTopic( topic string, topicCfg *kafka.AutoCreateTopicConfig, adminClient kafka.ClusterAdminClient, + role util.Role, ) (manager.TopicManager, error) { topicManager := manager.NewKafkaTopicManager( - ctx, changefeedID, adminClient, topicCfg, + ctx, changefeedID, adminClient, topicCfg, role, ) if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil { diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go index fbadd82a1dd..336031c5dc0 100644 --- a/pkg/sink/kafka/admin.go +++ b/pkg/sink/kafka/admin.go @@ -181,5 +181,9 @@ func (a *saramaAdminClient) Close() { zap.String("namespace", a.changefeed.Namespace), zap.String("changefeed", a.changefeed.ID), zap.Error(err)) + return } + log.Info("sarama admin client closed", + zap.String("namespace", a.changefeed.Namespace), + zap.String("changefeed", a.changefeed.ID)) } From 7806863e03541c3aad1236f921a9591e917a8e26 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 20 Sep 2023 15:57:03 +0800 Subject: [PATCH 2/6] remove background refresh topics metadata. --- cdc/sink/ddlsink/mq/mq_ddl_sink.go | 3 - cdc/sink/dmlsink/mq/manager/kafka_manager.go | 87 +------------------ .../dmlsink/mq/manager/kafka_manager_test.go | 17 ++-- cdc/sink/dmlsink/mq/manager/manager.go | 2 - cdc/sink/dmlsink/mq/manager/pulsar_manager.go | 4 - cdc/sink/dmlsink/mq/mq_dml_sink.go | 6 -- cdc/sink/util/helper.go | 4 +- 7 files changed, 12 insertions(+), 111 deletions(-) diff --git a/cdc/sink/ddlsink/mq/mq_ddl_sink.go b/cdc/sink/ddlsink/mq/mq_ddl_sink.go index d8167212b8a..93af9b4c96e 100644 --- a/cdc/sink/ddlsink/mq/mq_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/mq_ddl_sink.go @@ -190,9 +190,6 @@ func (k *DDLSink) Close() { if k.producer != nil { k.producer.Close() } - if k.topicManager != nil { - k.topicManager.Close() - } if k.admin != nil { k.admin.Close() } diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go index 4727e0e684b..552dcb5533b 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go @@ -29,14 +29,6 @@ import ( "go.uber.org/zap" ) -const ( - // metaRefreshInterval is the interval of refreshing metadata. - // We can't get the metadata too frequently, because it may cause - // the kafka cluster to be overloaded. Especially when there are - // many topics in the cluster or there are many TiCDC changefeeds. - metaRefreshInterval = 10 * time.Minute -) - // kafkaTopicManager is a manager for kafka topics. type kafkaTopicManager struct { changefeedID model.ChangeFeedID @@ -47,33 +39,22 @@ type kafkaTopicManager struct { cfg *kafka.AutoCreateTopicConfig topics sync.Map - - metaRefreshTicker *time.Ticker - - // cancel is used to cancel the background goroutine. - cancel context.CancelFunc } // NewKafkaTopicManager creates a new topic manager. func NewKafkaTopicManager( - ctx context.Context, changefeedID model.ChangeFeedID, admin kafka.ClusterAdminClient, cfg *kafka.AutoCreateTopicConfig, role util.Role, ) *kafkaTopicManager { mgr := &kafkaTopicManager{ - changefeedID: changefeedID, - role: role, - admin: admin, - cfg: cfg, - metaRefreshTicker: time.NewTicker(metaRefreshInterval), + changefeedID: changefeedID, + role: role, + admin: admin, + cfg: cfg, } - ctx, mgr.cancel = context.WithCancel(ctx) - // Background refresh metadata. - go mgr.backgroundRefreshMeta(ctx) - return mgr } @@ -96,26 +77,6 @@ func (m *kafkaTopicManager) GetPartitionNum( return partitionNum, nil } -func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { - for { - select { - case <-ctx.Done(): - log.Info("Background refresh Kafka metadata goroutine exit.", - zap.String("namespace", m.changefeedID.Namespace), - zap.String("changefeed", m.changefeedID.ID), - ) - return - case <-m.metaRefreshTicker.C: - // We ignore the error here, because the error may be caused by the - // network problem, and we can try to get the metadata next time. - topicPartitionNums, _ := m.fetchAllTopicsPartitionsNum(ctx) - for topic, partitionNum := range topicPartitionNums { - m.tryUpdatePartitionsAndLogging(topic, partitionNum) - } - } - } -} - // tryUpdatePartitionsAndLogging try to update the partitions of the topic. func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitions int32) { oldPartitions, ok := m.topics.Load(topic) @@ -143,41 +104,6 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio } } -// fetchAllTopicsPartitionsNum fetches all topics' partitions number. -// The error returned by this method could be a transient error that is fixable by the underlying logic. -// When handling this error, please be cautious. -// If you simply throw the error to the caller, it may impact the robustness of your program. -func (m *kafkaTopicManager) fetchAllTopicsPartitionsNum( - ctx context.Context, -) (map[string]int32, error) { - var topics []string - m.topics.Range(func(key, value any) bool { - topics = append(topics, key.(string)) - return true - }) - - start := time.Now() - numPartitions, err := m.admin.GetTopicsPartitionsNum(ctx, topics) - if err != nil { - log.Warn( - "Kafka admin client describe topics failed", - zap.String("namespace", m.changefeedID.Namespace), - zap.String("changefeed", m.changefeedID.ID), - zap.Error(err), - zap.Duration("duration", time.Since(start)), - ) - return nil, err - } - - log.Info( - "Kafka admin client describe topics success", - zap.String("namespace", m.changefeedID.Namespace), - zap.String("changefeed", m.changefeedID.ID), - zap.Duration("duration", time.Since(start))) - - return numPartitions, nil -} - // waitUntilTopicVisible is called after CreateTopic to make sure the topic // can be safely written to. The reason is that it may take several seconds after // CreateTopic returns success for all the brokers to become aware that the @@ -291,8 +217,3 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible( return partitionNum, nil } - -// Close exits the background goroutine. -func (m *kafkaTopicManager) Close() { - m.cancel() -} diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go index 728230a06b2..167a7b56e77 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go @@ -35,8 +35,7 @@ func TestPartitions(t *testing.T) { } ctx := context.Background() - manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) - defer manager.Close() + manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) partitionsNum, err := manager.GetPartitionNum( ctx, @@ -57,8 +56,8 @@ func TestCreateTopic(t *testing.T) { } ctx := context.Background() - manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) - defer manager.Close() + manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) + partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName) require.Nil(t, err) require.Equal(t, int32(3), partitionNum) @@ -72,8 +71,7 @@ func TestCreateTopic(t *testing.T) { // Try to create a topic without auto create. cfg.AutoCreate = false - manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) - defer manager.Close() + manager = NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2") require.Regexp( t, @@ -88,8 +86,8 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } - manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) - defer manager.Close() + + manager = NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic-failed") require.Regexp( t, @@ -110,8 +108,7 @@ func TestCreateTopicWithDelay(t *testing.T) { } ctx := context.Background() - manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) - defer manager.Close() + manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) partitionNum, err := manager.createTopic(ctx, "new_topic") require.Nil(t, err) err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3) diff --git a/cdc/sink/dmlsink/mq/manager/manager.go b/cdc/sink/dmlsink/mq/manager/manager.go index a932d88f187..90a4afd31c0 100644 --- a/cdc/sink/dmlsink/mq/manager/manager.go +++ b/cdc/sink/dmlsink/mq/manager/manager.go @@ -23,6 +23,4 @@ type TopicManager interface { GetPartitionNum(ctx context.Context, topic string) (int32, error) // CreateTopicAndWaitUntilVisible creates the topic and wait for the topic completion. CreateTopicAndWaitUntilVisible(ctx context.Context, topicName string) (int32, error) - // Close closes the topic manager. - Close() } diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go index 9bc686e4a46..9e335df8d30 100644 --- a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go +++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go @@ -60,10 +60,6 @@ func (m *pulsarTopicManager) CreateTopicAndWaitUntilVisible(ctx context.Context, return 0, nil } -// Close -func (m *pulsarTopicManager) Close() { -} - // str2Pointer returns the pointer of the string. func str2Pointer(str string) *string { return &str diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index b75cd0eefed..4d899278021 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -174,12 +174,6 @@ func (s *dmlSink) Close() { } s.wg.Wait() - s.alive.RLock() - if s.alive.topicManager != nil { - s.alive.topicManager.Close() - } - s.alive.RUnlock() - if s.adminClient != nil { s.adminClient.Close() } diff --git a/cdc/sink/util/helper.go b/cdc/sink/util/helper.go index e784651d028..fd27167f43b 100644 --- a/cdc/sink/util/helper.go +++ b/cdc/sink/util/helper.go @@ -98,9 +98,7 @@ func GetTopicManagerAndTryCreateTopic( adminClient kafka.ClusterAdminClient, role util.Role, ) (manager.TopicManager, error) { - topicManager := manager.NewKafkaTopicManager( - ctx, changefeedID, adminClient, topicCfg, role, - ) + topicManager := manager.NewKafkaTopicManager(changefeedID, adminClient, topicCfg, role) if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err) From 9f67ce3bdf680c43d3250f8b6ec3ee67ac59d61f Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 20 Sep 2023 16:09:33 +0800 Subject: [PATCH 3/6] refactor the topic manager. --- cdc/sink/dmlsink/mq/manager/kafka_manager.go | 29 +++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go index 552dcb5533b..037192417a1 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go @@ -184,8 +184,6 @@ func (m *kafkaTopicManager) createTopic( zap.Int16("replicationFactor", m.cfg.ReplicationFactor), zap.Duration("duration", time.Since(start)), ) - m.tryUpdatePartitionsAndLogging(topicName, m.cfg.PartitionNum) - return m.cfg.PartitionNum, nil } @@ -193,6 +191,10 @@ func (m *kafkaTopicManager) createTopic( func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible( ctx context.Context, topicName string, ) (int32, error) { + var ( + partitionNum int32 + err error + ) // If the topic is not in the cache, we try to get the metadata of the topic. // ignoreTopicErr is set to true to ignore the error if the topic is not found, // which means we should create the topic later. @@ -200,20 +202,21 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible( if err != nil { return 0, errors.Trace(err) } - if detail, ok := topicDetails[topicName]; ok { - m.tryUpdatePartitionsAndLogging(topicName, detail.NumPartitions) - return detail.NumPartitions, nil - } - partitionNum, err := m.createTopic(ctx, topicName) - if err != nil { - return 0, errors.Trace(err) - } + details, ok := topicDetails[topicName] + partitionNum = details.NumPartitions + if !ok { + partitionNum, err = m.createTopic(ctx, topicName) + if err != nil { + return 0, errors.Trace(err) + } - err = m.waitUntilTopicVisible(ctx, topicName) - if err != nil { - return 0, errors.Trace(err) + err = m.waitUntilTopicVisible(ctx, topicName) + if err != nil { + return 0, errors.Trace(err) + } } + m.tryUpdatePartitionsAndLogging(topicName, partitionNum) return partitionNum, nil } From deb93661fae113e9e7ed900059ab91410d3b2fec Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 25 Oct 2023 17:00:17 +0800 Subject: [PATCH 4/6] fix partition number incorrect. --- cdc/sink/dmlsink/mq/manager/kafka_manager.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go index 037192417a1..d06e1aab941 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go @@ -48,14 +48,12 @@ func NewKafkaTopicManager( cfg *kafka.AutoCreateTopicConfig, role util.Role, ) *kafkaTopicManager { - mgr := &kafkaTopicManager{ + return &kafkaTopicManager{ changefeedID: changefeedID, role: role, admin: admin, cfg: cfg, } - - return mgr } // GetPartitionNum returns the number of partitions of the topic. @@ -217,6 +215,7 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible( } } - m.tryUpdatePartitionsAndLogging(topicName, partitionNum) + // store the partition number specified in the sink-uri which is adjusted. + m.tryUpdatePartitionsAndLogging(topicName, m.cfg.PartitionNum) return partitionNum, nil } From e1c91cc1930565004dc18648fb5c6b12e426a6d0 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 25 Oct 2023 17:15:49 +0800 Subject: [PATCH 5/6] remove role from the kafka topic manager. --- cdc/sink/ddlsink/mq/kafka_ddl_sink.go | 1 - cdc/sink/dmlsink/mq/kafka_dml_sink.go | 1 - cdc/sink/dmlsink/mq/manager/kafka_manager.go | 7 +------ cdc/sink/util/helper.go | 4 +--- 4 files changed, 2 insertions(+), 11 deletions(-) diff --git a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go index 70041e85f2e..f77571db39d 100644 --- a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go @@ -83,7 +83,6 @@ func NewKafkaDDLSink( topic, options.DeriveTopicConfig(), adminClient, - tiflowutil.RoleOwner, ) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index 646f5eede62..c4c6bfc0f39 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -86,7 +86,6 @@ func NewKafkaDMLSink( topic, options.DeriveTopicConfig(), adminClient, - tiflowutil.RoleProcessor, ) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go index d06e1aab941..b0ff0c5b2fa 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go @@ -25,16 +25,13 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/sink/kafka" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) // kafkaTopicManager is a manager for kafka topics. type kafkaTopicManager struct { changefeedID model.ChangeFeedID - role util.Role - - admin kafka.ClusterAdminClient + admin kafka.ClusterAdminClient cfg *kafka.AutoCreateTopicConfig @@ -46,11 +43,9 @@ func NewKafkaTopicManager( changefeedID model.ChangeFeedID, admin kafka.ClusterAdminClient, cfg *kafka.AutoCreateTopicConfig, - role util.Role, ) *kafkaTopicManager { return &kafkaTopicManager{ changefeedID: changefeedID, - role: role, admin: admin, cfg: cfg, } diff --git a/cdc/sink/util/helper.go b/cdc/sink/util/helper.go index fd27167f43b..35778157e32 100644 --- a/cdc/sink/util/helper.go +++ b/cdc/sink/util/helper.go @@ -24,7 +24,6 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/kafka" - "github.com/pingcap/tiflow/pkg/util" ) // GetTopic returns the topic name from the sink URI. @@ -96,9 +95,8 @@ func GetTopicManagerAndTryCreateTopic( topic string, topicCfg *kafka.AutoCreateTopicConfig, adminClient kafka.ClusterAdminClient, - role util.Role, ) (manager.TopicManager, error) { - topicManager := manager.NewKafkaTopicManager(changefeedID, adminClient, topicCfg, role) + topicManager := manager.NewKafkaTopicManager(changefeedID, adminClient, topicCfg) if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err) From 0cbf92572089e3ac1a0f564952b0826cc277350f Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 25 Oct 2023 17:32:48 +0800 Subject: [PATCH 6/6] fix unit test. --- cdc/sink/dmlsink/mq/manager/kafka_manager_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go index 167a7b56e77..0fe5b98594d 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/sink/kafka" - "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -35,7 +34,7 @@ func TestPartitions(t *testing.T) { } ctx := context.Background() - manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) + manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg) partitionsNum, err := manager.GetPartitionNum( ctx, @@ -56,7 +55,7 @@ func TestCreateTopic(t *testing.T) { } ctx := context.Background() - manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) + manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg) partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName) require.Nil(t, err) @@ -71,7 +70,7 @@ func TestCreateTopic(t *testing.T) { // Try to create a topic without auto create. cfg.AutoCreate = false - manager = NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) + manager = NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg) _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2") require.Regexp( t, @@ -87,7 +86,7 @@ func TestCreateTopic(t *testing.T) { ReplicationFactor: 4, } - manager = NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) + manager = NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg) _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic-failed") require.Regexp( t, @@ -108,7 +107,7 @@ func TestCreateTopicWithDelay(t *testing.T) { } ctx := context.Background() - manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg, util.RoleTester) + manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg) partitionNum, err := manager.createTopic(ctx, "new_topic") require.Nil(t, err) err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3)