Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan committed Nov 22, 2023
1 parent 68dc49c commit 2898d74
Show file tree
Hide file tree
Showing 18 changed files with 289 additions and 82 deletions.
1 change: 1 addition & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.pauseChangefeed)
changefeedGroup.GET("/:changefeed_id/status", changefeedOwnerMiddleware, api.status)
changefeedGroup.GET("/:changefeed_id/synced", changefeedOwnerMiddleware, api.synced)

// capture apis
captureGroup := v2.Group("/captures")
Expand Down
78 changes: 78 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,84 @@ func (h *OpenAPIV2) status(c *gin.Context) {
})
}

func (h *OpenAPIV2) synced(c *gin.Context) {
ctx := c.Request.Context()

namespace := getNamespaceValueWithDefault(c)
changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)}
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}
status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus(
ctx,
changefeedID,
)
if err != nil {
_ = c.Error(err)
return
}

// get pd_now
cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}

if err := c.BindJSON(&cfg); err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
defer pdClient.Close()
if err != nil {
// _ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
// return
// means pd is offline
c.JSON(http.StatusOK, map[string]any{
"Synced": false,
"CheckpointTs": status.CheckpointTs,
"ResolvedTs": status.ResolvedTs,
"LastSyncTime": status.LastSyncTime,
"info": "xxxxx",
})
return
}

//TSO 是啥?
now, _, _ := pdClient.GetTS(ctx)
// get pd_now
// 随便写一个先
if (now-int64(status.LastSyncTime) > 60*5) && (now-int64(status.CheckpointTs) < 5) {
c.JSON(http.StatusOK, map[string]any{
"Synced": true,
"CheckpointTs": status.CheckpointTs,
"ResolvedTs": status.ResolvedTs,
"LastSyncTime": status.LastSyncTime,
"info": "",
})
} else {
c.JSON(http.StatusOK, map[string]any{
"Synced": false,
"CheckpointTs": status.CheckpointTs,
"ResolvedTs": status.ResolvedTs,
"LastSyncTime": status.LastSyncTime,
"info": "xxxxx",
})
}

}

func toAPIModel(
info *model.ChangeFeedInfo,
resolvedTs uint64,
Expand Down
6 changes: 6 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,9 @@ type ChangeFeedStatusForAPI struct {
ResolvedTs uint64 `json:"resolved-ts"`
CheckpointTs uint64 `json:"checkpoint-ts"`
}

type ChangeFeedSyncedStatusForAPI struct {
ResolvedTs uint64 `json:"resolved-ts"`
CheckpointTs uint64 `json:"checkpoint-ts"`
LastSyncTime uint64 `json:"last-sync-time"`
}
32 changes: 17 additions & 15 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type changefeed struct {
barriers *barriers
feedStateManager FeedStateManager
resolvedTs model.Ts
lastSyncTime model.Ts

// ddl related fields
ddlManager *ddlManager
Expand Down Expand Up @@ -272,7 +273,8 @@ func (c *changefeed) Tick(ctx cdcContext.Context,
}
return nil
})
checkpointTs, minTableBarrierTs, err := c.tick(ctx, captures)
checkpointTs, minTableBarrierTs, maxLastSyncTime, err := c.tick(ctx, captures)
c.lastSyncTime = maxLastSyncTime

