diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 7e07f051ba4..bf92f812f2f 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -314,6 +314,12 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
 	}
 }
 
+func (m *SinkManager) needsStuckCheck() bool {
+	m.sinkFactory.Lock()
+	defer m.sinkFactory.Unlock()
+	return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
+}
+
 func (m *SinkManager) initSinkFactory() (chan error, uint64) {
 	m.sinkFactory.Lock()
 	defer m.sinkFactory.Unlock()
@@ -989,15 +995,17 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
 	}
 
 	stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second
-	isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
-	if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
-		log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
-			zap.String("namespace", m.changefeedID.Namespace),
-			zap.String("changefeed", m.changefeedID.ID),
-			zap.Int64("tableID", tableID),
-			zap.Any("checkpointTs", checkpointTs),
-			zap.Float64("stuckCheck", stuckCheck.Seconds()),
-			zap.Uint64("factoryVersion", version))
+	if m.needsStuckCheck() {
+		isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
+		if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
+			log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
+				zap.String("namespace", m.changefeedID.Namespace),
+				zap.String("changefeed", m.changefeedID.ID),
+				zap.Int64("tableID", tableID),
+				zap.Any("checkpointTs", checkpointTs),
+				zap.Float64("stuckCheck", stuckCheck.Seconds()),
+				zap.Uint64("factoryVersion", sinkVersion))
+		}
 	}
 
 	var resolvedTs model.Ts
diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index 63d79185c2f..d33fcb5aeae 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -365,3 +365,18 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
 		log.Panic("must get an error instead of a timeout")
 	}
 }
+
+func TestSinkManagerNeedsStuckCheck(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	errCh := make(chan error, 16)
+	changefeedInfo := getChangefeedInfo()
+	manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
+	defer func() {
+		cancel()
+		manager.Close()
+	}()
+
+	require.False(t, manager.needsStuckCheck())
+}
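To make the new gating easier to review: `needsStuckCheck` enables the stuck detector only when a live sink factory exists and reports the MQ category, so other backends (e.g. the MySQL sink used by the test above) are no longer restarted by this check. A minimal standalone sketch of the pattern, with illustrative names rather than the real tiflow types:

```go
// Minimal standalone sketch (illustrative names, not tiflow code) of the
// gating pattern introduced above: the stuck check is enabled only when a
// live, lazily created factory exists and reports the MQ category.
package main

import (
	"fmt"
	"sync"
)

type category int

const (
	categoryTxn category = iota + 1
	categoryMQ
)

type sinkFactory struct{ category category }

func (f *sinkFactory) Category() category { return f.category }

type manager struct {
	mu sync.Mutex
	f  *sinkFactory // nil until the factory is (re)initialized
}

// needsStuckCheck mirrors the new method: take the lock so the nil check and
// the Category() call cannot race with factory teardown or recreation.
func (m *manager) needsStuckCheck() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.f != nil && m.f.Category() == categoryMQ
}

func main() {
	m := &manager{}
	fmt.Println(m.needsStuckCheck()) // false: no factory yet

	m.mu.Lock()
	m.f = &sinkFactory{category: categoryMQ}
	m.mu.Unlock()
	fmt.Println(m.needsStuckCheck()) // true: an MQ sink is active
}
```

Holding the factory lock for the whole predicate keeps the nil check and the `Category()` call in one critical section, so the check cannot observe a factory that is mid-teardown.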
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index 7dc99436a4f..c1372d27345 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -33,7 +33,7 @@ import (
 	"go.uber.org/zap"
 )
 
-var version uint64 = 0
+var tableSinkWrapperVersion uint64 = 0
 
 // tableSinkWrapper is a wrapper of TableSink, it is used in SinkManager to manage TableSink.
 // Because in the SinkManager, we write data to TableSink and RedoManager concurrently,
@@ -110,7 +110,7 @@ func newTableSinkWrapper(
 	genReplicateTs func(ctx context.Context) (model.Ts, error),
 ) *tableSinkWrapper {
 	res := &tableSinkWrapper{
-		version:          atomic.AddUint64(&version, 1),
+		version:          atomic.AddUint64(&tableSinkWrapperVersion, 1),
 		changefeed:       changefeed,
 		tableID:          tableID,
 		tableSinkCreater: tableSinkCreater,
diff --git a/cdc/sinkv2/eventsink/factory/factory.go b/cdc/sinkv2/eventsink/factory/factory.go
index 06f3d49baf1..b91fa6fda7a 100644
--- a/cdc/sinkv2/eventsink/factory/factory.go
+++ b/cdc/sinkv2/eventsink/factory/factory.go
@@ -32,6 +32,20 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+// Category is for different DML sink categories.
+type Category = int
+
+const (
+	// CategoryTxn is for Txn sink.
+	CategoryTxn Category = 1
+	// CategoryMQ is for MQ sink.
+	CategoryMQ = 2
+	// CategoryCloudStorage is for CloudStorage sink.
+	CategoryCloudStorage = 3
+	// CategoryBlackhole is for Blackhole sink.
+	CategoryBlackhole = 4
+)
+
 // SinkFactory is the factory of sink.
 // It is responsible for creating sink and closing it.
 // Because there is no way to convert the eventsink.EventSink[*model.RowChangedEvent]
@@ -41,6 +55,7 @@ type SinkFactory struct {
 	sinkType sink.Type
 	rowSink  eventsink.EventSink[*model.RowChangedEvent]
 	txnSink  eventsink.EventSink[*model.SingleTableTxn]
+	category Category
 }
 
 // New creates a new SinkFactory by schema.
@@ -64,6 +79,7 @@ func New(ctx context.Context,
 		}
 		s.txnSink = txnSink
 		s.sinkType = sink.TxnSink
+		s.category = CategoryTxn
 	case sink.KafkaScheme, sink.KafkaSSLScheme:
 		mqs, err := mq.NewKafkaDMLSink(ctx, sinkURI, cfg, errCh,
 			kafka.NewSaramaAdminClient, dmlproducer.NewKafkaDMLProducer)
@@ -72,6 +88,7 @@
 		}
 		s.txnSink = mqs
 		s.sinkType = sink.TxnSink
+		s.category = CategoryMQ
 	case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
 		storageSink, err := cloudstorage.NewCloudStorageSink(ctx, sinkURI, cfg, errCh)
 		if err != nil {
@@ -79,10 +96,12 @@
 		}
 		s.txnSink = storageSink
 		s.sinkType = sink.TxnSink
+		s.category = CategoryCloudStorage
 	case sink.BlackHoleScheme:
 		bs := blackhole.New()
 		s.rowSink = bs
 		s.sinkType = sink.RowSink
+		s.category = CategoryBlackhole
 	default:
 		return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported",
 			schema)
@@ -146,3 +165,11 @@ func (s *SinkFactory) Close() {
 		panic("unknown sink type")
 	}
 }
+
+// Category returns category of s.
+func (s *SinkFactory) Category() Category {
+	if s.category == 0 {
+		panic("should never happen")
+	}
+	return s.category
+}
diff --git a/tests/integration_tests/hang_sink_suicide/run.sh b/tests/integration_tests/hang_sink_suicide/run.sh
index 3489df74e05..e4e663cb975 100644
--- a/tests/integration_tests/hang_sink_suicide/run.sh
+++ b/tests/integration_tests/hang_sink_suicide/run.sh
@@ -41,6 +41,7 @@ function run() {
 }
 
 trap stop_tidb_cluster EXIT
-run $*
-check_logs $WORK_DIR
+# TODO: update the case to use kafka sink instead of mysql sink.
+# run $*
+# check_logs $WORK_DIR
 echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
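One note on the factory change: `Category()` deliberately panics on the zero value, since `category` is assigned in every branch of `New` and a zero can only mean the factory was built without going through its constructor. A minimal sketch of that invariant (the `newFactory` helper here is hypothetical; the real `New` also constructs the sinks and returns an error):

```go
// Minimal sketch (assumed names, not tiflow code) of the Category() zero-value
// guard: a zero category indicates a misconstructed factory, hence the panic.
package main

import "fmt"

type Category = int

const (
	CategoryTxn Category = 1
	CategoryMQ  Category = 2
)

type SinkFactory struct {
	category Category
}

// Category panics on the zero value: the field is set in every constructor
// branch, so zero can only come from bypassing the constructor.
func (s *SinkFactory) Category() Category {
	if s.category == 0 {
		panic("should never happen")
	}
	return s.category
}

// newFactory is a hypothetical stand-in for the scheme switch in New.
func newFactory(scheme string) *SinkFactory {
	s := &SinkFactory{}
	switch scheme {
	case "mysql":
		s.category = CategoryTxn
	case "kafka":
		s.category = CategoryMQ
	}
	return s
}

func main() {
	f := newFactory("kafka")
	fmt.Println(f.Category() == CategoryMQ) // true
	// newFactory("unknown").Category() would panic: category stays zero.
}
```

Using a plain accessor with a panic guard (rather than exposing the field) keeps the "constructed via New" invariant checkable at the single call site, `needsStuckCheck`.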