diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index b38639221bb..32cc523c90f 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -41,6 +41,7 @@ import ( var ( regionWorkerPool workerpool.WorkerPool workerPoolOnce sync.Once + workerPoolLock sync.Mutex // The magic number here is keep the same with some magic numbers in some // other components in TiCDC, including worker pool task chan size, mounter // chan size etc. @@ -407,6 +408,8 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv func (w *regionWorker) initPoolHandles() { handles := make([]workerpool.EventHandle, 0, w.concurrency) + workerPoolLock.Lock() + defer workerPoolLock.Unlock() for i := 0; i < w.concurrency; i++ { poolHandle := regionWorkerPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error { event := eventI.(*regionStatefulEvent) @@ -862,6 +865,8 @@ func getWorkerPoolSize() (size int) { func InitWorkerPool() { workerPoolOnce.Do(func() { size := getWorkerPoolSize() + workerPoolLock.Lock() + defer workerPoolLock.Unlock() regionWorkerPool = workerpool.NewDefaultWorkerPool(size) }) } diff --git a/cdc/server/server.go b/cdc/server/server.go index f57ab945a11..b029e9256d6 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -344,10 +344,6 @@ func (s *server) run(ctx context.Context) (err error) { eg, egCtx := errgroup.WithContext(ctx) - eg.Go(func() error { - return s.capture.Run(egCtx) - }) - eg.Go(func() error { return s.upstreamPDHealthChecker(egCtx) }) @@ -372,6 +368,10 @@ func (s *server) run(ctx context.Context) (err error) { return nil }) + eg.Go(func() error { + return s.capture.Run(egCtx) + }) + return eg.Wait() }