diff --git a/cdc/sink/ddlsink/mq/mq_ddl_sink.go b/cdc/sink/ddlsink/mq/mq_ddl_sink.go
index d8167212b8a..93af9b4c96e 100644
--- a/cdc/sink/ddlsink/mq/mq_ddl_sink.go
+++ b/cdc/sink/ddlsink/mq/mq_ddl_sink.go
@@ -190,9 +190,6 @@ func (k *DDLSink) Close() {
 	if k.producer != nil {
 		k.producer.Close()
 	}
-	if k.topicManager != nil {
-		k.topicManager.Close()
-	}
 	if k.admin != nil {
 		k.admin.Close()
 	}
diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go
index ea8c3991d3b..b0ff0c5b2fa 100644
--- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go
+++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go
@@ -28,49 +28,27 @@ import (
 	"go.uber.org/zap"
 )
 
-const (
-	// metaRefreshInterval is the interval of refreshing metadata.
-	// We can't get the metadata too frequently, because it may cause
-	// the kafka cluster to be overloaded. Especially when there are
-	// many topics in the cluster or there are many TiCDC changefeeds.
-	metaRefreshInterval = 10 * time.Minute
-)
-
 // kafkaTopicManager is a manager for kafka topics.
 type kafkaTopicManager struct {
 	changefeedID model.ChangeFeedID
-
-	admin kafka.ClusterAdminClient
+	admin        kafka.ClusterAdminClient
 
 	cfg *kafka.AutoCreateTopicConfig
 
 	topics sync.Map
-
-	metaRefreshTicker *time.Ticker
-
-	// cancel is used to cancel the background goroutine.
-	cancel context.CancelFunc
 }
 
// NewKafkaTopicManager creates a new topic manager.
 func NewKafkaTopicManager(
-	ctx context.Context,
 	changefeedID model.ChangeFeedID,
 	admin kafka.ClusterAdminClient,
 	cfg *kafka.AutoCreateTopicConfig,
 ) *kafkaTopicManager {
-	mgr := &kafkaTopicManager{
-		changefeedID:      changefeedID,
-		admin:             admin,
-		cfg:               cfg,
-		metaRefreshTicker: time.NewTicker(metaRefreshInterval),
+	return &kafkaTopicManager{
+		changefeedID: changefeedID,
+		admin:        admin,
+		cfg:          cfg,
 	}
-
-	ctx, mgr.cancel = context.WithCancel(ctx)
-	// Background refresh metadata.
-	go mgr.backgroundRefreshMeta(ctx)
-
-	return mgr
 }
 
 // GetPartitionNum returns the number of partitions of the topic.
@@ -92,26 +70,6 @@ func (m *kafkaTopicManager) GetPartitionNum(
 	return partitionNum, nil
 }
 
-func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
-	for {
-		select {
-		case <-ctx.Done():
-			log.Info("Background refresh Kafka metadata goroutine exit.",
-				zap.String("namespace", m.changefeedID.Namespace),
-				zap.String("changefeed", m.changefeedID.ID),
-			)
-			return
-		case <-m.metaRefreshTicker.C:
-			// We ignore the error here, because the error may be caused by the
-			// network problem, and we can try to get the metadata next time.
-			topicPartitionNums, _ := m.fetchAllTopicsPartitionsNum(ctx)
-			for topic, partitionNum := range topicPartitionNums {
-				m.tryUpdatePartitionsAndLogging(topic, partitionNum)
-			}
-		}
-	}
-}
-
 // tryUpdatePartitionsAndLogging try to update the partitions of the topic.
 func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitions int32) {
 	oldPartitions, ok := m.topics.Load(topic)
@@ -139,41 +97,6 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio
 	}
 }
 
-// fetchAllTopicsPartitionsNum fetches all topics' partitions number.
-// The error returned by this method could be a transient error that is fixable by the underlying logic.
-// When handling this error, please be cautious.
-// If you simply throw the error to the caller, it may impact the robustness of your program.
-func (m *kafkaTopicManager) fetchAllTopicsPartitionsNum(
-	ctx context.Context,
-) (map[string]int32, error) {
-	var topics []string
-	m.topics.Range(func(key, value any) bool {
-		topics = append(topics, key.(string))
-		return true
-	})
-
-	start := time.Now()
-	numPartitions, err := m.admin.GetTopicsPartitionsNum(ctx, topics)
-	if err != nil {
-		log.Warn(
-			"Kafka admin client describe topics failed",
-			zap.String("namespace", m.changefeedID.Namespace),
-			zap.String("changefeed", m.changefeedID.ID),
-			zap.Error(err),
-			zap.Duration("duration", time.Since(start)),
-		)
-		return nil, err
-	}
-
-	log.Info(
-		"Kafka admin client describe topics success",
-		zap.String("namespace", m.changefeedID.Namespace),
-		zap.String("changefeed", m.changefeedID.ID),
-		zap.Duration("duration", time.Since(start)))
-
-	return numPartitions, nil
-}
-
 // waitUntilTopicVisible is called after CreateTopic to make sure the topic
 // can be safely written to. The reason is that it may take several seconds after
 // CreateTopic returns success for all the brokers to become aware that the
@@ -254,8 +177,6 @@ func (m *kafkaTopicManager) createTopic(
 		zap.Int16("replicationFactor", m.cfg.ReplicationFactor),
 		zap.Duration("duration", time.Since(start)),
 	)
-	m.tryUpdatePartitionsAndLogging(topicName, m.cfg.PartitionNum)
-
 	return m.cfg.PartitionNum, nil
 }
 
@@ -263,6 +184,10 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(
 	ctx context.Context,
 	topicName string,
 ) (int32, error) {
+	var (
+		partitionNum int32
+		err          error
+	)
 	// If the topic is not in the cache, we try to get the metadata of the topic.
 	// ignoreTopicErr is set to true to ignore the error if the topic is not found,
 	// which means we should create the topic later.
@@ -270,25 +195,22 @@ func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
-	if detail, ok := topicDetails[topicName]; ok {
-		m.tryUpdatePartitionsAndLogging(topicName, detail.NumPartitions)
-		return detail.NumPartitions, nil
-	}
 
-	partitionNum, err := m.createTopic(ctx, topicName)
-	if err != nil {
-		return 0, errors.Trace(err)
-	}
+	details, ok := topicDetails[topicName]
+	partitionNum = details.NumPartitions
+	if !ok {
+		partitionNum, err = m.createTopic(ctx, topicName)
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
 
-	err = m.waitUntilTopicVisible(ctx, topicName)
-	if err != nil {
-		return 0, errors.Trace(err)
+		err = m.waitUntilTopicVisible(ctx, topicName)
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
 	}
 
+	// store the partition number specified in the sink-uri which is adjusted.
+	m.tryUpdatePartitionsAndLogging(topicName, m.cfg.PartitionNum)
 	return partitionNum, nil
 }
-
-// Close exits the background goroutine.
-func (m *kafkaTopicManager) Close() {
-	m.cancel()
-}
diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
index 837ca1139d2..0fe5b98594d 100644
--- a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
+++ b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
@@ -34,8 +34,7 @@ func TestPartitions(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
-	defer manager.Close()
+	manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg)
 
 	partitionsNum, err := manager.GetPartitionNum(
 		ctx,
@@ -56,8 +55,8 @@ func TestCreateTopic(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
-	defer manager.Close()
+	manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg)
+
 	partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
 	require.Nil(t, err)
 	require.Equal(t, int32(3), partitionNum)
@@ -71,8 +70,7 @@ func TestCreateTopic(t *testing.T) {
 
 	// Try to create a topic without auto create.
 	cfg.AutoCreate = false
-	manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
-	defer manager.Close()
+	manager = NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg)
 	_, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2")
 	require.Regexp(
 		t,
@@ -87,8 +85,8 @@ func TestCreateTopic(t *testing.T) {
 		PartitionNum:      2,
 		ReplicationFactor: 4,
 	}
-	manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
-	defer manager.Close()
+
+	manager = NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg)
 	_, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic-failed")
 	require.Regexp(
 		t,
@@ -109,8 +107,7 @@ func TestCreateTopicWithDelay(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
-	defer manager.Close()
+	manager := NewKafkaTopicManager(model.DefaultChangeFeedID("test"), adminClient, cfg)
 	partitionNum, err := manager.createTopic(ctx, "new_topic")
 	require.Nil(t, err)
 	err = adminClient.SetRemainingFetchesUntilTopicVisible("new_topic", 3)
diff --git a/cdc/sink/dmlsink/mq/manager/manager.go b/cdc/sink/dmlsink/mq/manager/manager.go
index a932d88f187..90a4afd31c0 100644
--- a/cdc/sink/dmlsink/mq/manager/manager.go
+++ b/cdc/sink/dmlsink/mq/manager/manager.go
@@ -23,6 +23,4 @@ type TopicManager interface {
 	GetPartitionNum(ctx context.Context, topic string) (int32, error)
 	// CreateTopicAndWaitUntilVisible creates the topic and wait for the topic completion.
 	CreateTopicAndWaitUntilVisible(ctx context.Context, topicName string) (int32, error)
-	// Close closes the topic manager.
-	Close()
 }
diff --git a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go
index 9bc686e4a46..9e335df8d30 100644
--- a/cdc/sink/dmlsink/mq/manager/pulsar_manager.go
+++ b/cdc/sink/dmlsink/mq/manager/pulsar_manager.go
@@ -60,10 +60,6 @@ func (m *pulsarTopicManager) CreateTopicAndWaitUntilVisible(ctx context.Context,
 	return 0, nil
 }
 
-// Close
-func (m *pulsarTopicManager) Close() {
-}
-
 // str2Pointer returns the pointer of the string.
 func str2Pointer(str string) *string {
 	return &str
diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go
index b75cd0eefed..4d899278021 100644
--- a/cdc/sink/dmlsink/mq/mq_dml_sink.go
+++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -174,12 +174,6 @@ func (s *dmlSink) Close() {
 	}
 	s.wg.Wait()
 
-	s.alive.RLock()
-	if s.alive.topicManager != nil {
-		s.alive.topicManager.Close()
-	}
-	s.alive.RUnlock()
-
 	if s.adminClient != nil {
 		s.adminClient.Close()
 	}
diff --git a/cdc/sink/util/helper.go b/cdc/sink/util/helper.go
index 4ff6db3e6d0..35778157e32 100644
--- a/cdc/sink/util/helper.go
+++ b/cdc/sink/util/helper.go
@@ -96,9 +96,7 @@ func GetTopicManagerAndTryCreateTopic(
 	topicCfg *kafka.AutoCreateTopicConfig,
 	adminClient kafka.ClusterAdminClient,
 ) (manager.TopicManager, error) {
-	topicManager := manager.NewKafkaTopicManager(
-		ctx, changefeedID, adminClient, topicCfg,
-	)
+	topicManager := manager.NewKafkaTopicManager(changefeedID, adminClient, topicCfg)
 
 	if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
 		return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go
index fbadd82a1dd..336031c5dc0 100644
--- a/pkg/sink/kafka/admin.go
+++ b/pkg/sink/kafka/admin.go
@@ -181,5 +181,9 @@ func (a *saramaAdminClient) Close() {
 			zap.String("namespace", a.changefeed.Namespace),
 			zap.String("changefeed", a.changefeed.ID),
 			zap.Error(err))
+		return
 	}
+	log.Info("sarama admin client closed",
+		zap.String("namespace", a.changefeed.Namespace),
+		zap.String("changefeed", a.changefeed.ID))
 }
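
Note (not part of the patch): a minimal Go sketch of the caller-side pattern after this change. The constructor is now synchronous, owns no background refresh goroutine, and exposes no Close to defer; a context is passed only to the per-call methods. The helper name newTopicManagerExample and the "example-topic" name are illustrative, and the import paths assume the upstream pingcap/tiflow module layout.

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
	"github.com/pingcap/tiflow/pkg/sink/kafka"
)

// newTopicManagerExample is a hypothetical helper showing the signature
// introduced by this patch: NewKafkaTopicManager no longer takes a
// context.Context, and there is no Close method to defer.
func newTopicManagerExample(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	adminClient kafka.ClusterAdminClient,
	cfg *kafka.AutoCreateTopicConfig,
) (int32, error) {
	topicManager := manager.NewKafkaTopicManager(changefeedID, adminClient, cfg)
	// CreateTopicAndWaitUntilVisible now also records the configured
	// partition count in the manager's cache, taking over the bookkeeping
	// that the removed periodic background refresh used to do.
	partitionNum, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, "example-topic")
	if err != nil {
		return 0, errors.Trace(err)
	}
	return partitionNum, nil
}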