Skip to content

Commit

Permalink
sink(ticdc): use pd clock in storage sink (pingcap#10351)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jan 24, 2024
1 parent 62d1d01 commit cff9687
Show file tree
Hide file tree
Showing 16 changed files with 105 additions and 50 deletions.
4 changes: 2 additions & 2 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func verifyCreateChangefeedConfig(
return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
}
ctx = contextutil.PutTimezoneInCtx(ctx, tz)
if err := sink.Validate(ctx, info.SinkURI, info.Config); err != nil {
if err := sink.Validate(ctx, info.SinkURI, info.Config, up.PDClock); err != nil {
return nil, err
}

Expand Down Expand Up @@ -228,7 +228,7 @@ func VerifyUpdateChangefeedConfig(ctx context.Context,
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config, nil); err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
}

// verify sink
if err := sink.Validate(ctx, cfg.SinkURI, replicaCfg); err != nil {
if err := sink.Validate(ctx, cfg.SinkURI, replicaCfg, nil); err != nil {
return nil, err
}

Expand Down Expand Up @@ -321,7 +321,7 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config, nil); err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
log.Info("Try to create sinkV2")
sinkV2Factory, err := factory.New(stdCtx, p.changefeed.Info.SinkURI,
p.changefeed.Info.Config,
errCh)
errCh, p.upstream.PDClock)
if err != nil {
log.Error("processor creates sink failed",
zap.String("namespace", p.changefeedID.Namespace),
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) {
return m.sinkFactory.errors, m.sinkFactory.version
}

m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors)
m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors, m.up.PDClock)
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
Expand Down
5 changes: 3 additions & 2 deletions cdc/sink/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/util"
Expand All @@ -30,7 +31,7 @@ import (
// Validate sink if given valid parameters.
// TODO: For now, we create a real sink instance and validate it.
// Maybe we should support the dry-run mode to validate sink.
func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) error {
func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig, pdClock pdutil.Clock) error {
var err error
var uri *url.URL
if uri, err = preCheckSinkURI(sinkURI); err != nil {
Expand Down Expand Up @@ -60,7 +61,7 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) er
err = s.Close(ctx)
} else {
var s *factory.SinkFactory
s, err = factory.New(ctx, sinkURI, cfg, errCh)
s, err = factory.New(ctx, sinkURI, cfg, errCh, pdClock)
if err != nil {
cancel()
return err
Expand Down
6 changes: 3 additions & 3 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/cdc/sinkv2/util"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
putil "github.com/pingcap/tiflow/pkg/util"
Expand Down Expand Up @@ -94,6 +94,7 @@ type dmlSink struct {
// NewCloudStorageSink creates a cloud storage sink.
func NewCloudStorageSink(
ctx context.Context,
pdClock pdutil.Clock,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
errCh chan error,
Expand Down Expand Up @@ -153,11 +154,10 @@ func NewCloudStorageSink(
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
// create a group of dml workers.
clock := clock.New()
for i := 0; i < cfg.WorkerCount; i++ {
inputCh := chann.NewDrainableChann[eventFragment]()
s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext,
inputCh, clock, s.statistics)
inputCh, pdClock, s.statistics)
workerChannels[i] = inputCh
}

Expand Down
16 changes: 8 additions & 8 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/stretchr/testify/require"
)

func setClock(s *dmlSink, clock clock.Clock) {
for _, w := range s.workers {
w.filePathGenerator.SetClock(clock)
w.filePathGenerator.SetClock(pdutil.NewMonotonicClock(clock))
}
}

Expand Down Expand Up @@ -126,7 +127,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()
replicaConfig.Sink.FileIndexWidth = 6
errCh := make(chan error, 5)
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
s, err := NewCloudStorageSink(ctx, pdutil.NewMonotonicClock(clock.New()), sinkURI, replicaConfig, errCh)
require.Nil(t, err)
var cnt uint64 = 0
batch := 100
Expand Down Expand Up @@ -194,10 +195,9 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock := clock.NewMock()
setClock(s, mockClock)
s, err := NewCloudStorageSink(ctx, pdutil.NewMonotonicClock(mockClock), sinkURI, replicaConfig, errCh)
require.Nil(t, err)

var cnt uint64 = 0
batch := 100
Expand Down Expand Up @@ -268,11 +268,11 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
// test table is scheduled from one node to another
cnt = 0
ctx, cancel = context.WithCancel(context.Background())
s, err = NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock = clock.NewMock()

s, err = NewCloudStorageSink(ctx, pdutil.NewMonotonicClock(mockClock), sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
setClock(s, mockClock)

err = s.WriteEvents(txns...)
require.Nil(t, err)
Expand Down
6 changes: 3 additions & 3 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down Expand Up @@ -103,7 +103,7 @@ func newDMLWorker(
config *cloudstorage.Config,
extension string,
inputCh *chann.DrainableChann[eventFragment],
clock clock.Clock,
pdClock pdutil.Clock,
statistics *metrics.Statistics,
) *dmlWorker {
d := &dmlWorker{
Expand All @@ -114,7 +114,7 @@ func newDMLWorker(
inputCh: inputCh,
flushNotifyCh: make(chan dmlTask, 64),
statistics: statistics,
filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock),
filePathGenerator: cloudstorage.NewFilePathGenerator(changefeedID, config, storage, extension, pdClock),
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.
Expand Down
4 changes: 3 additions & 1 deletion cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/util"
Expand All @@ -49,8 +50,9 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
require.Nil(t, err)

statistics := metrics.NewStatistics(ctx, sink.TxnSink)
pdlock := pdutil.NewMonotonicClock(clock.New())
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
cfg, ".json", chann.NewDrainableChann[eventFragment](), clock.New(), statistics)
cfg, ".json", chann.NewDrainableChann[eventFragment](), pdlock, statistics)
return d
}

Expand Down
4 changes: 3 additions & 1 deletion cdc/sinkv2/eventsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -63,6 +64,7 @@ func New(ctx context.Context,
sinkURIStr string,
cfg *config.ReplicaConfig,
errCh chan error,
pdClock pdutil.Clock,
) (*SinkFactory, error) {
sinkURI, err := config.GetSinkURIAndAdjustConfigWithSinkURI(sinkURIStr, cfg)
if err != nil {
Expand Down Expand Up @@ -90,7 +92,7 @@ func New(ctx context.Context,
s.sinkType = sink.TxnSink
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewCloudStorageSink(ctx, sinkURI, cfg, errCh)
storageSink, err := cloudstorage.NewCloudStorageSink(ctx, pdClock, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func newConsumer(ctx context.Context) (*consumer, error) {
downstreamURIStr,
config.GetDefaultReplicaConfig(),
errCh,
nil,
)
if err != nil {
log.Error("failed to create event sink factory", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/applier/redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (ra *RedoApplier) catchError(ctx context.Context) error {

func (ra *RedoApplier) initSink(ctx context.Context) (err error) {
replicaConfig := config.GetDefaultReplicaConfig()
ra.sinkFactory, err = dmlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig, ra.errCh)
ra.sinkFactory, err = dmlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig, ra.errCh, nil)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,10 @@ var (
"invalid replica config, %s",
errors.RFCCodeText("CDC:ErrInvalidReplicaConfig"),
)
ErrInternalCheckFailed = errors.Normalize(
"internal check failed, %s",
errors.RFCCodeText("CDC:ErrInternalCheckFailed"),
)

ErrHandleDDLFailed = errors.Normalize(
"handle ddl failed, job: %s, query: %s, startTs: %d. "+
Expand Down
22 changes: 22 additions & 0 deletions pkg/pdutil/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
pclock "github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -136,3 +137,24 @@ func (c *clock4Test) Run(ctx context.Context) {

func (c *clock4Test) Stop() {
}

type monotonicClock struct {
clock pclock.Clock
}

// NewMonotonicClock return a new monotonic clock.
func NewMonotonicClock(pClock pclock.Clock) Clock {
return &monotonicClock{
clock: pClock,
}
}

func (c *monotonicClock) CurrentTime() time.Time {
return c.clock.Now()
}

func (c *monotonicClock) Run(ctx context.Context) {
}

func (c *monotonicClock) Stop() {
}
Loading

0 comments on commit cff9687

Please sign in to comment.