diff --git a/cdc/api/v1/validator.go b/cdc/api/v1/validator.go index 549d76d5cd0..8b41413eeed 100644 --- a/cdc/api/v1/validator.go +++ b/cdc/api/v1/validator.go @@ -179,8 +179,15 @@ func verifyCreateChangefeedConfig( if err != nil { return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone)) } +<<<<<<< HEAD ctx = contextutil.PutTimezoneInCtx(ctx, tz) if err := validator.Validate(ctx, info.SinkURI, info.Config); err != nil { +======= + if err := validator.Validate(ctx, + model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID}, + info.SinkURI, info.Config, up.PDClock, + ); err != nil { +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) return nil, err } @@ -238,7 +245,13 @@ func VerifyUpdateChangefeedConfig(ctx context.Context, return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) } +<<<<<<< HEAD if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil { +======= + if err := validator.Validate(ctx, + model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID}, + newInfo.SinkURI, newInfo.Config, nil); err != nil { +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) } } diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 1e74c97e3b1..1c334c73dd7 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -228,7 +228,14 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig( } // verify sink +<<<<<<< HEAD if err := validator.Validate(ctx, cfg.SinkURI, replicaCfg); err != nil { +======= + if err := validator.Validate(ctx, + model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID}, + cfg.SinkURI, replicaCfg, nil, + ); err != nil { +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) return nil, err } @@ -352,7 +359,13 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig( return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) } +<<<<<<< HEAD if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil { +======= + if err := validator.Validate(ctx, + model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID}, + newInfo.SinkURI, newInfo.Config, nil); err != nil { +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err) } } diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 776af8cdf01..0f080a40fcc 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -350,7 +350,11 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) { return m.sinkFactory.errors, m.sinkFactory.version } +<<<<<<< HEAD m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors) +======= + m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors, m.up.PDClock) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) if err != nil { emitError(err) return m.sinkFactory.errors, m.sinkFactory.version diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 1069ef4a1ce..db92612201e 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -28,10 +28,10 @@ import ( "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" "github.com/pingcap/tiflow/cdc/sink/util" - "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/builder" @@ -94,6 +94,11 @@ type DMLSink struct { // NewDMLSink creates a cloud storage sink. func NewDMLSink(ctx context.Context, +<<<<<<< HEAD +======= + changefeedID model.ChangeFeedID, + pdClock pdutil.Clock, +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) sinkURI *url.URL, replicaConfig *config.ReplicaConfig, errCh chan error, @@ -153,11 +158,10 @@ func NewDMLSink(ctx context.Context, // create defragmenter. s.defragmenter = newDefragmenter(encodedCh, workerChannels) // create a group of dml workers. - clock := clock.New() for i := 0; i < cfg.WorkerCount; i++ { inputCh := chann.NewAutoDrainChann[eventFragment]() s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext, - inputCh, clock, s.statistics) + inputCh, pdClock, s.statistics) workerChannels[i] = inputCh } diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go index 9ef1fa8e42a..63a2571a3f6 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go @@ -31,12 +31,17 @@ import ( "github.com/pingcap/tiflow/cdc/sink/tablesink/state" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" +<<<<<<< HEAD +======= + "github.com/pingcap/tiflow/pkg/pdutil" + "github.com/pingcap/tiflow/pkg/util" +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) "github.com/stretchr/testify/require" ) func setClock(s *DMLSink, clock clock.Clock) { for _, w := range s.workers { - w.filePathGenerator.SetClock(clock) + w.filePathGenerator.SetClock(pdutil.NewMonotonicClock(clock)) } } @@ -126,7 +131,14 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { replicaConfig.Sink.Protocol = config.ProtocolCsv.String() replicaConfig.Sink.FileIndexWidth = 6 errCh := make(chan error, 5) +<<<<<<< HEAD s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) +======= + s, err := NewDMLSink(ctx, + model.DefaultChangeFeedID("test"), + pdutil.NewMonotonicClock(clock.New()), + sinkURI, replicaConfig, errCh) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) require.Nil(t, err) var cnt uint64 = 0 batch := 100 @@ -194,10 +206,17 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { replicaConfig.Sink.FileIndexWidth = 6 errCh := make(chan error, 5) +<<<<<<< HEAD s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) require.Nil(t, err) +======= +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) mockClock := clock.NewMock() - setClock(s, mockClock) + s, err := NewDMLSink(ctx, + model.DefaultChangeFeedID("test"), + pdutil.NewMonotonicClock(mockClock), + sinkURI, replicaConfig, errCh) + require.Nil(t, err) var cnt uint64 = 0 batch := 100 @@ -268,11 +287,19 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { // test table is scheduled from one node to another cnt = 0 ctx, cancel = context.WithCancel(context.Background()) +<<<<<<< HEAD s, err = NewDMLSink(ctx, sinkURI, replicaConfig, errCh) require.Nil(t, err) +======= + +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) mockClock = clock.NewMock() mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC)) - setClock(s, mockClock) + s, err = NewDMLSink(ctx, + model.DefaultChangeFeedID("test"), + pdutil.NewMonotonicClock(mockClock), + sinkURI, replicaConfig, errCh) + require.Nil(t, err) err = s.WriteEvents(txns...) require.Nil(t, err) diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index 1d0174cc1ee..c1addb12f28 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -24,9 +24,9 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/metrics" mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage" - "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/prometheus/client_golang/prometheus" @@ -103,7 +103,7 @@ func newDMLWorker( config *cloudstorage.Config, extension string, inputCh *chann.DrainableChann[eventFragment], - clock clock.Clock, + pdClock pdutil.Clock, statistics *metrics.Statistics, ) *dmlWorker { d := &dmlWorker{ @@ -114,7 +114,7 @@ func newDMLWorker( inputCh: inputCh, flushNotifyCh: make(chan dmlTask, 64), statistics: statistics, - filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock), + filePathGenerator: cloudstorage.NewFilePathGenerator(changefeedID, config, storage, extension, pdClock), metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge. WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricFileCount: mcloudstorage.CloudStorageFileCountGauge. diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go index 2e8dd5620c5..5d33397423c 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -50,9 +51,15 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker { cfg.FileIndexWidth = 6 require.Nil(t, err) +<<<<<<< HEAD statistics := metrics.NewStatistics(ctx, sink.TxnSink) +======= + statistics := metrics.NewStatistics(ctx, model.DefaultChangeFeedID("dml-worker-test"), + sink.TxnSink) + pdlock := pdutil.NewMonotonicClock(clock.New()) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage, - cfg, ".json", chann.NewAutoDrainChann[eventFragment](), clock.New(), statistics) + cfg, ".json", chann.NewAutoDrainChann[eventFragment](), pdlock, statistics) return d } diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index e037d1c8916..ebfb3bd45b0 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/tablesink" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/kafka" v2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2" @@ -67,6 +68,7 @@ func New( sinkURIStr string, cfg *config.ReplicaConfig, errCh chan error, + pdClock pdutil.Clock, ) (*SinkFactory, error) { sinkURI, err := url.Parse(sinkURIStr) if err != nil { @@ -96,7 +98,11 @@ func New( s.txnSink = mqs s.category = CategoryMQ case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: +<<<<<<< HEAD storageSink, err := cloudstorage.NewDMLSink(ctx, sinkURI, cfg, errCh) +======= + storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, pdClock, sinkURI, cfg, errCh) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) if err != nil { return nil, err } diff --git a/cdc/sink/validator/validator.go b/cdc/sink/validator/validator.go index 302ef8012f5..f442fd66014 100644 --- a/cdc/sink/validator/validator.go +++ b/cdc/sink/validator/validator.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/sink" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/util" @@ -30,7 +31,15 @@ import ( // Validate sink if given valid parameters. // TODO: For now, we create a real sink instance and validate it. // Maybe we should support the dry-run mode to validate sink. +<<<<<<< HEAD func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) error { +======= +func Validate(ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI string, cfg *config.ReplicaConfig, + pdClock pdutil.Clock, +) error { +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) uri, err := preCheckSinkURI(sinkURI) if err != nil { return err @@ -47,8 +56,13 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) er } } +<<<<<<< HEAD ctx, cancel := context.WithCancel(contextutil.PutRoleInCtx(ctx, util.RoleClient)) s, err := factory.New(ctx, sinkURI, cfg, make(chan error)) +======= + ctx, cancel := context.WithCancel(ctx) + s, err := factory.New(ctx, changefeedID, sinkURI, cfg, make(chan error), pdClock) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) if err != nil { cancel() return err diff --git a/cdc/sink/validator/validator_test.go b/cdc/sink/validator/validator_test.go index 38b35e3669e..b8c8d001e8a 100644 --- a/cdc/sink/validator/validator_test.go +++ b/cdc/sink/validator/validator_test.go @@ -99,26 +99,42 @@ func TestValidateSink(t *testing.T) { // test sink uri error sinkURI := "mysql://root:111@127.0.0.1:3306/" +<<<<<<< HEAD err := Validate(ctx, sinkURI, replicateConfig) +======= + err := Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) require.NotNil(t, err) require.Contains(t, err.Error(), "fail to open MySQL connection") // test sink uri right sinkURI = "blackhole://" +<<<<<<< HEAD err = Validate(ctx, sinkURI, replicateConfig) +======= + err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) require.Nil(t, err) // test bdr mode error replicateConfig.BDRMode = true sinkURI = "blackhole://" +<<<<<<< HEAD err = Validate(ctx, sinkURI, replicateConfig) +======= + err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) require.NotNil(t, err) require.Contains(t, err.Error(), "sink uri scheme is not supported in BDR mode") // test sink-scheme/syncpoint error replicateConfig.EnableSyncPoint = true sinkURI = "kafka://" +<<<<<<< HEAD err = Validate(ctx, sinkURI, replicateConfig) +======= + err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) require.NotNil(t, err) require.Contains( t, err.Error(), diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 29da7cf23e7..6c0789be929 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -450,12 +450,21 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { partitionNo: i, } } +<<<<<<< HEAD f, err := eventsinkfactory.New( ctx, downstreamURIStr, config.GetDefaultReplicaConfig(), errChan, ) +======= + log.Info("flow controller created for each partition", + zap.Int32("partitionNum", o.partitionNum), + zap.Int("quota", memoryQuotaPerPartition)) + + changefeedID := model.DefaultChangeFeedID("kafka-consumer") + f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) if err != nil { cancel() return nil, errors.Trace(err) diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go new file mode 100644 index 00000000000..475507d301f --- /dev/null +++ b/cmd/pulsar-consumer/main.go @@ -0,0 +1,714 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "fmt" + "math" + "net/url" + "os" + "os/signal" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/ddlsink" + ddlsinkfactory "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory" + eventsinkfactory "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" + "github.com/pingcap/tiflow/cdc/sink/tablesink" + sutil "github.com/pingcap/tiflow/cdc/sink/util" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/logutil" + "github.com/pingcap/tiflow/pkg/quotes" + "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/sink/codec" + "github.com/pingcap/tiflow/pkg/sink/codec/canal" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + tpulsar "github.com/pingcap/tiflow/pkg/sink/pulsar" + "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/tiflow/pkg/version" + "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +// ConsumerOption represents the options of the pulsar consumer +type ConsumerOption struct { + address []string + topic string + + protocol config.Protocol + enableTiDBExtension bool + + logPath string + logLevel string + timezone string + ca, cert, key string + + downstreamURI string + partitionNum int +} + +func newConsumerOption() *ConsumerOption { + return &ConsumerOption{ + protocol: config.ProtocolDefault, + } +} + +// Adjust the consumer option by the upstream uri passed in parameters. +func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) { + // the default value of partitionNum is 1 + o.partitionNum = 1 + + o.topic = strings.TrimFunc(upstreamURI.Path, func(r rune) bool { + return r == '/' + }) + + o.address = strings.Split(upstreamURI.Host, ",") + + s := upstreamURI.Query().Get("protocol") + if s != "" { + protocol, err := config.ParseSinkProtocolFromString(s) + if err != nil { + log.Panic("invalid protocol", zap.Error(err), zap.String("protocol", s)) + } + if !sutil.IsPulsarSupportedProtocols(protocol) { + log.Panic("unsupported protocol, pulsar sink currently only support these protocols: [canal-json, canal, maxwell]", + zap.String("protocol", s)) + } + o.protocol = protocol + } + + s = upstreamURI.Query().Get("enable-tidb-extension") + if s != "" { + enableTiDBExtension, err := strconv.ParseBool(s) + if err != nil { + log.Panic("invalid enable-tidb-extension of upstream-uri") + } + if enableTiDBExtension { + if o.protocol != config.ProtocolCanalJSON && o.protocol != config.ProtocolAvro { + log.Panic("enable-tidb-extension only work with canal-json / avro") + } + } + o.enableTiDBExtension = enableTiDBExtension + } + + log.Info("consumer option adjusted", + zap.String("configFile", configFile), + zap.String("address", strings.Join(o.address, ",")), + zap.String("topic", o.topic), + zap.Any("protocol", o.protocol), + zap.Bool("enableTiDBExtension", o.enableTiDBExtension)) +} + +var ( + upstreamURIStr string + configFile string + consumerOption = newConsumerOption() +) + +func main() { + cmd := &cobra.Command{ + Use: "pulsar consumer", + Run: run, + } + // Flags for the root command + cmd.Flags().StringVar(&configFile, "config", "", "config file for changefeed") + cmd.Flags().StringVar(&upstreamURIStr, "upstream-uri", "", "pulsar uri") + cmd.Flags().StringVar(&consumerOption.downstreamURI, "downstream-uri", "", "downstream sink uri") + cmd.Flags().StringVar(&consumerOption.timezone, "tz", "System", "Specify time zone of pulsar consumer") + cmd.Flags().StringVar(&consumerOption.ca, "ca", "", "CA certificate path for pulsar SSL connection") + cmd.Flags().StringVar(&consumerOption.cert, "cert", "", "Certificate path for pulsar SSL connection") + cmd.Flags().StringVar(&consumerOption.key, "key", "", "Private key path for pulsar SSL connection") + cmd.Flags().StringVar(&consumerOption.logPath, "log-file", "cdc_pulsar_consumer.log", "log file path") + cmd.Flags().StringVar(&consumerOption.logLevel, "log-level", "info", "log file path") + + if err := cmd.Execute(); err != nil { + fmt.Println(err) + } +} + +func run(cmd *cobra.Command, args []string) { + err := logutil.InitLogger(&logutil.Config{ + Level: consumerOption.logLevel, + File: consumerOption.logPath, + }, + logutil.WithInitGRPCLogger(), + logutil.WithInitSaramaLogger(), + ) + if err != nil { + log.Error("init logger failed", zap.Error(err)) + return + } + + version.LogVersionInfo("pulsar consumer") + + upstreamURI, err := url.Parse(upstreamURIStr) + if err != nil { + log.Panic("invalid upstream-uri", zap.Error(err)) + } + scheme := strings.ToLower(upstreamURI.Scheme) + if !sink.IsPulsarScheme(scheme) { + log.Panic("invalid upstream-uri scheme, the scheme of upstream-uri must be pulsar schema", + zap.String("schema", scheme), + zap.String("upstreamURI", upstreamURIStr)) + } + + consumerOption.Adjust(upstreamURI, configFile) + + ctx, cancel := context.WithCancel(context.Background()) + consumer, err := NewConsumer(ctx, consumerOption) + if err != nil { + log.Panic("Error creating pulsar consumer", zap.Error(err)) + } + + pulsarConsumer, client := NewPulsarConsumer(consumerOption) + defer client.Close() + defer pulsarConsumer.Close() + msgChan := pulsarConsumer.Chan() + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + log.Info("terminating: context cancelled") + return + case consumerMsg := <-msgChan: + log.Debug(fmt.Sprintf("Received message msgId: %#v -- content: '%s'\n", + consumerMsg.ID(), + string(consumerMsg.Payload()))) + err := consumer.HandleMsg(consumerMsg.Message) + if err != nil { + log.Panic("Error consuming message", zap.Error(err)) + } + err = pulsarConsumer.AckID(consumerMsg.Message.ID()) + if err != nil { + log.Panic("Error ack message", zap.Error(err)) + } + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := consumer.Run(ctx); err != nil { + if err != context.Canceled { + log.Panic("Error running consumer", zap.Error(err)) + } + } + }() + + log.Info("TiCDC consumer up and running!...") + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + select { + case <-ctx.Done(): + log.Info("terminating: context cancelled") + case <-sigterm: + log.Info("terminating: via signal") + } + cancel() + wg.Wait() +} + +// NewPulsarConsumer creates a pulsar consumer +func NewPulsarConsumer(option *ConsumerOption) (pulsar.Consumer, pulsar.Client) { + pulsarURL := "pulsar" + "://" + option.address[0] + topicName := option.topic + subscriptionName := "pulsar-test-subscription" + + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: pulsarURL, + Logger: tpulsar.NewPulsarLogger(), + }) + if err != nil { + log.Fatal("can't create pulsar client: %v", zap.Error(err)) + } + + consumerConfig := pulsar.ConsumerOptions{ + Topic: topicName, + SubscriptionName: subscriptionName, + Type: pulsar.Exclusive, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + } + + consumer, err := client.Subscribe(consumerConfig) + if err != nil { + log.Fatal("can't create pulsar consumer: %v", zap.Error(err)) + } + return consumer, client +} + +// partitionSinks maintained for each partition, it may sync data for multiple tables. +type partitionSinks struct { + tablesCommitTsMap sync.Map + tableSinksMap sync.Map + // resolvedTs record the maximum timestamp of the received event + resolvedTs uint64 +} + +// Consumer represents a local pulsar consumer +type Consumer struct { + eventGroups map[int64]*eventsGroup + ddlList []*model.DDLEvent + ddlListMu sync.Mutex + lastReceivedDDL *model.DDLEvent + ddlSink ddlsink.Sink + fakeTableIDGenerator *fakeTableIDGenerator + + // sinkFactory is used to create table sink for each table. + sinkFactory *eventsinkfactory.SinkFactory + sinks []*partitionSinks + sinksMu sync.Mutex + + // initialize to 0 by default + globalResolvedTs uint64 + + tz *time.Location + + codecConfig *common.Config + + option *ConsumerOption +} + +// NewConsumer creates a new cdc pulsar consumer +// the consumer is responsible for consuming the data from the pulsar topic +// and write the data to the downstream. +func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) { + c := new(Consumer) + c.option = o + + tz, err := util.GetTimezone(o.timezone) + if err != nil { + return nil, errors.Annotate(err, "can not load timezone") + } + config.GetGlobalServerConfig().TZ = o.timezone + c.tz = tz + + c.fakeTableIDGenerator = &fakeTableIDGenerator{ + tableIDs: make(map[string]int64), + } + + c.codecConfig = common.NewConfig(o.protocol) + c.codecConfig.EnableTiDBExtension = o.enableTiDBExtension + if c.codecConfig.Protocol == config.ProtocolAvro { + c.codecConfig.AvroEnableWatermark = true + } + + c.sinks = make([]*partitionSinks, o.partitionNum) + ctx, cancel := context.WithCancel(ctx) + errChan := make(chan error, 1) + for i := 0; i < o.partitionNum; i++ { + c.sinks[i] = &partitionSinks{} + } + + changefeedID := model.DefaultChangeFeedID("pulsar-consumer") + f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil) + if err != nil { + cancel() + return nil, errors.Trace(err) + } + c.sinkFactory = f + + go func() { + err := <-errChan + if errors.Cause(err) != context.Canceled { + log.Error("error on running consumer", zap.Error(err)) + } else { + log.Info("consumer exited") + } + cancel() + }() + + ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig()) + if err != nil { + cancel() + return nil, errors.Trace(err) + } + c.ddlSink = ddlSink + c.eventGroups = make(map[int64]*eventsGroup) + return c, nil +} + +type eventsGroup struct { + events []*model.RowChangedEvent +} + +func newEventsGroup() *eventsGroup { + return &eventsGroup{ + events: make([]*model.RowChangedEvent, 0), + } +} + +func (g *eventsGroup) Append(e *model.RowChangedEvent) { + g.events = append(g.events, e) +} + +func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent { + sort.Slice(g.events, func(i, j int) bool { + return g.events[i].CommitTs < g.events[j].CommitTs + }) + + i := sort.Search(len(g.events), func(i int) bool { + return g.events[i].CommitTs > resolveTs + }) + result := g.events[:i] + g.events = g.events[i:] + + return result +} + +// HandleMsg handles the message received from the pulsar consumer +func (c *Consumer) HandleMsg(msg pulsar.Message) error { + c.sinksMu.Lock() + sink := c.sinks[0] + c.sinksMu.Unlock() + if sink == nil { + panic("sink should initialized") + } + + ctx := context.Background() + var ( + decoder codec.RowEventDecoder + err error + ) + + switch c.codecConfig.Protocol { + case config.ProtocolCanalJSON: + decoder, err = canal.NewBatchDecoder(ctx, c.codecConfig, nil) + if err != nil { + return err + } + default: + log.Panic("Protocol not supported", zap.Any("Protocol", c.codecConfig.Protocol)) + } + if err != nil { + return errors.Trace(err) + } + + if err := decoder.AddKeyValue([]byte(msg.Key()), msg.Payload()); err != nil { + log.Error("add key value to the decoder failed", zap.Error(err)) + return errors.Trace(err) + } + + counter := 0 + for { + tp, hasNext, err := decoder.HasNext() + if err != nil { + log.Panic("decode message key failed", zap.Error(err)) + } + if !hasNext { + break + } + + counter++ + switch tp { + case model.MessageTypeDDL: + // for some protocol, DDL would be dispatched to all partitions, + // Consider that DDL a, b, c received from partition-0, the latest DDL is c, + // if we receive `a` from partition-1, which would be seemed as DDL regression, + // then cause the consumer panic, but it was a duplicate one. + // so we only handle DDL received from partition-0 should be enough. + // but all DDL event messages should be consumed. + ddl, err := decoder.NextDDLEvent() + if err != nil { + log.Panic("decode message value failed", + zap.ByteString("value", msg.Payload()), + zap.Error(err)) + } + log.Info("DDL event received", zap.Any("DDL", ddl)) + c.appendDDL(ddl) + case model.MessageTypeRow: + row, err := decoder.NextRowChangedEvent() + if err != nil { + log.Panic("decode message value failed", + zap.ByteString("value", msg.Payload()), + zap.Error(err)) + } + globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs) + partitionResolvedTs := atomic.LoadUint64(&sink.resolvedTs) + if row.CommitTs <= globalResolvedTs || row.CommitTs <= partitionResolvedTs { + log.Warn("RowChangedEvent fallback row, ignore it", + zap.Uint64("commitTs", row.CommitTs), + zap.Uint64("globalResolvedTs", globalResolvedTs), + zap.Uint64("partitionResolvedTs", partitionResolvedTs), + zap.Int32("partition", msg.ID().PartitionIdx()), + zap.Any("row", row)) + // todo: mark the offset after the DDL is fully synced to the downstream mysql. + continue + } + var partitionID int64 + if row.Table.IsPartition { + partitionID = row.Table.TableID + } + // use schema, table and tableID to identify a table + tableID := c.fakeTableIDGenerator. + generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID) + row.Table.TableID = tableID + + group, ok := c.eventGroups[tableID] + if !ok { + group = newEventsGroup() + c.eventGroups[tableID] = group + } + group.Append(row) + case model.MessageTypeResolved: + ts, err := decoder.NextResolvedEvent() + if err != nil { + log.Panic("decode message value failed", + zap.ByteString("value", msg.Payload()), + zap.Error(err)) + } + + globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs) + partitionResolvedTs := atomic.LoadUint64(&sink.resolvedTs) + if ts < globalResolvedTs || ts < partitionResolvedTs { + log.Warn("partition resolved ts fallback, skip it", + zap.Uint64("ts", ts), + zap.Uint64("partitionResolvedTs", partitionResolvedTs), + zap.Uint64("globalResolvedTs", globalResolvedTs), + zap.Int32("partition", msg.ID().PartitionIdx())) + continue + } + + for tableID, group := range c.eventGroups { + events := group.Resolve(ts) + if len(events) == 0 { + continue + } + if _, ok := sink.tableSinksMap.Load(tableID); !ok { + log.Info("create table sink for consumer", zap.Any("tableID", tableID)) + tableSink := c.sinkFactory.CreateTableSinkForConsumer( + model.DefaultChangeFeedID("pulsar-consumer"), + spanz.TableIDToComparableSpan(tableID), + events[0].CommitTs, + prometheus.NewCounter(prometheus.CounterOpts{})) + + log.Info("table sink created", zap.Any("tableID", tableID), + zap.Any("tableSink", tableSink.GetCheckpointTs())) + + sink.tableSinksMap.Store(tableID, tableSink) + } + s, _ := sink.tableSinksMap.Load(tableID) + s.(tablesink.TableSink).AppendRowChangedEvents(events...) + commitTs := events[len(events)-1].CommitTs + lastCommitTs, ok := sink.tablesCommitTsMap.Load(tableID) + if !ok || lastCommitTs.(uint64) < commitTs { + sink.tablesCommitTsMap.Store(tableID, commitTs) + } + } + atomic.StoreUint64(&sink.resolvedTs, ts) + } + + } + return nil +} + +// append DDL wait to be handled, only consider the constraint among DDLs. +// for DDL a / b received in the order, a.CommitTs < b.CommitTs should be true. +func (c *Consumer) appendDDL(ddl *model.DDLEvent) { + c.ddlListMu.Lock() + defer c.ddlListMu.Unlock() + // DDL CommitTs fallback, just crash it to indicate the bug. + if c.lastReceivedDDL != nil && ddl.CommitTs < c.lastReceivedDDL.CommitTs { + log.Panic("DDL CommitTs < lastReceivedDDL.CommitTs", + zap.Uint64("commitTs", ddl.CommitTs), + zap.Uint64("lastReceivedDDLCommitTs", c.lastReceivedDDL.CommitTs), + zap.Any("DDL", ddl)) + } + + // A rename tables DDL job contains multiple DDL events with same CommitTs. + // So to tell if a DDL is redundant or not, we must check the equivalence of + // the current DDL and the DDL with max CommitTs. + if ddl == c.lastReceivedDDL { + log.Info("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs", + zap.Any("DDL", ddl)) + return + } + + c.ddlList = append(c.ddlList, ddl) + log.Info("DDL event received", zap.Any("DDL", ddl)) + c.lastReceivedDDL = ddl +} + +func (c *Consumer) getFrontDDL() *model.DDLEvent { + c.ddlListMu.Lock() + defer c.ddlListMu.Unlock() + if len(c.ddlList) > 0 { + return c.ddlList[0] + } + return nil +} + +func (c *Consumer) popDDL() *model.DDLEvent { + c.ddlListMu.Lock() + defer c.ddlListMu.Unlock() + if len(c.ddlList) > 0 { + ddl := c.ddlList[0] + c.ddlList = c.ddlList[1:] + return ddl + } + return nil +} + +func (c *Consumer) forEachSink(fn func(sink *partitionSinks) error) error { + c.sinksMu.Lock() + defer c.sinksMu.Unlock() + for _, sink := range c.sinks { + if err := fn(sink); err != nil { + return errors.Trace(err) + } + } + return nil +} + +// getMinResolvedTs returns the minimum resolvedTs of all the partitionSinks +func (c *Consumer) getMinResolvedTs() (result uint64, err error) { + result = uint64(math.MaxUint64) + err = c.forEachSink(func(sink *partitionSinks) error { + a := atomic.LoadUint64(&sink.resolvedTs) + if a < result { + result = a + } + return nil + }) + return result, err +} + +// Run the Consumer +func (c *Consumer) Run(ctx context.Context) error { + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + // 1. Get the minimum resolvedTs of all the partitionSinks + minResolvedTs, err := c.getMinResolvedTs() + if err != nil { + return errors.Trace(err) + } + + // 2. check if there is a DDL event that can be executed + // if there is, execute it and update the minResolvedTs + nextDDL := c.getFrontDDL() + if nextDDL != nil { + log.Info("get nextDDL", zap.Any("DDL", nextDDL)) + } + if nextDDL != nil && minResolvedTs >= nextDDL.CommitTs { + // flush DMLs that commitTs <= todoDDL.CommitTs + if err := c.forEachSink(func(sink *partitionSinks) error { + return flushRowChangedEvents(ctx, sink, nextDDL.CommitTs) + }); err != nil { + return errors.Trace(err) + } + log.Info("begin to execute DDL", zap.Any("DDL", nextDDL)) + // all DMLs with commitTs <= todoDDL.CommitTs have been flushed to downstream, + // so we can execute the DDL now. + if err := c.ddlSink.WriteDDLEvent(ctx, nextDDL); err != nil { + return errors.Trace(err) + } + ddl := c.popDDL() + log.Info("DDL executed", zap.Any("DDL", ddl)) + minResolvedTs = ddl.CommitTs + } + + // 3. Update global resolved ts + if c.globalResolvedTs > minResolvedTs { + log.Panic("global ResolvedTs fallback", + zap.Uint64("globalResolvedTs", c.globalResolvedTs), + zap.Uint64("minPartitionResolvedTs", minResolvedTs)) + } + + if c.globalResolvedTs < minResolvedTs { + c.globalResolvedTs = minResolvedTs + } + + // 4. flush all the DMLs that commitTs <= globalResolvedTs + if err := c.forEachSink(func(sink *partitionSinks) error { + return flushRowChangedEvents(ctx, sink, c.globalResolvedTs) + }); err != nil { + return errors.Trace(err) + } + } + } +} + +// flushRowChangedEvents flushes all the DMLs that commitTs <= resolvedTs +// Note: This function is synchronous, it will block until all the DMLs are flushed. +func flushRowChangedEvents(ctx context.Context, sink *partitionSinks, resolvedTs uint64) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + flushedResolvedTs := true + sink.tablesCommitTsMap.Range(func(key, value interface{}) bool { + tableID := key.(int64) + resolvedTs := model.NewResolvedTs(resolvedTs) + tableSink, ok := sink.tableSinksMap.Load(tableID) + if !ok { + log.Panic("Table sink not found", zap.Int64("tableID", tableID)) + } + if err := tableSink.(tablesink.TableSink).UpdateResolvedTs(resolvedTs); err != nil { + log.Error("Failed to update resolved ts", zap.Error(err)) + return false + } + if !tableSink.(tablesink.TableSink).GetCheckpointTs().EqualOrGreater(resolvedTs) { + flushedResolvedTs = false + } + return true + }) + if flushedResolvedTs { + return nil + } + } +} + +type fakeTableIDGenerator struct { + tableIDs map[string]int64 + currentTableID int64 + mu sync.Mutex +} + +func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partition int64) int64 { + g.mu.Lock() + defer g.mu.Unlock() + key := quotes.QuoteSchema(schema, table) + if partition != 0 { + key = fmt.Sprintf("%s.`%d`", key, partition) + } + if tableID, ok := g.tableIDs[key]; ok { + return tableID + } + g.currentTableID++ + g.tableIDs[key] = g.currentTableID + return g.currentTableID +} diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 3e881e79087..8e87d368c93 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -191,6 +191,7 @@ func newConsumer(ctx context.Context) (*consumer, error) { downstreamURIStr, config.GetDefaultReplicaConfig(), errCh, + nil, ) if err != nil { log.Error("failed to create event sink factory", zap.Error(err)) diff --git a/errors.toml b/errors.toml index 51f824f7427..4b1036675c7 100755 --- a/errors.toml +++ b/errors.toml @@ -346,6 +346,11 @@ error = ''' incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri ''' +["CDC:ErrInternalCheckFailed"] +error = ''' +internal check failed, %s +''' + ["CDC:ErrInternalServerError"] error = ''' internal server error diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 32fb55cabcd..9c86f24188d 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -122,7 +122,11 @@ func (ra *RedoApplier) catchError(ctx context.Context) error { func (ra *RedoApplier) initSink(ctx context.Context) (err error) { replicaConfig := config.GetDefaultReplicaConfig() +<<<<<<< HEAD ra.sinkFactory, err = dmlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig, ra.errCh) +======= + ra.sinkFactory, err = dmlfactory.New(ctx, ra.changefeedID, ra.cfg.SinkURI, replicaConfig, ra.errCh, nil) +>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)) if err != nil { return err } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 5c2e72145b6..a0b5afb1485 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -872,6 +872,10 @@ var ( "invalid replica config, %s", errors.RFCCodeText("CDC:ErrInvalidReplicaConfig"), ) + ErrInternalCheckFailed = errors.Normalize( + "internal check failed, %s", + errors.RFCCodeText("CDC:ErrInternalCheckFailed"), + ) ErrHandleDDLFailed = errors.Normalize( "handle ddl failed, job: %s, query: %s, startTs: %d. "+ diff --git a/pkg/pdutil/clock.go b/pkg/pdutil/clock.go index ee7edec660b..67156093689 100644 --- a/pkg/pdutil/clock.go +++ b/pkg/pdutil/clock.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + pclock "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/retry" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -141,3 +142,24 @@ func (c *clock4Test) Run(ctx context.Context) { func (c *clock4Test) Stop() { } + +type monotonicClock struct { + clock pclock.Clock +} + +// NewMonotonicClock return a new monotonic clock. +func NewMonotonicClock(pClock pclock.Clock) Clock { + return &monotonicClock{ + clock: pClock, + } +} + +func (c *monotonicClock) CurrentTime() time.Time { + return c.clock.Now() +} + +func (c *monotonicClock) Run(ctx context.Context) { +} + +func (c *monotonicClock) Stop() { +} diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index dae3f5a8a0c..5a01bb9d6cf 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/hash" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -133,11 +134,12 @@ type VersionedTableName struct { // FilePathGenerator is used to generate data file path and index file path. type FilePathGenerator struct { - extension string - config *Config - clock clock.Clock - storage storage.ExternalStorage - fileIndex map[VersionedTableName]*indexWithDate + changefeedID model.ChangeFeedID + extension string + config *Config + pdClock pdutil.Clock + storage storage.ExternalStorage + fileIndex map[VersionedTableName]*indexWithDate hasher *hash.PositionInertia versionMap map[VersionedTableName]uint64 @@ -145,19 +147,27 @@ type FilePathGenerator struct { // NewFilePathGenerator creates a FilePathGenerator. func NewFilePathGenerator( + changefeedID model.ChangeFeedID, config *Config, storage storage.ExternalStorage, extension string, - clock clock.Clock, + pdclock pdutil.Clock, ) *FilePathGenerator { + if pdclock == nil { + pdclock = pdutil.NewMonotonicClock(clock.New()) + log.Warn("pd clock is not set in storage sink, use local clock instead", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeedID", changefeedID.ID)) + } return &FilePathGenerator{ - config: config, - extension: extension, - storage: storage, - clock: clock, - fileIndex: make(map[VersionedTableName]*indexWithDate), - hasher: hash.NewPositionInertia(), - versionMap: make(map[VersionedTableName]uint64), + changefeedID: changefeedID, + config: config, + extension: extension, + storage: storage, + pdClock: pdclock, + fileIndex: make(map[VersionedTableName]*indexWithDate), + hasher: hash.NewPositionInertia(), + versionMap: make(map[VersionedTableName]uint64), } } @@ -176,8 +186,12 @@ func (f *FilePathGenerator) CheckOrWriteSchema( def.FromTableInfo(tableInfo, table.TableInfoVersion) if !def.IsTableSchema() { // only check schema for table - log.Panic("invalid table schema", zap.Any("versionedTableName", table), + log.Error("invalid table schema", + zap.String("namespace", f.changefeedID.Namespace), + zap.String("changefeedID", f.changefeedID.ID), + zap.Any("versionedTableName", table), zap.Any("tableInfo", tableInfo)) + return errors.ErrInternalCheckFailed.GenWithStackByArgs("invalid table schema in FilePathGenerator") } // Case 1: point check if the schema file exists. @@ -210,10 +224,13 @@ func (f *FilePathGenerator) CheckOrWriteSchema( } version, parsedChecksum := mustParseSchemaName(path) if parsedChecksum != checksum { - // TODO: parsedChecksum should be ignored, remove this panic - // after the new path protocol is verified. - log.Panic("invalid schema file name", + log.Error("invalid schema file name", + zap.String("namespace", f.changefeedID.Namespace), + zap.String("changefeedID", f.changefeedID.ID), zap.String("path", path), zap.Any("checksum", checksum)) + errMsg := fmt.Sprintf("invalid schema filename in storage sink, "+ + "expected checksum: %d, actual checksum: %d", checksum, parsedChecksum) + return errors.ErrInternalCheckFailed.GenWithStackByArgs(errMsg) } if version > lastVersion { lastVersion = version @@ -235,6 +252,8 @@ func (f *FilePathGenerator) CheckOrWriteSchema( // b. the schema file is deleted by the consumer. We write schema file to external storage too. if schemaFileCnt != 0 && lastVersion == 0 { log.Warn("no table schema file found in an non-empty meta path", + zap.String("namespace", f.changefeedID.Namespace), + zap.String("changefeedID", f.changefeedID.ID), zap.Any("versionedTableName", table), zap.Uint32("checksum", checksum)) } @@ -247,8 +266,8 @@ func (f *FilePathGenerator) CheckOrWriteSchema( } // SetClock is used for unit test -func (f *FilePathGenerator) SetClock(clock clock.Clock) { - f.clock = clock +func (f *FilePathGenerator) SetClock(pdClock pdutil.Clock) { + f.pdClock = pdClock } // GenerateDateStr generates a date string base on current time @@ -256,7 +275,7 @@ func (f *FilePathGenerator) SetClock(clock clock.Clock) { func (f *FilePathGenerator) GenerateDateStr() string { var dateStr string - currTime := f.clock.Now() + currTime := f.pdClock.CurrentTime() switch f.config.DateSeparator { case config.DateSeparatorYear.String(): dateStr = currTime.Format("2006") diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index f8650441626..9ad360aa527 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -49,7 +50,7 @@ func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FileP err = cfg.Apply(ctx, sinkURI, replicaConfig) require.NoError(t, err) - f := NewFilePathGenerator(cfg, storage, ".json", clock.New()) + f := NewFilePathGenerator(model.ChangeFeedID{}, cfg, storage, ".json", pdutil.NewMonotonicClock(clock.New())) return f } @@ -84,7 +85,7 @@ func TestGenerateDataFilePath(t *testing.T) { f = testFilePathGenerator(ctx, t, dir) f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorYear.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock(mockClock)) mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) date = f.GenerateDateStr() path, err = f.GenerateDataFilePath(ctx, table, date) @@ -108,7 +109,8 @@ func TestGenerateDataFilePath(t *testing.T) { f = testFilePathGenerator(ctx, t, dir) f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorMonth.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock(mockClock)) + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) date = f.GenerateDateStr() path, err = f.GenerateDataFilePath(ctx, table, date) @@ -132,7 +134,8 @@ func TestGenerateDataFilePath(t *testing.T) { f = testFilePathGenerator(ctx, t, dir) f.versionMap[table] = table.TableInfoVersion f.config.DateSeparator = config.DateSeparatorDay.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock(mockClock)) + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) date = f.GenerateDateStr() path, err = f.GenerateDataFilePath(ctx, table, date) @@ -210,7 +213,8 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) { f := testFilePathGenerator(ctx, t, dir) mockClock := clock.NewMock() f.config.DateSeparator = config.DateSeparatorDay.String() - f.clock = mockClock + f.SetClock(pdutil.NewMonotonicClock(mockClock)) + mockClock.Set(time.Date(2023, 3, 9, 23, 59, 59, 0, time.UTC)) table := VersionedTableName{ TableNameWithPhysicTableID: model.TableName{