Skip to content

Commit

Permalink
This is an automated cherry-pick of #10351
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Jan 22, 2024
1 parent 552a41f commit cefa9c5
Show file tree
Hide file tree
Showing 19 changed files with 921 additions and 35 deletions.
13 changes: 13 additions & 0 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,15 @@ func verifyCreateChangefeedConfig(
if err != nil {
return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
}
<<<<<<< HEAD
ctx = contextutil.PutTimezoneInCtx(ctx, tz)
if err := validator.Validate(ctx, info.SinkURI, info.Config); err != nil {
=======
if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID},
info.SinkURI, info.Config, up.PDClock,
); err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
return nil, err
}

Expand Down Expand Up @@ -238,7 +245,13 @@ func VerifyUpdateChangefeedConfig(ctx context.Context,
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

<<<<<<< HEAD
if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
=======
if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID},
newInfo.SinkURI, newInfo.Config, nil); err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
13 changes: 13 additions & 0 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,14 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
}

// verify sink
<<<<<<< HEAD
if err := validator.Validate(ctx, cfg.SinkURI, replicaCfg); err != nil {
=======
if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID},
cfg.SinkURI, replicaCfg, nil,
); err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
return nil, err
}

Expand Down Expand Up @@ -352,7 +359,13 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

<<<<<<< HEAD
if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
=======
if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID},
newInfo.SinkURI, newInfo.Config, nil); err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
4 changes: 4 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,11 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) {
return m.sinkFactory.errors, m.sinkFactory.version
}

<<<<<<< HEAD
m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors)
=======
m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors, m.up.PDClock)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
Expand Down
10 changes: 7 additions & 3 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
Expand Down Expand Up @@ -94,6 +94,11 @@ type DMLSink struct {

// NewDMLSink creates a cloud storage sink.
func NewDMLSink(ctx context.Context,
<<<<<<< HEAD
=======
changefeedID model.ChangeFeedID,
pdClock pdutil.Clock,
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
errCh chan error,
Expand Down Expand Up @@ -153,11 +158,10 @@ func NewDMLSink(ctx context.Context,
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
// create a group of dml workers.
clock := clock.New()
for i := 0; i < cfg.WorkerCount; i++ {
inputCh := chann.NewAutoDrainChann[eventFragment]()
s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext,
inputCh, clock, s.statistics)
inputCh, pdClock, s.statistics)
workerChannels[i] = inputCh
}

Expand Down
33 changes: 30 additions & 3 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
<<<<<<< HEAD

Check failure on line 34 in cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

missing import path

Check failure on line 34 in cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

missing import path
=======
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/util"
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
"github.com/stretchr/testify/require"
)

func setClock(s *DMLSink, clock clock.Clock) {
for _, w := range s.workers {
w.filePathGenerator.SetClock(clock)
w.filePathGenerator.SetClock(pdutil.NewMonotonicClock(clock))
}
}

Expand Down Expand Up @@ -126,7 +131,14 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()
replicaConfig.Sink.FileIndexWidth = 6
errCh := make(chan error, 5)
<<<<<<< HEAD
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
=======
s, err := NewDMLSink(ctx,
model.DefaultChangeFeedID("test"),
pdutil.NewMonotonicClock(clock.New()),
sinkURI, replicaConfig, errCh)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
require.Nil(t, err)
var cnt uint64 = 0
batch := 100
Expand Down Expand Up @@ -194,10 +206,17 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
<<<<<<< HEAD
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
=======
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
mockClock := clock.NewMock()
setClock(s, mockClock)
s, err := NewDMLSink(ctx,
model.DefaultChangeFeedID("test"),
pdutil.NewMonotonicClock(mockClock),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)

var cnt uint64 = 0
batch := 100
Expand Down Expand Up @@ -268,11 +287,19 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
// test table is scheduled from one node to another
cnt = 0
ctx, cancel = context.WithCancel(context.Background())
<<<<<<< HEAD
s, err = NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
=======

>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
mockClock = clock.NewMock()
mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
setClock(s, mockClock)
s, err = NewDMLSink(ctx,
model.DefaultChangeFeedID("test"),
pdutil.NewMonotonicClock(mockClock),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)

