Skip to content

Commit

Permalink
puller(ticdc): fix resolvedTs get stuck when region split and merge (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 19, 2024
1 parent 600ad4a commit 2d5a9f6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
3 changes: 2 additions & 1 deletion cdc/puller/frontier/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *spanFrontier) Frontier() uint64 {
func (s *spanFrontier) Forward(regionID uint64, span tablepb.Span, ts uint64) {
// it's the fast part to detect if the region is split or merged,
// if not we can update the minTsHeap with use new ts directly
if n, ok := s.cachedRegions[regionID]; ok && n.regionID != fakeRegionID && n.end != nil {
if n, ok := s.cachedRegions[regionID]; ok && n.regionID == regionID && n.end != nil {
if bytes.Equal(n.Key(), span.StartKey) && bytes.Equal(n.End(), span.EndKey) {
s.minTsHeap.UpdateKey(n.Value(), ts)
return
Expand All @@ -100,6 +100,7 @@ func (s *spanFrontier) insert(regionID uint64, span tablepb.Span, ts uint64) {
if bytes.Equal(seekRes.Node().Key(), span.StartKey) &&
bytes.Equal(next.Key(), span.EndKey) {
s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts)
delete(s.cachedRegions, seekRes.Node().regionID)
if regionID != fakeRegionID {
s.cachedRegions[regionID] = seekRes.Node()
s.cachedRegions[regionID].regionID = regionID
Expand Down
19 changes: 19 additions & 0 deletions cdc/puller/frontier/frontier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package frontier

import (
"bytes"
"fmt"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -435,3 +436,21 @@ func TestFrontierEntries(t *testing.T) {
require.Equal(t, []byte("a"), []byte(slowestRange.StartKey))
require.Equal(t, []byte("b"), []byte(slowestRange.EndKey))
}

func TestMergeSpitWithDifferentRegionID(t *testing.T) {
frontier := NewFrontier(100, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")})
frontier.Forward(1, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, 1222)
frontier.Forward(2, tablepb.Span{StartKey: []byte("b"), EndKey: []byte("c")}, 102)
frontier.Forward(4, tablepb.Span{StartKey: []byte("b"), EndKey: []byte("c")}, 103)
frontier.Forward(1, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, 104)
frontier.Forward(1, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, 1223)
frontier.Forward(3, tablepb.Span{StartKey: []byte("b"), EndKey: []byte("c")}, 105)
frontier.Forward(2, tablepb.Span{StartKey: []byte("b"), EndKey: []byte("c")}, 107)
frontier.(*spanFrontier).spanList.Entries(func(node *skipListNode) bool {
fmt.Printf("%d:[%s: %s) %d\n", node.regionID,
string(node.Key()),
string(node.End()), node.value.key)
return true
})
require.Equal(t, uint64(107), frontier.Frontier())
}

0 comments on commit 2d5a9f6

Please sign in to comment.