diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index e795fd81edc..3b2305b7c4b 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -351,7 +351,7 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) { return m.sinkFactory.errors, m.sinkFactory.version } - m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors) + m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, m.up.PDClock, uri, cfg, m.sinkFactory.errors) if err != nil { emitError(err) return m.sinkFactory.errors, m.sinkFactory.version diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 73ffa99d992..7938ff92f07 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -28,10 +28,10 @@ import ( "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" "github.com/pingcap/tiflow/cdc/sink/util" - "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/builder" @@ -96,6 +96,7 @@ type DMLSink struct { // NewDMLSink creates a cloud storage sink. func NewDMLSink(ctx context.Context, changefeedID model.ChangeFeedID, + pdClock pdutil.Clock, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, errCh chan error, @@ -157,11 +158,10 @@ func NewDMLSink(ctx context.Context, // create defragmenter. s.defragmenter = newDefragmenter(encodedCh, workerChannels) // create a group of dml workers. - clock := clock.New() for i := 0; i < cfg.WorkerCount; i++ { inputCh := chann.NewAutoDrainChann[eventFragment]() s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext, - inputCh, clock, s.statistics) + inputCh, pdClock, s.statistics) workerChannels[i] = inputCh } diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go index b2c555346f1..a462d9837e2 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go @@ -31,13 +31,14 @@ import ( "github.com/pingcap/tiflow/cdc/sink/tablesink/state" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) func setClock(s *DMLSink, clock clock.Clock) { for _, w := range s.workers { - w.filePathGenerator.SetClock(clock) + w.filePathGenerator.SetClock(pdutil.NewMonotonicClock4Test(clock)) } } @@ -129,6 +130,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { errCh := make(chan error, 5) s, err := NewDMLSink(ctx, model.DefaultChangeFeedID("test"), + pdutil.NewMonotonicClock4Test(clock.New()), sinkURI, replicaConfig, errCh) require.Nil(t, err) var cnt uint64 = 0 @@ -197,11 +199,12 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { replicaConfig.Sink.FileIndexWidth = util.AddressOf(6) errCh := make(chan error, 5) + mockClock := clock.NewMock() s, err := NewDMLSink(ctx, - model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, errCh) + model.DefaultChangeFeedID("test"), + pdutil.NewMonotonicClock4Test(mockClock), + sinkURI, replicaConfig, errCh) require.Nil(t, err) - mockClock := clock.NewMock() - setClock(s, mockClock) var cnt uint64 = 0 batch := 100 @@ -272,12 +275,14 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { // test table is scheduled from one node to another cnt = 0 ctx, cancel = context.WithCancel(context.Background()) - s, err = NewDMLSink(ctx, - model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, errCh) - require.Nil(t, err) + mockClock = clock.NewMock() mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC)) - setClock(s, mockClock) + s, err = NewDMLSink(ctx, + model.DefaultChangeFeedID("test"), + pdutil.NewMonotonicClock4Test(mockClock), + sinkURI, replicaConfig, errCh) + require.Nil(t, err) err = s.WriteEvents(txns...) require.Nil(t, err) diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index 40de8505424..7e785cd0ccf 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -25,9 +25,9 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/metrics" mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage" - "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/prometheus/client_golang/prometheus" @@ -107,7 +107,7 @@ func newDMLWorker( config *cloudstorage.Config, extension string, inputCh *chann.DrainableChann[eventFragment], - clock clock.Clock, + pdClock pdutil.Clock, statistics *metrics.Statistics, ) *dmlWorker { d := &dmlWorker{ @@ -118,7 +118,7 @@ func newDMLWorker( inputCh: inputCh, flushNotifyCh: make(chan dmlTask, 64), statistics: statistics, - filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock), + filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, pdClock), metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge. WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricFileCount: mcloudstorage.CloudStorageFileCountGauge. diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go index c0e8ee47600..e0cf0edf146 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -52,8 +53,9 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker { statistics := metrics.NewStatistics(ctx, model.DefaultChangeFeedID("dml-worker-test"), sink.TxnSink) + pdlock := pdutil.NewMonotonicClock4Test(clock.New()) d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage, - cfg, ".json", chann.NewAutoDrainChann[eventFragment](), clock.New(), statistics) + cfg, ".json", chann.NewAutoDrainChann[eventFragment](), pdlock, statistics) return d } diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 5d520630fe9..c2d512223a2 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/tablesink" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/kafka" v2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2" @@ -67,6 +68,7 @@ type SinkFactory struct { func New( ctx context.Context, changefeedID model.ChangeFeedID, + pdClock pdutil.Clock, sinkURIStr string, cfg *config.ReplicaConfig, errCh chan error, @@ -100,7 +102,7 @@ func New( s.txnSink = mqs s.category = CategoryMQ case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: - storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, sinkURI, cfg, errCh) + storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, pdClock, sinkURI, cfg, errCh) if err != nil { return nil, err } diff --git a/pkg/pdutil/clock.go b/pkg/pdutil/clock.go index fa79ec4074b..f85138beb54 100644 --- a/pkg/pdutil/clock.go +++ b/pkg/pdutil/clock.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + pclock "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/retry" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -136,3 +137,19 @@ func (c *clock4Test) Run(ctx context.Context) { func (c *clock4Test) Stop() { } + +type monotonicClock4Test struct { + Clock + pClock pclock.Clock +} + +// NewMonotonicClock4Test return a new monotonic clock for test. +func NewMonotonicClock4Test(pClock pclock.Clock) Clock { + return &monotonicClock4Test{ + pClock: pClock, + } +} + +func (m *monotonicClock4Test) CurrentTime() time.Time { + return m.pClock.Now() +} diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index d4e492d5be5..c67c0cf31d3 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -29,10 +29,10 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -135,7 +135,7 @@ type VersionedTableName struct { type FilePathGenerator struct { extension string config *Config - clock clock.Clock + pdClock pdutil.Clock storage storage.ExternalStorage fileIndex map[VersionedTableName]*indexWithDate @@ -148,13 +148,13 @@ func NewFilePathGenerator( config *Config, storage storage.ExternalStorage, extension string, - clock clock.Clock, + pdclock pdutil.Clock, ) *FilePathGenerator { return &FilePathGenerator{ config: config, extension: extension, storage: storage, - clock: clock, + pdClock: pdclock, fileIndex: make(map[VersionedTableName]*indexWithDate), hasher: hash.NewPositionInertia(), versionMap: make(map[VersionedTableName]uint64), @@ -247,8 +247,8 @@ func (f *FilePathGenerator) CheckOrWriteSchema( } // SetClock is used for unit test -func (f *FilePathGenerator) SetClock(clock clock.Clock) { - f.clock = clock +func (f *FilePathGenerator) SetClock(pdClock pdutil.Clock) { + f.pdClock = pdClock } // GenerateDateStr generates a date string base on current time @@ -256,7 +256,7 @@ func (f *FilePathGenerator) SetClock(clock clock.Clock) { func (f *FilePathGenerator) GenerateDateStr() string { var dateStr string - currTime := f.clock.Now() + currTime := f.pdClock.CurrentTime() switch f.config.DateSeparator { case config.DateSeparatorYear.String(): dateStr = currTime.Format("2006") diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index 973840dba95..15d8a126808 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -49,7 +50,7 @@ func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FileP err = cfg.Apply(ctx, sinkURI, replicaConfig) require.NoError(t, err) - f := NewFilePathGenerator(cfg, storage, ".json", clock.New()) + f := NewFilePathGenerator(cfg, storage, ".json", pdutil.NewMonotonicClock4Test(clock.New())) return f } @@ -84,7 +85,7 @@ func TestGenerateDataFilePath(t *testing.T) { f = testFilePathGenerator(ctx, t, dir) f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorYear.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock4Test(mockClock)) mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) date = f.GenerateDateStr() path, err = f.GenerateDataFilePath(ctx, table, date) @@ -108,7 +109,8 @@ func TestGenerateDataFilePath(t *testing.T) { f = testFilePathGenerator(ctx, t, dir) f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorMonth.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock4Test(mockClock)) + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) date = f.GenerateDateStr() path, err = f.GenerateDataFilePath(ctx, table, date) @@ -132,7 +134,8 @@ func TestGenerateDataFilePath(t *testing.T) { f = testFilePathGenerator(ctx, t, dir) f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorDay.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock4Test(mockClock)) + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) date = f.GenerateDateStr() path, err = f.GenerateDataFilePath(ctx, table, date) @@ -210,7 +213,8 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) { f := testFilePathGenerator(ctx, t, dir) mockClock := clock.NewMock() f.config.DateSeparator = config.DateSeparatorDay.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock4Test(mockClock)) + mockClock.Set(time.Date(2023, 3, 9, 23, 59, 59, 0, time.UTC)) table := VersionedTableName{ TableNameWithPhysicTableID: model.TableName{