diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index b798a50d30b..e6ac06b0cb0 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -378,9 +378,10 @@ func newEventFeedSession(
     startTs uint64,
     eventCh chan<- model.RegionFeedEvent,
 ) *eventFeedSession {
-    id := strconv.FormatUint(allocID(), 10)
+    id := allocID()
+    idStr := strconv.FormatUint(id, 10)
     rangeLock := regionlock.NewRegionRangeLock(
-        totalSpan.StartKey, totalSpan.EndKey, startTs,
+        id, totalSpan.StartKey, totalSpan.EndKey, startTs,
         client.changefeed.Namespace+"."+client.changefeed.ID)
     return &eventFeedSession{
         client: client,
@@ -393,7 +394,7 @@ func newEventFeedSession(
         eventCh: eventCh,
         rangeLock: rangeLock,
         lockResolver: lockResolver,
-        id: id,
+        id: idStr,
         regionChSizeGauge: clientChannelSize.WithLabelValues("region"),
         errChSizeGauge: clientChannelSize.WithLabelValues("err"),
         rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
@@ -1015,7 +1016,7 @@ func (s *eventFeedSession) receiveFromStream(

     // always create a new region worker, because `receiveFromStream` is ensured
     // to call exactly once from outer code logic
-    worker := newRegionWorker(parentCtx, s.changefeed, s, addr)
+    worker := newRegionWorker(parentCtx, s.changefeed, s, addr, pendingRegions)
     defer worker.evictAllRegions()

     ctx, cancel := context.WithCancel(parentCtx)
@@ -1061,7 +1062,7 @@
     })
     if err != nil {
         if status.Code(errors.Cause(err)) == codes.Canceled {
-            log.Debug(
+            log.Info(
                 "receive from stream canceled",
                 zap.String("namespace", s.changefeed.Namespace),
                 zap.String("changefeed", s.changefeed.ID),
diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go
index 6b6127f43c4..fb0d6d90a9b 100644
--- a/cdc/kv/metrics.go
+++ b/cdc/kv/metrics.go
@@ -145,6 +145,11 @@ var (
         []string{"namespace", "changefeed"})
 )

+// GetGlobalGrpcMetrics gets the global grpc metrics.
+func GetGlobalGrpcMetrics() *grpc_prometheus.ClientMetrics {
+    return grpcMetrics
+}
+
 // InitMetrics registers all metrics in the kv package
 func InitMetrics(registry *prometheus.Registry) {
     registry.MustRegister(eventFeedErrorCounter)
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index b38639221bb..3934f33caa4 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -113,6 +113,8 @@ type regionWorker struct {

     // how many pending input events
     inputPending int32
+
+    pendingRegions *syncRegionFeedStateMap
 }

 func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics {
@@ -146,6 +148,7 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric

 func newRegionWorker(
     ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string,
+    pendingRegions *syncRegionFeedStateMap,
 ) *regionWorker {
     return &regionWorker{
         parentCtx: ctx,
@@ -160,6 +163,8 @@ func newRegionWorker(
         concurrency: int(s.client.config.KVClient.WorkerConcurrent),
         metrics: newRegionWorkerMetrics(changefeedID),
         inputPending: 0,
+
+        pendingRegions: pendingRegions,
     }
 }

@@ -195,7 +200,7 @@ func (w *regionWorker) checkShouldExit() error {
     empty := w.checkRegionStateEmpty()
     // If there is no region maintained by this region worker, exit it and
     // cancel the gRPC stream.
-    if empty {
+    if empty && w.pendingRegions.len() == 0 {
         w.cancelStream(time.Duration(0))
         return cerror.ErrRegionWorkerExit.GenWithStackByArgs()
     }
diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go
index 9438d518651..c8041246a96 100644
--- a/cdc/kv/region_worker_test.go
+++ b/cdc/kv/region_worker_test.go
@@ -159,7 +159,7 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) {
         &tikv.RPCContext{}), 0)
     state.sri.lockedRange = &regionlock.LockedRange{}
     state.start()
-    worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "")
+    worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())
     require.Equal(t, 2, cap(worker.outputCh))

     // Receive prewrite2 with empty value.
@@ -322,7 +322,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) {
     s1.sri.lockedRange = &regionlock.LockedRange{}
     s1.sri.lockedRange.CheckpointTs.Store(9)
     s1.start()
-    w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "")
+    w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())
     err := w.handleResolvedTs(ctx, &resolvedTsEvent{
         resolvedTs: 5,
diff --git a/cdc/kv/regionlock/region_range_lock.go b/cdc/kv/regionlock/region_range_lock.go
index 3a4e4b7703c..3c375f843df 100644
--- a/cdc/kv/regionlock/region_range_lock.go
+++ b/cdc/kv/regionlock/region_range_lock.go
@@ -140,12 +140,6 @@ func (e *rangeLockEntry) String() string {
         len(e.waiters))
 }

-var currentID uint64 = 0
-
-func allocID() uint64 {
-    return atomic.AddUint64(&currentID, 1)
-}
-
 // RegionRangeLock is specifically used for kv client to manage exclusive region ranges. Acquiring lock will be blocked
 // if part of its range is already locked. It also manages checkpoint ts of all ranges. The ranges are marked by a
 // version number, which should comes from the Region's Epoch version. The version is used to compare which range is
@@ -166,10 +160,11 @@ type RegionRangeLock struct {

 // NewRegionRangeLock creates a new RegionRangeLock.
 func NewRegionRangeLock(
+    id uint64,
     startKey, endKey []byte, startTs uint64, changefeedLogInfo string,
 ) *RegionRangeLock {
     return &RegionRangeLock{
-        id: allocID(),
+        id: id,
         totalSpan: tablepb.Span{StartKey: startKey, EndKey: endKey},
         changefeedLogInfo: changefeedLogInfo,
         rangeCheckpointTs: newRangeTsMap(startKey, endKey, startTs),
diff --git a/cdc/kv/regionlock/region_range_lock_test.go b/cdc/kv/regionlock/region_range_lock_test.go
index 8b1a5690190..af887248164 100644
--- a/cdc/kv/regionlock/region_range_lock_test.go
+++ b/cdc/kv/regionlock/region_range_lock_test.go
@@ -90,7 +90,7 @@ func TestRegionRangeLock(t *testing.T) {
     t.Parallel()
     ctx := context.TODO()

-    l := NewRegionRangeLock([]byte("a"), []byte("h"), math.MaxUint64, "")
+    l := NewRegionRangeLock(1, []byte("a"), []byte("h"), math.MaxUint64, "")
     mustLockRangeSuccess(ctx, t, l, "a", "e", 1, 1, math.MaxUint64)
     unlockRange(l, "a", "e", 1, 1, 100)

@@ -107,7 +107,7 @@
 func TestRegionRangeLockStale(t *testing.T) {
     t.Parallel()

-    l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "")
+    l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "")
     ctx := context.TODO()
     mustLockRangeSuccess(ctx, t, l, "c", "g", 1, 10, math.MaxUint64)
     mustLockRangeSuccess(ctx, t, l, "j", "n", 2, 8, math.MaxUint64)
@@ -130,7 +130,7 @@ func TestRegionRangeLockLockingRegionID(t *testing.T) {
     t.Parallel()
     ctx := context.TODO()

-    l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "")
+    l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "")
     mustLockRangeSuccess(ctx, t, l, "c", "d", 1, 10, math.MaxUint64)
     mustLockRangeStale(ctx, t, l, "e", "f", 1, 5, "e", "f")

@@ -166,7 +166,7 @@ func TestRegionRangeLockCanBeCancelled(t *testing.T) {
     t.Parallel()
     ctx, cancel := context.WithCancel(context.Background())

-    l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "")
+    l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "")
     mustLockRangeSuccess(ctx, t, l, "g", "h", 1, 10, math.MaxUint64)
     wait := mustLockRangeWait(ctx, t, l, "g", "h", 1, 12)
     cancel()
diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go
index afe0ff83395..f11a036a0b4 100644
--- a/cdc/kv/shared_client.go
+++ b/cdc/kv/shared_client.go
@@ -410,7 +410,7 @@ func (s *SharedClient) createRegionRequest(sri singleRegionInfo) *cdcpb.ChangeDa

 func (s *SharedClient) appendRequest(r *requestedStore, sri singleRegionInfo) {
     offset := r.nextStream.Add(1) % uint32(len(r.streams))
-    log.Debug("event feed will request a region",
+    log.Info("event feed will request a region",
         zap.String("namespace", s.changefeed.Namespace),
         zap.String("changefeed", s.changefeed.ID),
         zap.Uint64("streamID", r.streams[offset].streamID),
@@ -578,7 +578,7 @@ func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo)
     switch eerr := err.(type) {
     case *eventError:
         innerErr := eerr.err
-        log.Debug("cdc error",
+        log.Info("cdc region error",
             zap.String("namespace", s.changefeed.Namespace),
             zap.String("changefeed", s.changefeed.ID),
             zap.Any("subscriptionID", errInfo.requestedTable.subscriptionID),
@@ -746,7 +746,7 @@
 func (s *SharedClient) newRequestedTable(
     eventCh chan<- MultiplexingEvent,
 ) *requestedTable {
     cfName := s.changefeed.String()
-    rangeLock := regionlock.NewRegionRangeLock(span.StartKey, span.EndKey, startTs, cfName)
+    rangeLock := regionlock.NewRegionRangeLock(uint64(subID), span.StartKey, span.EndKey, startTs, cfName)
     rt := &requestedTable{
         subscriptionID: subID,
diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go
index 0bd6543f141..4764fd1cb2c 100644
--- a/cdc/kv/shared_client_test.go
+++ b/cdc/kv/shared_client_test.go
@@ -113,7 +113,7 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {

     pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}

-    grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{})
+    grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil)

     regionCache := tikv.NewRegionCache(pdClient)
diff --git a/cdc/kv/sharedconn/conn_and_client.go b/cdc/kv/sharedconn/conn_and_client.go
index 79b3f3135e2..c3a96473dec 100644
--- a/cdc/kv/sharedconn/conn_and_client.go
+++ b/cdc/kv/sharedconn/conn_and_client.go
@@ -44,6 +44,7 @@ func StatusIsEOF(status *grpcstatus.Status) bool {
 // ConnAndClientPool is a pool of ConnAndClient.
 type ConnAndClientPool struct {
     credential *security.Credential
+    grpcMetrics *grpc_prometheus.ClientMetrics
     maxStreamsPerConn int

     sync.Mutex
@@ -74,14 +75,23 @@ type connArray struct {
 }

 // NewConnAndClientPool creates a new ConnAndClientPool.
-func NewConnAndClientPool(credential *security.Credential, maxStreamsPerConn ...int) *ConnAndClientPool {
-    return newConnAndClientPool(credential, 1000)
+func NewConnAndClientPool(
+    credential *security.Credential,
+    grpcMetrics *grpc_prometheus.ClientMetrics,
+    maxStreamsPerConn ...int,
+) *ConnAndClientPool {
+    return newConnAndClientPool(credential, grpcMetrics, 1000)
 }

-func newConnAndClientPool(credential *security.Credential, maxStreamsPerConn int) *ConnAndClientPool {
+func newConnAndClientPool(
+    credential *security.Credential,
+    grpcMetrics *grpc_prometheus.ClientMetrics,
+    maxStreamsPerConn int,
+) *ConnAndClientPool {
     stores := make(map[string]*connArray, 64)
     return &ConnAndClientPool{
         credential: credential,
+        grpcMetrics: grpcMetrics,
         maxStreamsPerConn: maxStreamsPerConn,
         stores: stores,
     }
@@ -105,7 +115,7 @@ func (c *ConnAndClientPool) Connect(ctx context.Context, addr string) (cc *ConnA
         conns.Unlock()

         var conn *Conn
-        if conn, err = conns.connect(ctx, c.credential); err != nil {
+        if conn, err = conns.connect(ctx); err != nil {
             return
         }
         if conn != nil {
@@ -162,11 +172,11 @@ func (c *ConnAndClient) Release() {
     }
 }

-func (c *connArray) connect(ctx context.Context, credential *security.Credential) (conn *Conn, err error) {
+func (c *connArray) connect(ctx context.Context) (conn *Conn, err error) {
     if c.inConnecting.CompareAndSwap(false, true) {
         defer c.inConnecting.Store(false)
         var clientConn *grpc.ClientConn
-        if clientConn, err = connect(ctx, credential, c.addr); err != nil {
+        if clientConn, err = c.pool.connect(ctx, c.addr); err != nil {
             return
         }

@@ -240,21 +250,17 @@ func (c *connArray) sort(locked bool) {
     })
 }

-func connect(ctx context.Context, credential *security.Credential, target string) (*grpc.ClientConn, error) {
-    grpcTLSOption, err := credential.ToGRPCDialOption()
+func (c *ConnAndClientPool) connect(ctx context.Context, target string) (*grpc.ClientConn, error) {
+    grpcTLSOption, err := c.credential.ToGRPCDialOption()
     if err != nil {
         return nil, err
     }
-    return grpc.DialContext(
-        ctx,
-        target,
+    dialOptions := []grpc.DialOption{
         grpcTLSOption,
         grpc.WithInitialWindowSize(grpcInitialWindowSize),
         grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
         grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcMaxCallRecvMsgSize)),
-        grpc.WithUnaryInterceptor(grpcMetrics.UnaryClientInterceptor()),
-        grpc.WithStreamInterceptor(grpcMetrics.StreamClientInterceptor()),
         grpc.WithConnectParams(grpc.ConnectParams{
             Backoff: backoff.Config{
                 BaseDelay: time.Second,
@@ -269,7 +275,14 @@ func connect(ctx context.Context, credential *security.Credential, target string
             Timeout: 3 * time.Second,
             PermitWithoutStream: true,
         }),
-    )
+    }
+
+    if c.grpcMetrics != nil {
+        dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(c.grpcMetrics.UnaryClientInterceptor()))
+        dialOptions = append(dialOptions, grpc.WithStreamInterceptor(c.grpcMetrics.StreamClientInterceptor()))
+    }
+
+    return grpc.DialContext(ctx, target, dialOptions...)
 }

 const (
@@ -290,8 +303,6 @@ const (
     rpcMetaFeatureStreamMultiplexing string = "stream-multiplexing"
 )

-var grpcMetrics = grpc_prometheus.NewClientMetrics()
-
 func getContextFromFeatures(ctx context.Context, features []string) context.Context {
     return metadata.NewOutgoingContext(
         ctx,
diff --git a/cdc/kv/sharedconn/conn_and_client_test.go b/cdc/kv/sharedconn/conn_and_client_test.go
index b87589e7071..797eb095601 100644
--- a/cdc/kv/sharedconn/conn_and_client_test.go
+++ b/cdc/kv/sharedconn/conn_and_client_test.go
@@ -47,7 +47,7 @@ func TestConnAndClientPool(t *testing.T) {
     require.NotNil(t, svc)
     defer svc.GracefulStop()

-    pool := newConnAndClientPool(&security.Credential{}, 2)
+    pool := newConnAndClientPool(&security.Credential{}, nil, 2)
     cc1, err := pool.Connect(context.Background(), addr)
     require.Nil(t, err)
     require.NotNil(t, cc1)
@@ -95,7 +95,7 @@ func TestConnAndClientPoolForV2(t *testing.T) {
     require.NotNil(t, svc)
     defer svc.GracefulStop()

-    pool := newConnAndClientPool(&security.Credential{}, 2)
+    pool := newConnAndClientPool(&security.Credential{}, nil, 2)
     cc1, err := pool.Connect(context.Background(), addr)
     require.Nil(t, err)
     require.NotNil(t, cc1)
@@ -106,11 +106,12 @@
 }

 func TestConnectToUnavailable(t *testing.T) {
+    pool := newConnAndClientPool(&security.Credential{}, nil, 1)
+
     targets := []string{"127.0.0.1:9999", "2.2.2.2:9999"}
     for _, target := range targets {
         ctx := context.Background()
-
-        conn, err := connect(ctx, &security.Credential{}, target)
+        conn, err := pool.connect(ctx, target)
         require.NotNil(t, conn)
         require.Nil(t, err)

@@ -136,7 +137,7 @@ func TestConnectToUnavailable(t *testing.T) {
     require.NotNil(t, svc)
     defer svc.GracefulStop()

-    conn, err := connect(context.Background(), &security.Credential{}, addr)
+    conn, err := pool.connect(context.Background(), addr)
     require.NotNil(t, conn)
     require.Nil(t, err)

@@ -151,6 +152,40 @@ func TestConnectToUnavailable(t *testing.T) {
     require.Nil(t, conn.Close())
 }

+func TestCancelStream(t *testing.T) {
+    service := make(chan *grpc.Server, 1)
+    var addr string
+    var wg sync.WaitGroup
+    defer wg.Wait()
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        require.Nil(t, runGrpcService(&srv{}, &addr, service))
+    }()
+
+    svc := <-service
+    require.NotNil(t, svc)
+    defer svc.GracefulStop()
+
+    connCtx, connCancel := context.WithCancel(context.Background())
+    defer connCancel()
+
+    pool := newConnAndClientPool(&security.Credential{}, nil, 1)
+    conn, err := pool.connect(connCtx, addr)
+    require.NotNil(t, conn)
+    require.Nil(t, err)
+
+    rpcCtx, rpcCancel := context.WithCancel(context.Background())
+    rpc := cdcpb.NewChangeDataClient(conn)
+    client, err := rpc.EventFeed(rpcCtx)
+    require.Nil(t, err)
+
+    rpcCancel()
+    _, err = client.Recv()
+    require.Equal(t, grpccodes.Canceled, grpcstatus.Code(err))
+    require.Nil(t, conn.Close())
+}
+
 func runGrpcService(srv cdcpb.ChangeDataServer, addr *string, service chan<- *grpc.Server) error {
     defer close(service)
     lis, err := net.Listen("tcp", "127.0.0.1:0")
diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go
index 59a377366dd..3035076993f 100644
--- a/cdc/processor/sourcemanager/manager.go
+++ b/cdc/processor/sourcemanager/manager.go
@@ -200,7 +200,7 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats
 func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
     if m.multiplexing {
         serverConfig := config.GetGlobalServerConfig()
-        grpcPool := sharedconn.NewConnAndClientPool(m.up.SecurityConfig)
+        grpcPool := sharedconn.NewConnAndClientPool(m.up.SecurityConfig, kv.GetGlobalGrpcMetrics())
         client := kv.NewSharedClient(
             m.changefeedID, serverConfig, m.bdrMode,
             m.up.PDClient, grpcPool, m.up.RegionCache, m.up.PDClock,
diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go
index 4777e8645a6..041ebc5dcd4 100644
--- a/cdc/puller/ddl_puller.go
+++ b/cdc/puller/ddl_puller.go
@@ -597,7 +597,7 @@ func NewDDLJobPuller(
     rawDDLCh := make(chan *model.RawKVEntry, defaultPullerOutputChanSize)
     mp.sortedDDLCh = memorysorter.SortOutput(ctx, changefeed, rawDDLCh)
-    grpcPool := sharedconn.NewConnAndClientPool(up.SecurityConfig)
+    grpcPool := sharedconn.NewConnAndClientPool(up.SecurityConfig, kv.GetGlobalGrpcMetrics())
     client := kv.NewSharedClient(
         changefeed, cfg, ddlPullerFilterLoop,