Skip to content

Commit

Permalink
sink(cdc): only check sink stuck for MQ sinks (#9742) (#9747)
Browse files Browse the repository at this point in the history
close #9736
  • Loading branch information
ti-chi-bot authored Sep 14, 2023
1 parent 41c8fdf commit 11b023a
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 13 deletions.
26 changes: 17 additions & 9 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) needsStuckCheck() bool {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
Expand Down Expand Up @@ -989,15 +995,17 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second

isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", version))
if m.needsStuckCheck() {
isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", sinkVersion))
}
}

var resolvedTs model.Ts
Expand Down
15 changes: 15 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,18 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
log.Panic("must get an error instead of a timeout")
}
}

func TestSinkManagerNeedsStuckCheck(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
changefeedInfo := getChangefeedInfo()
manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
defer func() {
cancel()
manager.Close()
}()

require.False(t, manager.needsStuckCheck())
}
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"go.uber.org/zap"
)

var version uint64 = 0
var tableSinkWrapperVersion uint64 = 0

// tableSinkWrapper is a wrapper of TableSink, it is used in SinkManager to manage TableSink.
// Because in the SinkManager, we write data to TableSink and RedoManager concurrently,
Expand Down Expand Up @@ -110,7 +110,7 @@ func newTableSinkWrapper(
genReplicateTs func(ctx context.Context) (model.Ts, error),
) *tableSinkWrapper {
res := &tableSinkWrapper{
version: atomic.AddUint64(&version, 1),
version: atomic.AddUint64(&tableSinkWrapperVersion, 1),
changefeed: changefeed,
tableID: tableID,
tableSinkCreater: tableSinkCreater,
Expand Down
27 changes: 27 additions & 0 deletions cdc/sinkv2/eventsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// Category is for different DML sink categories.
type Category = int

const (
// CategoryTxn is for Txn sink.
CategoryTxn Category = 1
// CategoryMQ is for MQ sink.
CategoryMQ = 2
// CategoryCloudStorage is for CloudStorage sink.
CategoryCloudStorage = 3
// CategoryBlackhole is for Blackhole sink.
CategoryBlackhole = 4
)

// SinkFactory is the factory of sink.
// It is responsible for creating sink and closing it.
// Because there is no way to convert the eventsink.EventSink[*model.RowChangedEvent]
Expand All @@ -41,6 +55,7 @@ type SinkFactory struct {
sinkType sink.Type
rowSink eventsink.EventSink[*model.RowChangedEvent]
txnSink eventsink.EventSink[*model.SingleTableTxn]
category Category
}

// New creates a new SinkFactory by schema.
Expand All @@ -64,6 +79,7 @@ func New(ctx context.Context,
}
s.txnSink = txnSink
s.sinkType = sink.TxnSink
s.category = CategoryTxn
case sink.KafkaScheme, sink.KafkaSSLScheme:
mqs, err := mq.NewKafkaDMLSink(ctx, sinkURI, cfg, errCh,
kafka.NewSaramaAdminClient, dmlproducer.NewKafkaDMLProducer)
Expand All @@ -72,17 +88,20 @@ func New(ctx context.Context,
}
s.txnSink = mqs
s.sinkType = sink.TxnSink
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewCloudStorageSink(ctx, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
s.txnSink = storageSink
s.sinkType = sink.TxnSink
s.category = CategoryCloudStorage
case sink.BlackHoleScheme:
bs := blackhole.New()
s.rowSink = bs
s.sinkType = sink.RowSink
s.category = CategoryBlackhole
default:
return nil,
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema)
Expand Down Expand Up @@ -146,3 +165,11 @@ func (s *SinkFactory) Close() {
panic("unknown sink type")
}
}

// Category returns category of s.
func (s *SinkFactory) Category() Category {
if s.category == 0 {
panic("should never happen")
}
return s.category
}
5 changes: 3 additions & 2 deletions tests/integration_tests/hang_sink_suicide/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ function run() {
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
# TODO: update the case to use kafka sink instead of mysql sink.
# run $*
# check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit 11b023a

Please sign in to comment.