// The tick duration is recorded only if changefeed has completed initialization
if c.initialized {
Expand Down Expand Up @@ -350,50 +352,50 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context,
// tick returns the checkpointTs and minTableBarrierTs.
func (c *changefeed) tick(ctx cdcContext.Context,
captures map[model.CaptureID]*model.CaptureInfo,
) (model.Ts, model.Ts, error) {
) (model.Ts, model.Ts, model.Ts, error) {
adminJobPending := c.feedStateManager.Tick(c.resolvedTs, c.latestStatus, c.latestInfo)
preCheckpointTs := c.latestInfo.GetCheckpointTs(c.latestStatus)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
if err := c.checkStaleCheckpointTs(ctx, c.latestInfo, preCheckpointTs); err != nil {
return 0, 0, errors.Trace(err)
return 0, 0, 0, errors.Trace(err)
}

if !c.feedStateManager.ShouldRunning() {
c.isRemoved = c.feedStateManager.ShouldRemoved()
c.releaseResources(ctx)
return 0, 0, nil
return 0, 0, 0, nil
}

if adminJobPending {
return 0, 0, nil
return 0, 0, 0, nil
}

if err := c.initialize(ctx); err != nil {
return 0, 0, errors.Trace(err)
return 0, 0, 0, errors.Trace(err)
}

select {
case err := <-c.errCh:
return 0, 0, errors.Trace(err)
return 0, 0, 0, errors.Trace(err)
default:
}

if c.redoMetaMgr.Enabled() {
if !c.redoMetaMgr.Running() {
return 0, 0, nil
return 0, 0, 0, nil
}
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
return 0, 0, errors.Trace(err)
return 0, 0, 0, errors.Trace(err)
}

err = c.handleBarrier(ctx, c.latestInfo, c.latestStatus, barrier)
if err != nil {
return 0, 0, errors.Trace(err)
return 0, 0, 0, errors.Trace(err)
}

log.Debug("owner handles barrier",
Expand All @@ -409,14 +411,14 @@ func (c *changefeed) tick(ctx cdcContext.Context,
// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
// which implies that it would be premature to schedule tables or to update status.
// So we return here.
return 0, 0, nil
return 0, 0, 0, nil
}

newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
newCheckpointTs, newResolvedTs, newLastSyncTime, err := c.scheduler.Tick(
ctx, preCheckpointTs, allPhysicalTables, captures,
barrier)
if err != nil {
return 0, 0, errors.Trace(err)
return 0, 0, 0, errors.Trace(err)
}

pdTime := c.upstream.PDClock.CurrentTime()
Expand All @@ -430,7 +432,7 @@ func (c *changefeed) tick(ctx cdcContext.Context,
// advance the watermarks for now.
c.updateMetrics(currentTs, c.latestStatus.CheckpointTs, c.resolvedTs)
}
return 0, 0, nil
return 0, 0, 0, nil
}

log.Debug("owner prepares to update status",
Expand Down Expand Up @@ -463,7 +465,7 @@ func (c *changefeed) tick(ctx cdcContext.Context,
c.updateMetrics(currentTs, newCheckpointTs, c.resolvedTs)
c.tickDownstreamObserver(ctx)

return newCheckpointTs, barrier.MinTableBarrierTs, nil
return newCheckpointTs, barrier.MinTableBarrierTs, newLastSyncTime, nil
}

func (c *changefeed) initialize(ctx cdcContext.Context) (err error) {
Expand Down
13 changes: 12 additions & 1 deletion cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func (o *ownerImpl) handleJobs(ctx context.Context) {

func (o *ownerImpl) handleQueries(query *Query) error {
switch query.Tp {
case QueryChangeFeedStatuses:
case QueryChangeFeedSyncedStatus:
cfReactor, ok := o.changefeeds[query.ChangeFeedID]
if !ok {
query.Data = nil
Expand All @@ -597,6 +597,17 @@ func (o *ownerImpl) handleQueries(query *Query) error {
ret.ResolvedTs = cfReactor.resolvedTs
ret.CheckpointTs = cfReactor.latestStatus.CheckpointTs
query.Data = ret
case QueryChangeFeedStatuses:
cfReactor, ok := o.changefeeds[query.ChangeFeedID]
if !ok {
query.Data = nil
return nil
}
ret := &model.ChangeFeedSyncedStatusForAPI{}
ret.ResolvedTs = cfReactor.resolvedTs
ret.LastSyncTime = cfReactor.lastSyncTime
ret.CheckpointTs = cfReactor.latestStatus.CheckpointTs
query.Data = ret
case QueryChangefeedInfo:
cfReactor, ok := o.changefeeds[query.ChangeFeedID]
if !ok {
Expand Down
20 changes: 20 additions & 0 deletions cdc/owner/status_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type StatusProvider interface {
// GetChangeFeedStatus returns a changefeeds' runtime status.
GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatusForAPI, error)

GetChangeFeedSyncedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedSyncedStatusForAPI, error)

// GetChangeFeedInfo returns a changefeeds' info.
GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error)

Expand Down Expand Up @@ -63,6 +65,8 @@ const (
QueryChangefeedInfo
// QueryChangeFeedStatuses is the type of query changefeed status
QueryChangeFeedStatuses
// QueryChangeFeedSyncedStatus is the type of query changefeed synced status
QueryChangeFeedSyncedStatus
)

// Query wraps query command and return results.
Expand Down Expand Up @@ -98,6 +102,22 @@ func (p *ownerStatusProvider) GetChangeFeedStatus(ctx context.Context,
return query.Data.(*model.ChangeFeedStatusForAPI), nil
}

func (p *ownerStatusProvider) GetChangeFeedSyncedStatus(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedSyncedStatusForAPI, error) {
query := &Query{
Tp: QueryChangeFeedSyncedStatus,
ChangeFeedID: changefeedID,
}
if err := p.sendQueryToOwner(ctx, query); err != nil {
return nil, errors.Trace(err)
}
if query.Data == nil {
return nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedID)
}
return query.Data.(*model.ChangeFeedSyncedStatusForAPI), nil
}

func (p *ownerStatusProvider) GetChangeFeedInfo(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedInfo, error) {
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ func (p *processor) GetTableSpanStatus(span tablepb.Span, collectStat bool) tabl
Checkpoint: tablepb.Checkpoint{
CheckpointTs: sinkStats.CheckpointTs,
ResolvedTs: sinkStats.ResolvedTs,
LastSyncTime: sinkStats.LastSyncTime,
},
State: state,
Stats: stats,
Expand Down
4 changes: 4 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
type TableStats struct {
CheckpointTs model.Ts
ResolvedTs model.Ts
LastSyncTime model.Ts
BarrierTs model.Ts
}

Expand Down Expand Up @@ -140,6 +141,7 @@ func New(
redoDMLMgr redo.DMLManager,
sourceManager *sourcemanager.SourceManager,
) *SinkManager {

m := &SinkManager{
changefeedID: changefeedID,
changefeedInfo: changefeedInfo,
Expand Down Expand Up @@ -961,6 +963,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
tableSink := value.(*tableSinkWrapper)

checkpointTs := tableSink.getCheckpointTs()
lastSyncTime := tableSink.getLastSyncTime()
m.sinkMemQuota.Release(span, checkpointTs)
m.redoMemQuota.Release(span, checkpointTs)

Expand Down Expand Up @@ -1003,6 +1006,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
return TableStats{
CheckpointTs: checkpointTs.ResolvedMark(),
ResolvedTs: resolvedTs,
LastSyncTime: lastSyncTime,
BarrierTs: tableSink.barrierTs.Load(),
}
}
Expand Down
6 changes: 6 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
return t.tableSink.s.UpdateResolvedTs(ts)
}

func (t *tableSinkWrapper) getLastSyncTime() uint64 {
t.tableSink.RLock()
defer t.tableSink.RUnlock()
return t.tableSink.s.GetLastSyncTime()
}

func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs {
t.tableSink.RLock()
defer t.tableSink.RUnlock()
Expand Down
Loading

0 comments on commit 2898d74

Please sign in to comment.