diff --git a/cdc/api/v2/api.go b/cdc/api/v2/api.go index 09a2849cdb2..a6b8b48c4ba 100644 --- a/cdc/api/v2/api.go +++ b/cdc/api/v2/api.go @@ -63,6 +63,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.resumeChangefeed) changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.pauseChangefeed) changefeedGroup.GET("/:changefeed_id/status", changefeedOwnerMiddleware, api.status) + changefeedGroup.GET("/:changefeed_id/synced", changefeedOwnerMiddleware, api.synced) // capture apis captureGroup := v2.Group("/captures") diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 7d5373c526d..cbbbc8550ff 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -895,6 +895,84 @@ func (h *OpenAPIV2) status(c *gin.Context) { }) } +// synced reports whether the changefeed has caught up with the upstream cluster. +func (h *OpenAPIV2) synced(c *gin.Context) { + ctx := c.Request.Context() + + namespace := getNamespaceValueWithDefault(c) + changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)} + if err := model.ValidateChangefeedID(changefeedID.ID); err != nil { + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", + changefeedID.ID)) + return + } + status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus( + ctx, + changefeedID, + ) + if err != nil { + _ = c.Error(err) + return + } + + // Resolve the PD endpoints so that the current PD time can be fetched below. + cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()} + + if err := c.BindJSON(&cfg); err != nil { + _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) + return + } + if len(cfg.PDAddrs) == 0 { + up, err := getCaptureDefaultUpstream(h.capture) + if err != nil { + _ = c.Error(err) + return + } + cfg.PDConfig = getUpstreamPDConfig(up) + } + credential := cfg.PDConfig.toCredential() + + timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential) + if err != nil { + // PD is unreachable, so the synced state cannot be checked against the current + // PD time; report the changefeed as not synced instead of returning an error. + c.JSON(http.StatusOK, map[string]any{ + "Synced": false, + "CheckpointTs": status.CheckpointTs, + "ResolvedTs": status.ResolvedTs, + "LastSyncTime": status.LastSyncTime, + "info": "failed to connect to PD, please check the PD status", + }) + return + } + defer pdClient.Close() +
+ // Use the current TSO from PD as the reference time. + physicalNow, _, err := pdClient.GetTS(ctx) + if err != nil { + _ = c.Error(err) + return + } + // LastSyncTime and CheckpointTs are TSOs; compare their physical parts (milliseconds) + // with the current PD time (oracle is "github.com/tikv/client-go/v2/oracle"). + lastSyncPhysical := oracle.ExtractPhysical(status.LastSyncTime) + checkpointPhysical := oracle.ExtractPhysical(status.CheckpointTs) + // TODO: the thresholds below are provisional: the changefeed is treated as synced when + // no new data has been written for 5 minutes and the checkpoint lags behind the current + // PD time by less than 5 seconds. + if physicalNow-lastSyncPhysical > 5*60*1000 && physicalNow-checkpointPhysical < 5*1000 { + c.JSON(http.StatusOK, map[string]any{ + "Synced": true, + "CheckpointTs": status.CheckpointTs, + "ResolvedTs": status.ResolvedTs, + "LastSyncTime": status.LastSyncTime, + "info": "", + }) + } else { + c.JSON(http.StatusOK, map[string]any{ + "Synced": false, + "CheckpointTs": status.CheckpointTs, + "ResolvedTs": status.ResolvedTs, + "LastSyncTime": status.LastSyncTime, + "info": "the changefeed has not caught up with the upstream yet", + }) + } + +} + func toAPIModel( info *model.ChangeFeedInfo, resolvedTs uint64, diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 805d1a56729..34a5aff5c3e 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -629,3 +629,9 @@ type ChangeFeedStatusForAPI struct { ResolvedTs uint64 `json:"resolved-ts"` CheckpointTs uint64 `json:"checkpoint-ts"` } + +type ChangeFeedSyncedStatusForAPI struct { + ResolvedTs uint64 `json:"resolved-ts"` + CheckpointTs uint64 `json:"checkpoint-ts"` + LastSyncTime uint64 `json:"last-sync-time"` +} diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 436485e46f4..47e80d954e4 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -93,6 +93,7 @@ type changefeed struct { barriers *barriers feedStateManager FeedStateManager resolvedTs model.Ts + lastSyncTime model.Ts // ddl related fields ddlManager *ddlManager @@ -272,7 +273,8 @@ func (c *changefeed) Tick(ctx cdcContext.Context, } return nil }) - checkpointTs, minTableBarrierTs, err := c.tick(ctx, captures) + checkpointTs, minTableBarrierTs, maxLastSyncTime, err := c.tick(ctx, captures) + // Keep lastSyncTime monotonic: tick returns 0 on early exits, which must not reset it. + if maxLastSyncTime > c.lastSyncTime { + c.lastSyncTime = maxLastSyncTime + } // The tick duration is recorded only if changefeed has completed initialization if c.initialized { @@ -350,50 +352,50 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, - // tick returns the checkpointTs and minTableBarrierTs. + // tick returns the checkpointTs, minTableBarrierTs and lastSyncTime. func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo, -) (model.Ts, model.Ts, error) { +) (model.Ts, model.Ts, model.Ts, error) { adminJobPending := c.feedStateManager.Tick(c.resolvedTs, c.latestStatus, c.latestInfo) preCheckpointTs := c.latestInfo.GetCheckpointTs(c.latestStatus) // checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()` // to ensure all changefeeds, no matter whether they are running or not, will be checked.
if err := c.checkStaleCheckpointTs(ctx, c.latestInfo, preCheckpointTs); err != nil { - return 0, 0, errors.Trace(err) + return 0, 0, 0, errors.Trace(err) } if !c.feedStateManager.ShouldRunning() { c.isRemoved = c.feedStateManager.ShouldRemoved() c.releaseResources(ctx) - return 0, 0, nil + return 0, 0, 0, nil } if adminJobPending { - return 0, 0, nil + return 0, 0, 0, nil } if err := c.initialize(ctx); err != nil { - return 0, 0, errors.Trace(err) + return 0, 0, 0, errors.Trace(err) } select { case err := <-c.errCh: - return 0, 0, errors.Trace(err) + return 0, 0, 0, errors.Trace(err) default: } if c.redoMetaMgr.Enabled() { if !c.redoMetaMgr.Running() { - return 0, 0, nil + return 0, 0, 0, nil } } // TODO: pass table checkpointTs when we support concurrent process ddl allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil) if err != nil { - return 0, 0, errors.Trace(err) + return 0, 0, 0, errors.Trace(err) } err = c.handleBarrier(ctx, c.latestInfo, c.latestStatus, barrier) if err != nil { - return 0, 0, errors.Trace(err) + return 0, 0, 0, errors.Trace(err) } log.Debug("owner handles barrier", @@ -409,14 +411,14 @@ func (c *changefeed) tick(ctx cdcContext.Context, // This condition implies that the DDL resolved-ts has not yet reached checkpointTs, // which implies that it would be premature to schedule tables or to update status. // So we return here. - return 0, 0, nil + return 0, 0, 0, nil } - newCheckpointTs, newResolvedTs, err := c.scheduler.Tick( + newCheckpointTs, newResolvedTs, newLastSyncTime, err := c.scheduler.Tick( ctx, preCheckpointTs, allPhysicalTables, captures, barrier) if err != nil { - return 0, 0, errors.Trace(err) + return 0, 0, 0, errors.Trace(err) } pdTime := c.upstream.PDClock.CurrentTime() @@ -430,7 +432,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, // advance the watermarks for now. 
c.updateMetrics(currentTs, c.latestStatus.CheckpointTs, c.resolvedTs) } - return 0, 0, nil + return 0, 0, 0, nil } log.Debug("owner prepares to update status", @@ -463,7 +465,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, c.updateMetrics(currentTs, newCheckpointTs, c.resolvedTs) c.tickDownstreamObserver(ctx) - return newCheckpointTs, barrier.MinTableBarrierTs, nil + return newCheckpointTs, barrier.MinTableBarrierTs, newLastSyncTime, nil } func (c *changefeed) initialize(ctx cdcContext.Context) (err error) { diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 966e26da00f..166cfc78df1 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -597,6 +597,17 @@ func (o *ownerImpl) handleQueries(query *Query) error { ret.ResolvedTs = cfReactor.resolvedTs ret.CheckpointTs = cfReactor.latestStatus.CheckpointTs query.Data = ret + case QueryChangeFeedSyncedStatus: + cfReactor, ok := o.changefeeds[query.ChangeFeedID] + if !ok { + query.Data = nil + return nil + } + ret := &model.ChangeFeedSyncedStatusForAPI{} + ret.ResolvedTs = cfReactor.resolvedTs + ret.LastSyncTime = cfReactor.lastSyncTime + ret.CheckpointTs = cfReactor.latestStatus.CheckpointTs + query.Data = ret case QueryChangefeedInfo: cfReactor, ok := o.changefeeds[query.ChangeFeedID] if !ok { diff --git a/cdc/owner/status_provider.go b/cdc/owner/status_provider.go index f1b6a057ddd..2cea1e2cd81 100644 --- a/cdc/owner/status_provider.go +++ b/cdc/owner/status_provider.go @@ -27,6 +27,8 @@ type StatusProvider interface { // GetChangeFeedStatus returns a changefeeds' runtime status. GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatusForAPI, error) + GetChangeFeedSyncedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedSyncedStatusForAPI, error) + // GetChangeFeedInfo returns a changefeeds' info. GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error) @@ -63,6 +65,8 @@ const ( QueryChangefeedInfo // QueryChangeFeedStatuses is the type of query changefeed status QueryChangeFeedStatuses + // QueryChangeFeedSyncedStatus is the type of query changefeed synced status + QueryChangeFeedSyncedStatus ) // Query wraps query command and return results. 
@@ -98,6 +102,22 @@ func (p *ownerStatusProvider) GetChangeFeedStatus(ctx context.Context, return query.Data.(*model.ChangeFeedStatusForAPI), nil } +func (p *ownerStatusProvider) GetChangeFeedSyncedStatus(ctx context.Context, + changefeedID model.ChangeFeedID, +) (*model.ChangeFeedSyncedStatusForAPI, error) { + query := &Query{ + Tp: QueryChangeFeedSyncedStatus, + ChangeFeedID: changefeedID, + } + if err := p.sendQueryToOwner(ctx, query); err != nil { + return nil, errors.Trace(err) + } + if query.Data == nil { + return nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedID) + } + return query.Data.(*model.ChangeFeedSyncedStatusForAPI), nil +} + func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID, ) (*model.ChangeFeedInfo, error) { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index bff2b877445..269eadfcf1d 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -366,6 +366,7 @@ func (p *processor) GetTableSpanStatus(span tablepb.Span, collectStat bool) tabl Checkpoint: tablepb.Checkpoint{ CheckpointTs: sinkStats.CheckpointTs, ResolvedTs: sinkStats.ResolvedTs, + LastSyncTime: sinkStats.LastSyncTime, }, State: state, Stats: stats, diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 74c5513888c..c34ec1bc3ed 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -58,6 +58,7 @@ const ( type TableStats struct { CheckpointTs model.Ts ResolvedTs model.Ts + LastSyncTime model.Ts BarrierTs model.Ts } @@ -140,6 +141,7 @@ func New( redoDMLMgr redo.DMLManager, sourceManager *sourcemanager.SourceManager, ) *SinkManager { + m := &SinkManager{ changefeedID: changefeedID, changefeedInfo: changefeedInfo, @@ -961,6 +963,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { tableSink := value.(*tableSinkWrapper) checkpointTs := tableSink.getCheckpointTs() + lastSyncTime := tableSink.getLastSyncTime() m.sinkMemQuota.Release(span, checkpointTs) m.redoMemQuota.Release(span, checkpointTs) @@ -1003,6 +1006,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { return TableStats{ CheckpointTs: checkpointTs.ResolvedMark(), ResolvedTs: resolvedTs, + LastSyncTime: lastSyncTime, BarrierTs: tableSink.barrierTs.Load(), } } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 8d21d69709b..ac946ea1446 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -218,6 +218,12 @@ func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error { return t.tableSink.s.UpdateResolvedTs(ts) } +func (t *tableSinkWrapper) getLastSyncTime() uint64 { + t.tableSink.RLock() + defer t.tableSink.RUnlock() + return t.tableSink.s.GetLastSyncTime() +} + func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs { t.tableSink.RLock() defer t.tableSink.RUnlock() diff --git a/cdc/processor/tablepb/table.pb.go b/cdc/processor/tablepb/table.pb.go index da01e6d59ef..001685818e8 100644 --- a/cdc/processor/tablepb/table.pb.go +++ b/cdc/processor/tablepb/table.pb.go @@ -117,6 +117,7 @@ var xxx_messageInfo_Span proto.InternalMessageInfo type Checkpoint struct { CheckpointTs Ts `protobuf:"varint,1,opt,name=checkpoint_ts,json=checkpointTs,proto3,casttype=Ts" json:"checkpoint_ts,omitempty"` ResolvedTs Ts `protobuf:"varint,2,opt,name=resolved_ts,json=resolvedTs,proto3,casttype=Ts" 
json:"resolved_ts,omitempty"` + LastSyncTime Ts `protobuf:"varint,3,opt,name=last_sync_time,json=lastSyncTime,proto3,casttype=Ts" json:"last_sync_time,omitempty"` } func (m *Checkpoint) Reset() { *m = Checkpoint{} } @@ -166,6 +167,13 @@ func (m *Checkpoint) GetResolvedTs() Ts { return 0 } +func (m *Checkpoint) GetLastSyncTime() Ts { + if m != nil { + return m.LastSyncTime + } + return 0 +} + // Stats holds a statistic for a table. type Stats struct { // Number of captured regions. @@ -329,50 +337,52 @@ func init() { func init() { proto.RegisterFile("processor/tablepb/table.proto", fileDescriptor_ae83c9c6cf5ef75c) } var fileDescriptor_ae83c9c6cf5ef75c = []byte{ - // 688 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xbf, 0x6f, 0xd3, 0x4e, - 0x1c, 0xb5, 0x9d, 0x5f, 0xcd, 0x27, 0xf9, 0x7e, 0xe5, 0xde, 0xb7, 0xed, 0x37, 0x44, 0x22, 0x31, - 0x51, 0x81, 0xaa, 0x95, 0x1c, 0x28, 0x0b, 0xea, 0xd6, 0xb4, 0x80, 0xaa, 0x0a, 0x09, 0xb9, 0x81, - 0x81, 0x25, 0xf2, 0x8f, 0xc3, 0xb5, 0x1a, 0xce, 0x96, 0xef, 0xd2, 0x2a, 0x1b, 0x23, 0xca, 0x02, - 0x13, 0x62, 0x89, 0xd4, 0x3f, 0xa7, 0x63, 0x47, 0x06, 0x14, 0x41, 0x2a, 0x66, 0xf6, 0x4e, 0xe8, - 0xee, 0xdc, 0xb8, 0x09, 0x0c, 0xa1, 0x4b, 0x72, 0xbe, 0xf7, 0x3e, 0xcf, 0xef, 0xbd, 0x3b, 0x19, - 0x6e, 0x47, 0x71, 0xe8, 0x62, 0x4a, 0xc3, 0xb8, 0xc9, 0x6c, 0xa7, 0x8b, 0x23, 0x47, 0xfe, 0x9b, - 0x51, 0x1c, 0xb2, 0x10, 0xad, 0x46, 0x01, 0xf1, 0x5d, 0x3b, 0x32, 0x59, 0xf0, 0xa6, 0x1b, 0x9e, - 0x98, 0xae, 0xe7, 0x9a, 0x93, 0x09, 0x33, 0x99, 0xa8, 0x2e, 0xf9, 0xa1, 0x1f, 0x8a, 0x81, 0x26, - 0x5f, 0xc9, 0xd9, 0xc6, 0x07, 0x15, 0xb2, 0x07, 0x91, 0x4d, 0xd0, 0x43, 0x58, 0x10, 0xcc, 0x4e, - 0xe0, 0x55, 0x54, 0x43, 0x5d, 0xcb, 0xb4, 0x56, 0xc6, 0xa3, 0x7a, 0xa1, 0xcd, 0xf7, 0xf6, 0x76, - 0x2f, 0xd3, 0xa5, 0x55, 0x10, 0xbc, 0x3d, 0x0f, 0xad, 0x42, 0x91, 0x32, 0x3b, 0x66, 0x9d, 0x23, - 0xdc, 0xaf, 0x68, 0x86, 0xba, 0x56, 0x6e, 0x15, 0x2e, 0x47, 0xf5, 0xcc, 0x3e, 0xee, 0x5b, 0x0b, - 0x02, 0xd9, 0xc7, 0x7d, 0x64, 0x40, 0x01, 0x13, 0x4f, 0x70, 0x32, 0xd3, 0x9c, 0x3c, 0x26, 0xde, - 0x3e, 0xee, 0x6f, 0x95, 0xdf, 0x9f, 0xd6, 0x95, 0xcf, 0xa7, 0x75, 0xe5, 0xdd, 0x57, 0x43, 0x69, - 0x38, 0x00, 0x3b, 0x87, 0xd8, 0x3d, 0x8a, 0xc2, 0x80, 0x30, 0xb4, 0x01, 0xff, 0xb8, 0x93, 0xa7, - 0x0e, 0xa3, 0xc2, 0x5b, 0xb6, 0x95, 0xbf, 0x1c, 0xd5, 0xb5, 0x36, 0xb5, 0xca, 0x29, 0xd8, 0xa6, - 0xe8, 0x3e, 0x94, 0x62, 0x4c, 0xc3, 0xee, 0x31, 0xf6, 0x38, 0x55, 0x9b, 0xa2, 0xc2, 0x15, 0xd4, - 0xa6, 0x8d, 0x1f, 0x1a, 0xe4, 0x0e, 0x98, 0xcd, 0x28, 0xba, 0x03, 0xe5, 0x18, 0xfb, 0x41, 0x48, - 0x3a, 0x6e, 0xd8, 0x23, 0x4c, 0xca, 0x5b, 0x25, 0xb9, 0xb7, 0xc3, 0xb7, 0xd0, 0x5d, 0x00, 0xb7, - 0x17, 0xc7, 0x58, 0xbe, 0x7f, 0x5a, 0xb4, 0x98, 0x20, 0x6d, 0x8a, 0x18, 0x2c, 0x52, 0x66, 0xfb, - 0xb8, 0x93, 0x5a, 0xa2, 0x95, 0x8c, 0x91, 0x59, 0x2b, 0x6d, 0x6e, 0x9b, 0xf3, 0x9c, 0x90, 0x29, - 0x1c, 0xf1, 0x5f, 0x1f, 0xa7, 0x0d, 0xd0, 0x27, 0x84, 0xc5, 0xfd, 0x56, 0xf6, 0x6c, 0x54, 0x57, - 0x2c, 0x9d, 0xce, 0x80, 0xdc, 0x9c, 0x63, 0xc7, 0x71, 0x80, 0x63, 0x6e, 0x2e, 0x3b, 0x6d, 0x2e, - 0x41, 0xda, 0xb4, 0xda, 0x83, 0xe5, 0x3f, 0xea, 0x22, 0x1d, 0x32, 0xfc, 0x64, 0x78, 0xec, 0xa2, - 0xc5, 0x97, 0xe8, 0x29, 0xe4, 0x8e, 0xed, 0x6e, 0x0f, 0x8b, 0xa4, 0xa5, 0xcd, 0x07, 0xf3, 0x79, - 0x4f, 0x85, 0x2d, 0x39, 0xbe, 0xa5, 0x3d, 0x56, 0x1b, 0x3f, 0x35, 0x28, 0x89, 0x6b, 0xc3, 0xa3, - 0xf5, 0xe8, 0x4d, 0x2e, 0xd9, 0x2e, 0x64, 0x69, 0x64, 0x93, 0x4a, 0x4e, 0xb8, 0x59, 0x9f, 0xb3, - 0xc9, 0xc8, 0x26, 0x49, 0x65, 0x62, 0x9a, 0x87, 0xa2, 0xcc, 0x66, 0x32, 0xd4, 0xbf, 0xf3, 
0x86, - 0x9a, 0x58, 0xc7, 0x96, 0x1c, 0x47, 0xaf, 0x00, 0xd2, 0xe3, 0x15, 0xf7, 0xf9, 0x06, 0x0d, 0x25, - 0xce, 0xae, 0x29, 0xa1, 0x67, 0xd2, 0x9f, 0x3c, 0xc1, 0xd2, 0xe6, 0xc6, 0x5f, 0x5c, 0x98, 0x44, - 0x4d, 0xce, 0xaf, 0x7f, 0xd2, 0x00, 0x52, 0xdb, 0xa8, 0x01, 0x85, 0x97, 0xe4, 0x88, 0x84, 0x27, - 0x44, 0x57, 0xaa, 0xcb, 0x83, 0xa1, 0xb1, 0x98, 0x82, 0x09, 0x80, 0x0c, 0xc8, 0x6f, 0x3b, 0x14, - 0x13, 0xa6, 0xab, 0xd5, 0xa5, 0xc1, 0xd0, 0xd0, 0x53, 0x8a, 0xdc, 0x47, 0xf7, 0xa0, 0xf8, 0x22, - 0xc6, 0x91, 0x1d, 0x07, 0xc4, 0xd7, 0xb5, 0xea, 0xff, 0x83, 0xa1, 0xf1, 0x5f, 0x4a, 0x9a, 0x40, - 0x68, 0x15, 0x16, 0xe4, 0x03, 0xf6, 0xf4, 0x4c, 0x75, 0x65, 0x30, 0x34, 0xd0, 0x2c, 0x0d, 0x7b, - 0x68, 0x1d, 0x4a, 0x16, 0x8e, 0xba, 0x81, 0x6b, 0x33, 0xae, 0x97, 0xad, 0xde, 0x1a, 0x0c, 0x8d, - 0xe5, 0x6b, 0x5d, 0xa7, 0x20, 0x57, 0x3c, 0x60, 0x61, 0xc4, 0xdb, 0xd0, 0x73, 0xb3, 0x8a, 0x57, - 0x08, 0x4f, 0x29, 0xd6, 0xd8, 0xd3, 0xf3, 0xb3, 0x29, 0x13, 0xa0, 0xf5, 0xfc, 0xfc, 0x7b, 0x4d, - 0x39, 0x1b, 0xd7, 0xd4, 0xf3, 0x71, 0x4d, 0xfd, 0x36, 0xae, 0xa9, 0x1f, 0x2f, 0x6a, 0xca, 0xf9, - 0x45, 0x4d, 0xf9, 0x72, 0x51, 0x53, 0x5e, 0x37, 0xfd, 0x80, 0x1d, 0xf6, 0x1c, 0xd3, 0x0d, 0xdf, - 0x36, 0x93, 0xea, 0x9b, 0xb2, 0xfa, 0xa6, 0xeb, 0xb9, 0xcd, 0xdf, 0xbe, 0xbf, 0x4e, 0x5e, 0x7c, - 0x3e, 0x1f, 0xfd, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x80, 0x0e, 0x45, 0x99, 0x9b, 0x05, 0x00, 0x00, + // 712 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xbf, 0x6f, 0xd3, 0x40, + 0x18, 0xb5, 0x9d, 0x5f, 0xcd, 0x97, 0x50, 0xb9, 0x47, 0x5b, 0x42, 0x24, 0x12, 0x13, 0x15, 0xa8, + 0x5a, 0xe4, 0x40, 0x59, 0x50, 0xb7, 0xa6, 0x05, 0x54, 0x55, 0x48, 0xc8, 0x09, 0x0c, 0x2c, 0x91, + 0x63, 0x1f, 0xae, 0xd5, 0xf4, 0x6c, 0xf9, 0x2e, 0xad, 0xbc, 0x31, 0xa2, 0x2c, 0x74, 0x42, 0x2c, + 0x91, 0xfa, 0xe7, 0x74, 0xec, 0xc8, 0x80, 0x22, 0x48, 0xc5, 0xcc, 0xde, 0x09, 0xdd, 0xd9, 0x8d, + 0x9b, 0xc0, 0x10, 0xba, 0x24, 0xe7, 0x7b, 0xef, 0x7b, 0x7a, 0xef, 0xdd, 0xe9, 0xe0, 0x9e, 0x1f, + 0x78, 0x16, 0xa6, 0xd4, 0x0b, 0xea, 0xcc, 0xec, 0x74, 0xb1, 0xdf, 0x89, 0xfe, 0x75, 0x3f, 0xf0, + 0x98, 0x87, 0x56, 0x7c, 0x97, 0x38, 0x96, 0xe9, 0xeb, 0xcc, 0xfd, 0xd0, 0xf5, 0x8e, 0x75, 0xcb, + 0xb6, 0xf4, 0xf1, 0x84, 0x1e, 0x4f, 0x94, 0x17, 0x1d, 0xcf, 0xf1, 0xc4, 0x40, 0x9d, 0xaf, 0xa2, + 0xd9, 0xda, 0x67, 0x19, 0xd2, 0x4d, 0xdf, 0x24, 0xe8, 0x29, 0xcc, 0x09, 0x66, 0xdb, 0xb5, 0x4b, + 0xb2, 0x26, 0xaf, 0xa6, 0x1a, 0xcb, 0xa3, 0x61, 0x35, 0xd7, 0xe2, 0x7b, 0xbb, 0x3b, 0x97, 0xc9, + 0xd2, 0xc8, 0x09, 0xde, 0xae, 0x8d, 0x56, 0x20, 0x4f, 0x99, 0x19, 0xb0, 0xf6, 0x01, 0x0e, 0x4b, + 0x8a, 0x26, 0xaf, 0x16, 0x1b, 0xb9, 0xcb, 0x61, 0x35, 0xb5, 0x87, 0x43, 0x63, 0x4e, 0x20, 0x7b, + 0x38, 0x44, 0x1a, 0xe4, 0x30, 0xb1, 0x05, 0x27, 0x35, 0xc9, 0xc9, 0x62, 0x62, 0xef, 0xe1, 0x70, + 0xb3, 0xf8, 0xe9, 0xb4, 0x2a, 0x7d, 0x3d, 0xad, 0x4a, 0x1f, 0xbf, 0x6b, 0x52, 0xed, 0x44, 0x06, + 0xd8, 0xde, 0xc7, 0xd6, 0x81, 0xef, 0xb9, 0x84, 0xa1, 0x75, 0xb8, 0x65, 0x8d, 0xbf, 0xda, 0x8c, + 0x0a, 0x73, 0xe9, 0x46, 0xf6, 0x72, 0x58, 0x55, 0x5a, 0xd4, 0x28, 0x26, 0x60, 0x8b, 0xa2, 0x47, + 0x50, 0x08, 0x30, 0xf5, 0xba, 0x47, 0xd8, 0xe6, 0x54, 0x65, 0x82, 0x0a, 0x57, 0x50, 0x8b, 0xa2, + 0xc7, 0x30, 0xdf, 0x35, 0x29, 0x6b, 0xd3, 0x90, 0x58, 0x6d, 0xe6, 0x1e, 0x62, 0xe1, 0xed, 0x9a, + 0x2c, 0x47, 0x9b, 0x21, 0xb1, 0x5a, 0xee, 0x21, 0xae, 0xfd, 0x52, 0x20, 0xd3, 0x64, 0x26, 0xa3, + 0xe8, 0x3e, 0x14, 0x03, 0xec, 0xb8, 0x1e, 0x69, 0x5b, 0x5e, 0x8f, 0xb0, 0xc8, 0x8c, 0x51, 0x88, + 0xf6, 0xb6, 0xf9, 0x16, 0x7a, 0x00, 0x60, 0xf5, 0x82, 0x00, 0x47, 
0x6e, 0x27, 0x2d, 0xe4, 0x63, + 0xa4, 0x45, 0x11, 0x83, 0x05, 0xca, 0x4c, 0x07, 0xb7, 0x93, 0x00, 0xb4, 0x94, 0xd2, 0x52, 0xab, + 0x85, 0x8d, 0x2d, 0x7d, 0x96, 0x03, 0xd5, 0x85, 0x23, 0xfe, 0xeb, 0xe0, 0xa4, 0x2f, 0xfa, 0x82, + 0xb0, 0x20, 0x6c, 0xa4, 0xcf, 0x86, 0x55, 0xc9, 0x50, 0xe9, 0x14, 0xc8, 0xcd, 0x75, 0xcc, 0x20, + 0x70, 0x71, 0xc0, 0xcd, 0xa5, 0x27, 0xcd, 0xc5, 0x48, 0x8b, 0x96, 0x7b, 0xb0, 0xf4, 0x4f, 0x5d, + 0xa4, 0x42, 0x8a, 0x1f, 0x24, 0x8f, 0x9d, 0x37, 0xf8, 0x12, 0xbd, 0x84, 0xcc, 0x91, 0xd9, 0xed, + 0x61, 0x91, 0xb4, 0xb0, 0xf1, 0x64, 0x36, 0xef, 0x89, 0xb0, 0x11, 0x8d, 0x6f, 0x2a, 0xcf, 0xe5, + 0xda, 0x6f, 0x05, 0x0a, 0xe2, 0x96, 0xf1, 0x68, 0x3d, 0x7a, 0x93, 0x3b, 0xb9, 0x03, 0x69, 0xea, + 0x9b, 0xa4, 0x94, 0x11, 0x6e, 0xd6, 0x66, 0x6c, 0xd2, 0x37, 0x49, 0x5c, 0x99, 0x98, 0xe6, 0xa1, + 0x28, 0x33, 0x59, 0x14, 0x6a, 0x7e, 0xd6, 0x50, 0x63, 0xeb, 0xd8, 0x88, 0xc6, 0xd1, 0x3b, 0x80, + 0xe4, 0x78, 0xc5, 0x15, 0xbb, 0x41, 0x43, 0xb1, 0xb3, 0x6b, 0x4a, 0xe8, 0x55, 0xe4, 0x2f, 0x3a, + 0xc1, 0xc2, 0xc6, 0xfa, 0x7f, 0x5c, 0x98, 0x58, 0x2d, 0x9a, 0x5f, 0xfb, 0xa2, 0x00, 0x24, 0xb6, + 0x51, 0x0d, 0x72, 0x6f, 0xc9, 0x01, 0xf1, 0x8e, 0x89, 0x2a, 0x95, 0x97, 0xfa, 0x03, 0x6d, 0x21, + 0x01, 0x63, 0x00, 0x69, 0x90, 0xdd, 0xea, 0x50, 0x4c, 0x98, 0x2a, 0x97, 0x17, 0xfb, 0x03, 0x4d, + 0x4d, 0x28, 0xd1, 0x3e, 0x7a, 0x08, 0xf9, 0x37, 0x01, 0xf6, 0xcd, 0xc0, 0x25, 0x8e, 0xaa, 0x94, + 0xef, 0xf4, 0x07, 0xda, 0xed, 0x84, 0x34, 0x86, 0xd0, 0x0a, 0xcc, 0x45, 0x1f, 0xd8, 0x56, 0x53, + 0xe5, 0xe5, 0xfe, 0x40, 0x43, 0xd3, 0x34, 0x6c, 0xa3, 0x35, 0x28, 0x18, 0xd8, 0xef, 0xba, 0x96, + 0xc9, 0xb8, 0x5e, 0xba, 0x7c, 0xb7, 0x3f, 0xd0, 0x96, 0xae, 0x75, 0x9d, 0x80, 0x5c, 0xb1, 0xc9, + 0x3c, 0x9f, 0xb7, 0xa1, 0x66, 0xa6, 0x15, 0xaf, 0x10, 0x9e, 0x52, 0xac, 0xb1, 0xad, 0x66, 0xa7, + 0x53, 0xc6, 0x40, 0xe3, 0xf5, 0xf9, 0xcf, 0x8a, 0x74, 0x36, 0xaa, 0xc8, 0xe7, 0xa3, 0x8a, 0xfc, + 0x63, 0x54, 0x91, 0x4f, 0x2e, 0x2a, 0xd2, 0xf9, 0x45, 0x45, 0xfa, 0x76, 0x51, 0x91, 0xde, 0xd7, + 0x1d, 0x97, 0xed, 0xf7, 0x3a, 0xba, 0xe5, 0x1d, 0xd6, 0xe3, 0xea, 0xeb, 0x51, 0xf5, 0x75, 0xcb, + 0xb6, 0xea, 0x7f, 0x3d, 0xd7, 0x9d, 0xac, 0x78, 0x6d, 0x9f, 0xfd, 0x09, 0x00, 0x00, 0xff, 0xff, + 0x8b, 0x24, 0x01, 0x79, 0xca, 0x05, 0x00, 0x00, } func (m *Span) Marshal() (dAtA []byte, err error) { @@ -437,6 +447,11 @@ func (m *Checkpoint) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.LastSyncTime != 0 { + i = encodeVarintTable(dAtA, i, uint64(m.LastSyncTime)) + i-- + dAtA[i] = 0x18 + } if m.ResolvedTs != 0 { i = encodeVarintTable(dAtA, i, uint64(m.ResolvedTs)) i-- @@ -618,6 +633,9 @@ func (m *Checkpoint) Size() (n int) { if m.ResolvedTs != 0 { n += 1 + sovTable(uint64(m.ResolvedTs)) } + if m.LastSyncTime != 0 { + n += 1 + sovTable(uint64(m.LastSyncTime)) + } return n } @@ -879,6 +897,25 @@ func (m *Checkpoint) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LastSyncTime", wireType) + } + m.LastSyncTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.LastSyncTime |= Ts(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipTable(dAtA[iNdEx:]) diff --git a/cdc/processor/tablepb/table.proto b/cdc/processor/tablepb/table.proto index 97de3a23e57..d0143cc219c 100644 --- a/cdc/processor/tablepb/table.proto +++ b/cdc/processor/tablepb/table.proto @@ -57,6 +57,7 @@ 
enum TableState { message Checkpoint { uint64 checkpoint_ts = 1 [(gogoproto.casttype) = "Ts"]; uint64 resolved_ts = 2 [(gogoproto.casttype) = "Ts"]; + uint64 last_sync_time = 3 [(gogoproto.casttype) = "Ts"]; } // Stats holds a statistic for a table. diff --git a/cdc/scheduler/internal/scheduler.go b/cdc/scheduler/internal/scheduler.go index 111911b5632..8e47a779d2a 100644 --- a/cdc/scheduler/internal/scheduler.go +++ b/cdc/scheduler/internal/scheduler.go @@ -47,7 +47,7 @@ type Scheduler interface { // ddl jobs that need to be replicated. The Scheduler will // broadcast the barrierTs to all captures through the Heartbeat. barrier *schedulepb.BarrierWithMinTs, - ) (newCheckpointTs, newResolvedTs model.Ts, err error) + ) (newCheckpointTs, newResolvedTs, newLastSyncTime model.Ts, err error) // MoveTable requests that a table be moved to target. // It is thread-safe. diff --git a/cdc/scheduler/internal/v3/coordinator.go b/cdc/scheduler/internal/v3/coordinator.go index e27bdf49c97..76291c818ab 100644 --- a/cdc/scheduler/internal/v3/coordinator.go +++ b/cdc/scheduler/internal/v3/coordinator.go @@ -126,7 +126,7 @@ func (c *coordinator) Tick( // All captures that are alive according to the latest Etcd states. aliveCaptures map[model.CaptureID]*model.CaptureInfo, barrier *schedulepb.BarrierWithMinTs, -) (newCheckpointTs, newResolvedTs model.Ts, err error) { +) (newCheckpointTs, newResolvedTs, newLastSyncTime model.Ts, err error) { startTime := time.Now() defer func() { costTime := time.Since(startTime) @@ -272,7 +272,7 @@ func (c *coordinator) poll( currentTables []model.TableID, aliveCaptures map[model.CaptureID]*model.CaptureInfo, barrier *schedulepb.BarrierWithMinTs, -) (newCheckpointTs, newResolvedTs model.Ts, err error) { +) (newCheckpointTs, newResolvedTs, newLastSyncTime model.Ts, err error) { c.maybeCollectMetrics() if c.compat.UpdateCaptureInfo(aliveCaptures) { spanReplicationEnabled := c.compat.CheckSpanReplicationEnabled() @@ -283,7 +283,7 @@ func (c *coordinator) poll( recvMsgs, err := c.recvMsgs(ctx) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } var msgBuf []*schedulepb.Message @@ -295,7 +295,7 @@ func (c *coordinator) poll( // Handle received messages to advance replication set. msgs, err = c.replicationM.HandleMessage(recvMsgs) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } msgBuf = append(msgBuf, msgs...) @@ -309,13 +309,13 @@ func (c *coordinator) poll( if !c.captureM.CheckAllCaptureInitialized() { // Skip generating schedule tasks for replication manager, // as not all capture are initialized. - newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier, c.redoMetaManager) + newCheckpointTs, newResolvedTs, newLastSyncTime = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier, c.redoMetaManager) // tick capture manager after checkpoint calculation to take account resolvedTs in barrier // when redo is enabled msgs = c.captureM.Tick(c.replicationM.ReplicationSets(), c.schedulerM.DrainingTarget(), barrier.Barrier) msgBuf = append(msgBuf, msgs...) - return newCheckpointTs, newResolvedTs, c.sendMsgs(ctx, msgBuf) + return newCheckpointTs, newResolvedTs, newLastSyncTime, c.sendMsgs(ctx, msgBuf) } // Handle capture membership changes. 
@@ -323,7 +323,7 @@ func (c *coordinator) poll( msgs, err = c.replicationM.HandleCaptureChanges( changes.Init, changes.Removed, checkpointTs) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } msgBuf = append(msgBuf, msgs...) } @@ -339,12 +339,12 @@ func (c *coordinator) poll( // Handle generated schedule tasks. msgs, err = c.replicationM.HandleTasks(allTasks) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } msgBuf = append(msgBuf, msgs...) // Checkpoint calculation - newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier, c.redoMetaManager) + newCheckpointTs, newResolvedTs, newLastSyncTime = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier, c.redoMetaManager) // tick capture manager after checkpoint calculation to take account resolvedTs in barrier // when redo is enabled @@ -355,10 +355,10 @@ func (c *coordinator) poll( // Send new messages. err = c.sendMsgs(ctx, msgBuf) if err != nil { - return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } - return newCheckpointTs, newResolvedTs, nil + return newCheckpointTs, newResolvedTs, newLastSyncTime, nil } func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) { diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 4228cc0ec75..6e2fdea5e19 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -557,9 +557,9 @@ func (r *Manager) AdvanceCheckpoint( currentPDTime time.Time, barrier *schedulepb.BarrierWithMinTs, redoMetaManager redo.MetaManager, -) (newCheckpointTs, newResolvedTs model.Ts) { +) (newCheckpointTs, newResolvedTs model.Ts, newLastSyncTime model.Ts) { var redoFlushedResolvedTs model.Ts - limitBarrierWithRedo := func(newCheckpointTs, newResolvedTs uint64) (uint64, uint64) { + limitBarrierWithRedo := func(newCheckpointTs, newResolvedTs, newLastSyncTime uint64) (uint64, uint64, uint64) { flushedMeta := redoMetaManager.GetFlushedMeta() redoFlushedResolvedTs = flushedMeta.ResolvedTs log.Debug("owner gets flushed redo meta", @@ -578,7 +578,7 @@ func (r *Manager) AdvanceCheckpoint( if barrier.GlobalBarrierTs > newResolvedTs { barrier.GlobalBarrierTs = newResolvedTs } - return newCheckpointTs, newResolvedTs + return newCheckpointTs, newResolvedTs, newLastSyncTime } defer func() { if redoFlushedResolvedTs != 0 && barrier.GlobalBarrierTs > redoFlushedResolvedTs { @@ -594,7 +594,7 @@ func (r *Manager) AdvanceCheckpoint( r.slowestSink = tablepb.Span{} var slowestPullerResolvedTs uint64 = math.MaxUint64 - newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64 + newCheckpointTs, newResolvedTs, newLastSyncTime = math.MaxUint64, math.MaxUint64, 0 cannotProceed := false currentTables.Iter(func(tableID model.TableID, tableStart, tableEnd tablepb.Span) bool { tableSpanFound, tableHasHole := false, false @@ -629,6 +629,9 @@ func (r *Manager) AdvanceCheckpoint( newResolvedTs = table.Checkpoint.ResolvedTs } + if newLastSyncTime < table.Checkpoint.LastSyncTime { + newLastSyncTime 
= table.Checkpoint.LastSyncTime + } // Find the minimum puller resolved ts. if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok { if slowestPullerResolvedTs > pullerCkpt.ResolvedTs { @@ -664,9 +667,9 @@ if redoMetaManager.Enabled() { // If redo is enabled, GlobalBarrierTs should be limited by redo flushed meta. newResolvedTs = barrier.RedoBarrierTs - limitBarrierWithRedo(newCheckpointTs, newResolvedTs) + limitBarrierWithRedo(newCheckpointTs, newResolvedTs, newLastSyncTime) } - return checkpointCannotProceed, checkpointCannotProceed + return checkpointCannotProceed, checkpointCannotProceed, checkpointCannotProceed } // If currentTables is empty, we should advance newResolvedTs to global barrier ts and @@ -714,10 +717,10 @@ zap.String("changefeed", r.changefeedID.ID), zap.Uint64("newCheckpointTs", newCheckpointTs), zap.Uint64("newResolvedTs", newResolvedTs)) - return limitBarrierWithRedo(newCheckpointTs, newResolvedTs) + return limitBarrierWithRedo(newCheckpointTs, newResolvedTs, newLastSyncTime) } - return newCheckpointTs, newResolvedTs + return newCheckpointTs, newResolvedTs, newLastSyncTime } func (r *Manager) logSlowTableInfo(currentPDTime time.Time) { diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index fec432499a8..d99a297e65c 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -1020,6 +1020,10 @@ func (r *ReplicationSet) updateCheckpointAndStats( zap.Any("checkpointTs", r.Checkpoint.CheckpointTs), zap.Any("resolvedTs", r.Checkpoint.ResolvedTs)) } + + if r.Checkpoint.LastSyncTime < checkpoint.LastSyncTime { + r.Checkpoint.LastSyncTime = checkpoint.LastSyncTime + } r.Stats = stats } diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 14043124524..b3e235578b0 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -224,6 +224,7 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa if txn.GetTableSinkState() != state.TableSinkSinking { // The table where the event comes from is in stopping, so it's safe // to drop the event directly. + // What is this state used for? txn.Callback() continue } diff --git a/cdc/sink/tablesink/table_sink.go b/cdc/sink/tablesink/table_sink.go index 588b30aabf9..6dac7c87417 100644 --- a/cdc/sink/tablesink/table_sink.go +++ b/cdc/sink/tablesink/table_sink.go @@ -32,6 +32,9 @@ type TableSink interface { // For example, calculating the current progress from the statistics of the table sink. // This is a thread-safe method. GetCheckpointTs() model.ResolvedTs + // GetLastSyncTime returns the lastSyncTime of the table sink. + // This is a thread-safe method. + GetLastSyncTime() uint64 // Close closes the table sink. // After it returns, no more events will be sent out from this capture. 
Close() diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go index da067501727..76908c9fac0 100644 --- a/cdc/sink/tablesink/table_sink_impl.go +++ b/cdc/sink/tablesink/table_sink_impl.go @@ -15,6 +15,7 @@ package tablesink import ( "sort" + "sync" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -31,7 +32,13 @@ var ( _ TableSink = (*EventTableSink[*model.SingleTableTxn, *dmlsink.TxnEventAppender])(nil) ) +// LastSyncTimeRecord protects lastSyncTime, the commit ts of the latest event flushed by the sink. +type LastSyncTimeRecord struct { + sync.Mutex + lastSyncTime uint64 +} + // EventTableSink is a table sink that can write events. type EventTableSink[E dmlsink.TableEvent, P dmlsink.Appender[E]] struct { changefeedID model.ChangeFeedID span tablepb.Span @@ -46,6 +53,8 @@ type EventTableSink[E dmlsink.TableEvent, P dmlsink.Appender[E]] struct { eventBuffer []E state state.TableSinkState + lastSyncTime LastSyncTimeRecord + // For dataflow metrics. metricsTableSinkTotalRows prometheus.Counter } @@ -69,6 +78,7 @@ func New[E dmlsink.TableEvent, P dmlsink.Appender[E]]( eventAppender: appender, eventBuffer: make([]E, 0, 1024), state: state.TableSinkSinking, + lastSyncTime: LastSyncTimeRecord{lastSyncTime: 0}, // use 0 to initialize lastSyncTime metricsTableSinkTotalRows: totalRowsCounter, } } @@ -115,8 +125,20 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err } // We have to record the event ID for the callback. + postEventFlush := e.progressTracker.addEvent() ce := &dmlsink.CallbackableEvent[E]{ - Event: ev, - Callback: e.progressTracker.addEvent(), + Event: ev, + Callback: func() { + // Multiple sink workers may run this callback concurrently, so updates to + // lastSyncTime are guarded by its mutex. + // TODO: measure the performance impact of taking this lock for every event. + { + e.lastSyncTime.Lock() + defer e.lastSyncTime.Unlock() + if e.lastSyncTime.lastSyncTime < ev.GetCommitTs() { + e.lastSyncTime.lastSyncTime = ev.GetCommitTs() + } + } + postEventFlush() + }, SinkState: &e.state, } resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce) @@ -140,6 +162,13 @@ func (e *EventTableSink[E, P]) GetCheckpointTs() model.ResolvedTs { return e.progressTracker.advance() } +// GetLastSyncTime returns the lastSyncTime of the table sink. +func (e *EventTableSink[E, P]) GetLastSyncTime() uint64 { + e.lastSyncTime.Lock() + defer e.lastSyncTime.Unlock() + return e.lastSyncTime.lastSyncTime +} + // Close closes the table sink. // After it returns, no more events will be sent out from this capture. func (e *EventTableSink[E, P]) Close() {