Skip to content

Commit

Permalink
cdc: fail the changefeed blocking global gc and suppressing the gc-ttl (#9304)
Browse files Browse the repository at this point in the history

close #9303
  • Loading branch information
charleszheng44 authored Jun 29, 2023
1 parent 70eab3f commit dcf6f85
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 14 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ error = '''
flow controller is aborted
'''

["CDC:ErrGCTTLExceeded"]
error = '''
the checkpoint-ts(%d) lag of the changefeed(%s) has exceeded the GC TTL and the changefeed is blocking global GC progression
'''

["CDC:ErrGRPCDialFailed"]
error = '''
grpc dial failed
Expand Down
5 changes: 5 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,11 @@ var (
" caused by GC. checkpoint-ts %d is earlier than or equal to GC safepoint at %d",
errors.RFCCodeText("CDC:ErrSnapshotLostByGC"),
)
ErrGCTTLExceeded = errors.Normalize(
"the checkpoint-ts(%d) lag of the changefeed(%s) has exceeded "+
"the GC TTL and the changefeed is blocking global GC progression",
errors.RFCCodeText("CDC:ErrGCTTLExceeded"),
)
ErrNotOwner = errors.Normalize(
"this capture is not a owner",
errors.RFCCodeText("CDC:ErrNotOwner"),
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func WrapError(rfcError *errors.Error, err error, args ...interface{}) error {
// wants to replicate has been or will be GC. So it makes no sense to try to
// resume the changefeed, and the changefeed should immediately be failed.
var changeFeedFastFailError = []*errors.Error{
ErrSnapshotLostByGC, ErrStartTsBeforeGC,
ErrGCTTLExceeded, ErrSnapshotLostByGC, ErrStartTsBeforeGC,
}

// IsChangefeedFastFailError checks if an error is a ChangefeedFastFailError
Expand Down
40 changes: 27 additions & 13 deletions pkg/txnutil/gc/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ import (
"go.uber.org/zap"
)

// gcTTL is the duration during which data related to a
// failed feed will be retained, and beyond which point the data will be deleted
// by garbage collection.
const gcTTL = 24 * time.Hour

// gcSafepointUpdateInterval is the minimum interval at which CDC can update the GC safepoint.
var gcSafepointUpdateInterval = 1 * time.Minute

Expand All @@ -57,6 +52,7 @@ type gcManager struct {
lastUpdatedTime time.Time
lastSucceededTime time.Time
lastSafePointTs uint64
isTiCDCBlockGC bool
}

// NewManager creates a new Manager.
Expand Down Expand Up @@ -103,6 +99,10 @@ func (m *gcManager) TryUpdateGCSafePoint(
log.Warn("update gc safe point failed, the gc safe point is larger than checkpointTs",
zap.Uint64("actual", actual), zap.Uint64("checkpointTs", checkpointTs))
}
// if the min checkpoint ts is equal to the current gc safe point, it
// means that the service gc safe point set by TiCDC is the min service
// gc safe point
m.isTiCDCBlockGC = actual == checkpointTs
m.lastSafePointTs = actual
m.lastSucceededTime = time.Now()
return nil
Expand All @@ -112,13 +112,27 @@ func (m *gcManager) CheckStaleCheckpointTs(
ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
) error {
gcSafepointUpperBound := checkpointTs - 1
// if there is another service gc point less than the min checkpoint ts.
if gcSafepointUpperBound < m.lastSafePointTs {
return cerror.ErrSnapshotLostByGC.
GenWithStackByArgs(
checkpointTs,
m.lastSafePointTs,
)
if m.isTiCDCBlockGC {
pdTime := m.pdClock.CurrentTime()
if pdTime.Sub(
oracle.GetTimeFromTS(gcSafepointUpperBound),
) > time.Duration(m.gcTTL)*time.Second {
return cerror.ErrGCTTLExceeded.
GenWithStackByArgs(
checkpointTs,
changefeedID,
)
}
} else {
// if `isTiCDCBlockGC` is false, it means there is another service gc
// point less than the min checkpoint ts.
if gcSafepointUpperBound < m.lastSafePointTs {
return cerror.ErrSnapshotLostByGC.
GenWithStackByArgs(
checkpointTs,
m.lastSafePointTs,
)
}
}
return nil
}
Expand All @@ -132,5 +146,5 @@ func (m *gcManager) IgnoreFailedChangeFeed(
gcSafepointUpperBound := checkpointTs - 1
return pdTime.Sub(
oracle.GetTimeFromTS(gcSafepointUpperBound),
) > gcTTL
) > time.Duration(m.gcTTL)*time.Second
}
1 change: 1 addition & 0 deletions pkg/txnutil/gc/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestIgnoreFailedFeed(t *testing.T) {
pdClock := pdutil.NewClock4Test()
gcManager := NewManager(etcd.GcServiceIDForTest(),
mockPDClient, pdClock).(*gcManager)
gcManager.gcTTL = 24 * 60 * 60

// 5 hours ago
ts1 := oracle.GoTimeToTS(time.Now().Add(-time.Hour * 5))
Expand Down

0 comments on commit dcf6f85

Please sign in to comment.