Skip to content

Commit

Permalink
cdc: fixes minor bugs #10168 and #10169 (#10170)
Browse files Browse the repository at this point in the history
close #10168
  • Loading branch information
hicqu authored Nov 29, 2023
1 parent 21d17c0 commit eb96b96
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 5 deletions.
4 changes: 4 additions & 0 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ const (
// failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we
// don't need to force reload region anymore.
regionScheduleReload = false

scanRegionsConcurrency = 1024
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -432,6 +434,8 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
g.Go(func() error { return s.logSlowRegions(ctx) })

g.Go(func() error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(scanRegionsConcurrency)
for {
select {
case <-ctx.Done():
Expand Down
6 changes: 4 additions & 2 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (s *SharedClient) Run(ctx context.Context) error {
s.workers = append(s.workers, worker)
}

g.Go(func() error { return s.handleRequestRanges(ctx, g) })
g.Go(func() error { return s.handleRequestRanges(ctx) })
g.Go(func() error { return s.dispatchRequest(ctx) })
g.Go(func() error { return s.requestRegionToStore(ctx, g) })
g.Go(func() error { return s.handleErrors(ctx) })
Expand Down Expand Up @@ -427,7 +427,9 @@ func (s *SharedClient) broadcastRequest(r *requestedStore, sri singleRegionInfo)
}
}

func (s *SharedClient) handleRequestRanges(ctx context.Context, g *errgroup.Group) error {
func (s *SharedClient) handleRequestRanges(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(scanRegionsConcurrency)
for {
select {
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ var (
maxUpdateIntervalSize = defaultMaxUpdateIntervalSize

// Sink manager schedules table tasks based on lag. Limit the max task range
// can be helpful to reduce changefeed latency.
maxTaskTimeRange = 5 * time.Second
// can be helpful to reduce changefeed latency for large initial data.
maxTaskTimeRange = 30 * time.Minute
)

// Used to record the progress of the table.
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestValidateAndAdjustBound(t *testing.T) {
StartTs: 439333515018895365,
CommitTs: 439333515018895366,
},
taskTimeRange: 10 * time.Second,
taskTimeRange: 60 * time.Minute,
expectAdjust: true,
},
{
Expand Down

0 comments on commit eb96b96

Please sign in to comment.