err = s.WriteEvents(txns...)
require.Nil(t, err)
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -103,7 +103,7 @@ func newDMLWorker(
config *cloudstorage.Config,
extension string,
inputCh *chann.DrainableChann[eventFragment],
clock clock.Clock,
pdClock pdutil.Clock,
statistics *metrics.Statistics,
) *dmlWorker {
d := &dmlWorker{
Expand All @@ -114,7 +114,7 @@ func newDMLWorker(
inputCh: inputCh,
flushNotifyCh: make(chan dmlTask, 64),
statistics: statistics,
filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock),
filePathGenerator: cloudstorage.NewFilePathGenerator(changefeedID, config, storage, extension, pdClock),
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.
Expand Down
9 changes: 8 additions & 1 deletion cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
Expand All @@ -50,9 +51,15 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
cfg.FileIndexWidth = 6
require.Nil(t, err)

<<<<<<< HEAD
statistics := metrics.NewStatistics(ctx, sink.TxnSink)
=======
statistics := metrics.NewStatistics(ctx, model.DefaultChangeFeedID("dml-worker-test"),
sink.TxnSink)
pdlock := pdutil.NewMonotonicClock(clock.New())
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
cfg, ".json", chann.NewAutoDrainChann[eventFragment](), clock.New(), statistics)
cfg, ".json", chann.NewAutoDrainChann[eventFragment](), pdlock, statistics)
return d
}

Expand Down
6 changes: 6 additions & 0 deletions cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
v2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2"
Expand Down Expand Up @@ -67,6 +68,7 @@ func New(
sinkURIStr string,
cfg *config.ReplicaConfig,
errCh chan error,
pdClock pdutil.Clock,
) (*SinkFactory, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
Expand Down Expand Up @@ -96,7 +98,11 @@ func New(
s.txnSink = mqs
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
<<<<<<< HEAD
storageSink, err := cloudstorage.NewDMLSink(ctx, sinkURI, cfg, errCh)
=======
storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, pdClock, sinkURI, cfg, errCh)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
if err != nil {
return nil, err
}
Expand Down
14 changes: 14 additions & 0 deletions cdc/sink/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/factory"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/util"
Expand All @@ -30,7 +31,15 @@ import (
// Validate sink if given valid parameters.
// TODO: For now, we create a real sink instance and validate it.
// Maybe we should support the dry-run mode to validate sink.
<<<<<<< HEAD
func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) error {
=======
func Validate(ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI string, cfg *config.ReplicaConfig,
pdClock pdutil.Clock,
) error {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
uri, err := preCheckSinkURI(sinkURI)
if err != nil {
return err
Expand All @@ -47,8 +56,13 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) er
}
}

<<<<<<< HEAD
ctx, cancel := context.WithCancel(contextutil.PutRoleInCtx(ctx, util.RoleClient))
s, err := factory.New(ctx, sinkURI, cfg, make(chan error))
=======
ctx, cancel := context.WithCancel(ctx)
s, err := factory.New(ctx, changefeedID, sinkURI, cfg, make(chan error), pdClock)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
if err != nil {
cancel()
return err
Expand Down
16 changes: 16 additions & 0 deletions cdc/sink/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,42 @@ func TestValidateSink(t *testing.T) {

// test sink uri error
sinkURI := "mysql://root:[email protected]:3306/"
<<<<<<< HEAD
err := Validate(ctx, sinkURI, replicateConfig)
=======
err := Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
require.NotNil(t, err)
require.Contains(t, err.Error(), "fail to open MySQL connection")

// test sink uri right
sinkURI = "blackhole://"
<<<<<<< HEAD
err = Validate(ctx, sinkURI, replicateConfig)
=======
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
require.Nil(t, err)

// test bdr mode error
replicateConfig.BDRMode = true
sinkURI = "blackhole://"
<<<<<<< HEAD
err = Validate(ctx, sinkURI, replicateConfig)
=======
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
require.NotNil(t, err)
require.Contains(t, err.Error(), "sink uri scheme is not supported in BDR mode")

// test sink-scheme/syncpoint error
replicateConfig.EnableSyncPoint = true
sinkURI = "kafka://"
<<<<<<< HEAD
err = Validate(ctx, sinkURI, replicateConfig)
=======
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
require.NotNil(t, err)
require.Contains(
t, err.Error(),
Expand Down
9 changes: 9 additions & 0 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,21 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
partitionNo: i,
}
}
<<<<<<< HEAD
f, err := eventsinkfactory.New(
ctx,
downstreamURIStr,
config.GetDefaultReplicaConfig(),
errChan,
)
=======
log.Info("flow controller created for each partition",
zap.Int32("partitionNum", o.partitionNum),
zap.Int("quota", memoryQuotaPerPartition))

changefeedID := model.DefaultChangeFeedID("kafka-consumer")
f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down
Loading

0 comments on commit cefa9c5

Please sign in to comment.