From 9af28fc1d426697f90bba2f46b3957ec4f03bdcb Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Mon, 29 Jul 2024 16:13:17 +0800 Subject: [PATCH] client: Merge the two tsoStream types to reuse the same error handling and metrics reporting code (#8433) ref tikv/pd#8432 client: Merge the two tsoStream types to reuse the same error handling and metrics reporting code This commit merges the two `xxxTSOStream` types so that the error handling and metrics reporting logic for PD server deployment and TSO service deployment can be reused. Signed-off-by: MyonKeminta Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/tso_client.go | 6 +- client/tso_dispatcher.go | 4 +- client/tso_stream.go | 138 +++++++++++++++++++-------------------- 3 files changed, 72 insertions(+), 76 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 5e221eae478..2f3b949f017 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -350,9 +350,7 @@ type tsoConnectionContext struct { // Current URL of the stream connection. streamURL string // Current stream to send gRPC requests. - // - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster. - // - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster. - stream tsoStream + stream *tsoStream } // updateConnectionCtxs will choose the proper way to update the connections for the given dc-location. @@ -382,7 +380,7 @@ func (c *tsoClient) tryConnectToTSO( var ( networkErrNum uint64 err error - stream tsoStream + stream *tsoStream url string cc *grpc.ClientConn updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 0919fd84744..a7c99057275 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -186,7 +186,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { streamCtx context.Context cancel context.CancelFunc streamURL string - stream tsoStream + stream *tsoStream ) // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(option.timeout) @@ -393,7 +393,7 @@ func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext } func (td *tsoDispatcher) processRequests( - stream tsoStream, dcLocation string, tbc *tsoBatchController, + stream *tsoStream, dcLocation string, tbc *tsoBatchController, ) error { var ( requests = tbc.getCollectedRequests() diff --git a/client/tso_stream.go b/client/tso_stream.go index 9c4d78dfe18..da9cab95ba0 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -47,7 +47,7 @@ func (*tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBui // TSO Stream Builder type tsoStreamBuilder interface { - build(context.Context, context.CancelFunc, time.Duration) (tsoStream, error) + build(context.Context, context.CancelFunc, time.Duration) (*tsoStream, error) } type pdTSOStreamBuilder struct { @@ -55,14 +55,14 @@ type pdTSOStreamBuilder struct { client pdpb.PDClient } -func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) { +func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (*tsoStream, error) { done := make(chan struct{}) // TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created. go checkStreamTimeout(ctx, cancel, done, timeout) stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &pdTSOStream{stream: stream, serverURL: b.serverURL}, nil + return &tsoStream{stream: pdTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil } return nil, err } @@ -74,14 +74,14 @@ type tsoTSOStreamBuilder struct { func (b *tsoTSOStreamBuilder) build( ctx context.Context, cancel context.CancelFunc, timeout time.Duration, -) (tsoStream, error) { +) (*tsoStream, error) { done := make(chan struct{}) // TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created. go checkStreamTimeout(ctx, cancel, done, timeout) stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &tsoTSOStream{stream: stream, serverURL: b.serverURL}, nil + return &tsoStream{stream: tsoTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil } return nil, err } @@ -99,30 +99,24 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha <-done } -// TSO Stream - -type tsoStream interface { - getServerURL() string - // processRequests processes TSO requests in streaming mode to get timestamps - processRequests( - clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - count int64, batchStartTime time.Time, - ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) +type tsoRequestResult struct { + physical, logical int64 + count uint32 + suffixBits uint32 + respKeyspaceGroupID uint32 } -type pdTSOStream struct { - serverURL string - stream pdpb.PD_TsoClient +type grpcTSOStreamAdapter interface { + Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, + count int64) error + Recv() (tsoRequestResult, error) } -func (s *pdTSOStream) getServerURL() string { - return s.serverURL +type pdTSOStreamAdapter struct { + stream pdpb.PD_TsoClient } -func (s *pdTSOStream) processRequests( - clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time, -) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { - start := time.Now() +func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation string, count int64) error { req := &pdpb.TsoRequest{ Header: &pdpb.RequestHeader{ ClusterId: clusterID, @@ -130,55 +124,28 @@ func (s *pdTSOStream) processRequests( Count: uint32(count), DcLocation: dcLocation, } + return s.stream.Send(req) +} - if err = s.stream.Send(req); err != nil { - if err == io.EOF { - err = errs.ErrClientTSOStreamClosed - } else { - err = errors.WithStack(err) - } - return - } - tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) +func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) { resp, err := s.stream.Recv() - duration := time.Since(start).Seconds() if err != nil { - requestFailedDurationTSO.Observe(duration) - if err == io.EOF { - err = errs.ErrClientTSOStreamClosed - } else { - err = errors.WithStack(err) - } - return + return tsoRequestResult{}, err } - requestDurationTSO.Observe(duration) - tsoBatchSize.Observe(float64(count)) - - if resp.GetCount() != uint32(count) { - err = errors.WithStack(errTSOLength) - return - } - - ts := resp.GetTimestamp() - respKeyspaceGroupID = defaultKeySpaceGroupID - physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() - return + return tsoRequestResult{ + physical: resp.GetTimestamp().GetPhysical(), + logical: resp.GetTimestamp().GetLogical(), + count: resp.GetCount(), + suffixBits: resp.GetTimestamp().GetSuffixBits(), + respKeyspaceGroupID: defaultKeySpaceGroupID, + }, nil } -type tsoTSOStream struct { - serverURL string - stream tsopb.TSO_TsoClient +type tsoTSOStreamAdapter struct { + stream tsopb.TSO_TsoClient } -func (s *tsoTSOStream) getServerURL() string { - return s.serverURL -} - -func (s *tsoTSOStream) processRequests( - clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - count int64, batchStartTime time.Time, -) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { - start := time.Now() +func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error { req := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: clusterID, @@ -188,8 +155,40 @@ func (s *tsoTSOStream) processRequests( Count: uint32(count), DcLocation: dcLocation, } + return s.stream.Send(req) +} - if err = s.stream.Send(req); err != nil { +func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) { + resp, err := s.stream.Recv() + if err != nil { + return tsoRequestResult{}, err + } + return tsoRequestResult{ + physical: resp.GetTimestamp().GetPhysical(), + logical: resp.GetTimestamp().GetLogical(), + count: resp.GetCount(), + suffixBits: resp.GetTimestamp().GetSuffixBits(), + respKeyspaceGroupID: resp.GetHeader().GetKeyspaceGroupId(), + }, nil +} + +type tsoStream struct { + serverURL string + // The internal gRPC stream. + // - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster. + // - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster. + stream grpcTSOStreamAdapter +} + +func (s *tsoStream) getServerURL() string { + return s.serverURL +} + +func (s *tsoStream) processRequests( + clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64, batchStartTime time.Time, +) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { + start := time.Now() + if err = s.stream.Send(clusterID, keyspaceID, keyspaceGroupID, dcLocation, count); err != nil { if err == io.EOF { err = errs.ErrClientTSOStreamClosed } else { @@ -198,7 +197,7 @@ func (s *tsoTSOStream) processRequests( return } tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) - resp, err := s.stream.Recv() + res, err := s.stream.Recv() duration := time.Since(start).Seconds() if err != nil { requestFailedDurationTSO.Observe(duration) @@ -212,13 +211,12 @@ func (s *tsoTSOStream) processRequests( requestDurationTSO.Observe(duration) tsoBatchSize.Observe(float64(count)) - if resp.GetCount() != uint32(count) { + if res.count != uint32(count) { err = errors.WithStack(errTSOLength) return } - ts := resp.GetTimestamp() - respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId() - physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() + respKeyspaceGroupID = res.respKeyspaceGroupID + physical, logical, suffixBits = res.physical, res.logical, res.suffixBits return }