client: Merge the two tsoStream types to reuse the same error handling and metrics reporting code (#8433)

ref #8432

client: Merge the two tsoStream types to reuse the same error handling and metrics reporting code

This commit merges the two `xxxTSOStream` types so that the error handling and metrics reporting logic for PD server deployment and TSO service deployment can be reused.

Signed-off-by: MyonKeminta <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
MyonKeminta and ti-chi-bot[bot] authored Jul 29, 2024
1 parent 5d77447 commit 9af28fc
Showing 3 changed files with 72 additions and 76 deletions.
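Before reading the diff, it may help to see the shape of the refactor in isolation: the commit replaces the old `tsoStream` interface (implemented separately by `pdTSOStream` and `tsoTSOStream`) with a single `tsoStream` struct that delegates to a small `grpcTSOStreamAdapter` interface, so the error-handling and metrics code is written once. The following is a minimal, self-contained Go sketch of that pattern; every name in it is an illustrative stand-in, not the actual client code.

```go
package main

import (
	"errors"
	"fmt"
)

// result stands in for the fields the merged stream extracts from a response.
type result struct {
	physical, logical int64
	count             uint32
}

// streamAdapter plays the role of grpcTSOStreamAdapter: it hides which
// concrete gRPC stream sits underneath.
type streamAdapter interface {
	Send(count int64) error
	Recv() (result, error)
}

// pdAdapter stands in for pdTSOStreamAdapter; a second adapter for the
// TSO-service stream would satisfy the same interface.
type pdAdapter struct{}

func (pdAdapter) Send(count int64) error { return nil }
func (pdAdapter) Recv() (result, error)  { return result{physical: 42, count: 1}, nil }

// unifiedStream corresponds to the merged tsoStream struct: the error
// handling below exists in exactly one place, written against the interface.
type unifiedStream struct {
	serverURL string
	stream    streamAdapter
}

func (s *unifiedStream) processRequests(count int64) (result, error) {
	if err := s.stream.Send(count); err != nil {
		return result{}, fmt.Errorf("send to %s: %w", s.serverURL, err)
	}
	res, err := s.stream.Recv()
	if err != nil {
		return result{}, fmt.Errorf("recv from %s: %w", s.serverURL, err)
	}
	if res.count != uint32(count) {
		return result{}, errors.New("TSO response length mismatch")
	}
	return res, nil
}

func main() {
	s := &unifiedStream{serverURL: "pd-0:2379", stream: pdAdapter{}}
	res, err := s.processRequests(1)
	fmt.Println(res, err)
}
```

With this shape, metrics observation (as the real `processRequests` does with `requestDurationTSO` and `tsoBatchSize`) also only has to happen once, regardless of which backend the stream talks to.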
6 changes: 2 additions & 4 deletions client/tso_client.go
@@ -350,9 +350,7 @@ type tsoConnectionContext struct {
// Current URL of the stream connection.
streamURL string
// Current stream to send gRPC requests.
// - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster.
// - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster.
stream tsoStream
stream *tsoStream
}

// updateConnectionCtxs will choose the proper way to update the connections for the given dc-location.
@@ -382,7 +380,7 @@ func (c *tsoClient) tryConnectToTSO(
var (
networkErrNum uint64
err error
stream tsoStream
stream *tsoStream
url string
cc *grpc.ClientConn
updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
4 changes: 2 additions & 2 deletions client/tso_dispatcher.go
@@ -186,7 +186,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) {
streamCtx context.Context
cancel context.CancelFunc
streamURL string
stream tsoStream
stream *tsoStream
)
// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(option.timeout)
@@ -393,7 +393,7 @@ func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext
}

func (td *tsoDispatcher) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController,
stream *tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
var (
requests = tbc.getCollectedRequests()
138 changes: 68 additions & 70 deletions client/tso_stream.go
@@ -47,22 +47,22 @@ func (*tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBui
// TSO Stream Builder

type tsoStreamBuilder interface {
build(context.Context, context.CancelFunc, time.Duration) (tsoStream, error)
build(context.Context, context.CancelFunc, time.Duration) (*tsoStream, error)
}

type pdTSOStreamBuilder struct {
serverURL string
client pdpb.PDClient
}

func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) {
func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (*tsoStream, error) {
done := make(chan struct{})
// TODO: we need to handle a corner case where this goroutine times out while the stream is successfully created.
go checkStreamTimeout(ctx, cancel, done, timeout)
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &pdTSOStream{stream: stream, serverURL: b.serverURL}, nil
return &tsoStream{stream: pdTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil
}
return nil, err
}
@@ -74,14 +74,14 @@ type tsoTSOStreamBuilder struct {

func (b *tsoTSOStreamBuilder) build(
ctx context.Context, cancel context.CancelFunc, timeout time.Duration,
) (tsoStream, error) {
) (*tsoStream, error) {
done := make(chan struct{})
// TODO: we need to handle a corner case where this goroutine times out while the stream is successfully created.
go checkStreamTimeout(ctx, cancel, done, timeout)
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &tsoTSOStream{stream: stream, serverURL: b.serverURL}, nil
return &tsoStream{stream: tsoTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil
}
return nil, err
}
@@ -99,86 +99,53 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha
<-done
}

// TSO Stream

type tsoStream interface {
getServerURL() string
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
type tsoRequestResult struct {
physical, logical int64
count uint32
suffixBits uint32
respKeyspaceGroupID uint32
}

type pdTSOStream struct {
serverURL string
stream pdpb.PD_TsoClient
type grpcTSOStreamAdapter interface {
Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64) error
Recv() (tsoRequestResult, error)
}

func (s *pdTSOStream) getServerURL() string {
return s.serverURL
type pdTSOStreamAdapter struct {
stream pdpb.PD_TsoClient
}

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation string, count int64) error {
req := &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{
ClusterId: clusterID,
},
Count: uint32(count),
DcLocation: dcLocation,
}
return s.stream.Send(req)
}

if err = s.stream.Send(req); err != nil {
if err == io.EOF {
err = errs.ErrClientTSOStreamClosed
} else {
err = errors.WithStack(err)
}
return
}
tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) {
resp, err := s.stream.Recv()
duration := time.Since(start).Seconds()
if err != nil {
requestFailedDurationTSO.Observe(duration)
if err == io.EOF {
err = errs.ErrClientTSOStreamClosed
} else {
err = errors.WithStack(err)
}
return
return tsoRequestResult{}, err
}
requestDurationTSO.Observe(duration)
tsoBatchSize.Observe(float64(count))

if resp.GetCount() != uint32(count) {
err = errors.WithStack(errTSOLength)
return
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = defaultKeySpaceGroupID
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
return tsoRequestResult{
physical: resp.GetTimestamp().GetPhysical(),
logical: resp.GetTimestamp().GetLogical(),
count: resp.GetCount(),
suffixBits: resp.GetTimestamp().GetSuffixBits(),
respKeyspaceGroupID: defaultKeySpaceGroupID,
}, nil
}

type tsoTSOStream struct {
serverURL string
stream tsopb.TSO_TsoClient
type tsoTSOStreamAdapter struct {
stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) getServerURL() string {
return s.serverURL
}

func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error {
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: clusterID,
@@ -188,8 +155,40 @@ func (s *tsoTSOStream) processRequests(
Count: uint32(count),
DcLocation: dcLocation,
}
return s.stream.Send(req)
}

if err = s.stream.Send(req); err != nil {
func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) {
resp, err := s.stream.Recv()
if err != nil {
return tsoRequestResult{}, err
}
return tsoRequestResult{
physical: resp.GetTimestamp().GetPhysical(),
logical: resp.GetTimestamp().GetLogical(),
count: resp.GetCount(),
suffixBits: resp.GetTimestamp().GetSuffixBits(),
respKeyspaceGroupID: resp.GetHeader().GetKeyspaceGroupId(),
}, nil
}

type tsoStream struct {
serverURL string
// The internal gRPC stream.
// - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster.
// - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster.
stream grpcTSOStreamAdapter
}

func (s *tsoStream) getServerURL() string {
return s.serverURL
}

func (s *tsoStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
if err = s.stream.Send(clusterID, keyspaceID, keyspaceGroupID, dcLocation, count); err != nil {
if err == io.EOF {
err = errs.ErrClientTSOStreamClosed
} else {
@@ -198,7 +197,7 @@ func (s *tsoTSOStream) processRequests(
return
}
tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
resp, err := s.stream.Recv()
res, err := s.stream.Recv()
duration := time.Since(start).Seconds()
if err != nil {
requestFailedDurationTSO.Observe(duration)
@@ -212,13 +211,12 @@ func (s *tsoTSOStream) processRequests(
requestDurationTSO.Observe(duration)
tsoBatchSize.Observe(float64(count))

if resp.GetCount() != uint32(count) {
if res.count != uint32(count) {
err = errors.WithStack(errTSOLength)
return
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId()
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
respKeyspaceGroupID = res.respKeyspaceGroupID
physical, logical, suffixBits = res.physical, res.logical, res.suffixBits
return
}
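One practical consequence of the merge, not shown in the diff itself, is that a test double only needs to satisfy `grpcTSOStreamAdapter`'s two methods to exercise the shared `processRequests` path. A hypothetical mock might look like the following; `tsoRequestResult` is copied from the diff above, and everything else is an assumption for illustration, not code from this commit.

```go
package main

import "fmt"

// tsoRequestResult as defined in client/tso_stream.go in this commit.
type tsoRequestResult struct {
	physical, logical   int64
	count               uint32
	suffixBits          uint32
	respKeyspaceGroupID uint32
}

// mockTSOStreamAdapter is a hypothetical test double with the same method
// set as grpcTSOStreamAdapter: it records the last requested count and
// echoes it back so the length check in processRequests would pass.
type mockTSOStreamAdapter struct {
	lastCount int64
}

func (m *mockTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error {
	m.lastCount = count
	return nil
}

func (m *mockTSOStreamAdapter) Recv() (tsoRequestResult, error) {
	return tsoRequestResult{
		physical: 100,
		logical:  1,
		count:    uint32(m.lastCount),
	}, nil
}

func main() {
	m := &mockTSOStreamAdapter{}
	_ = m.Send(1, 0, 0, "global", 3)
	res, _ := m.Recv()
	fmt.Println(res.count) // prints 3
}
```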
