Skip to content

Commit

Permalink
Address the comments
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Dec 26, 2024
1 parent fb22171 commit f84cf99
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 10 deletions.
8 changes: 4 additions & 4 deletions client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (c *Cli) tryConnectToTSO(ctx context.Context) error {
cc, url = c.getTSOLeaderClientConn()
if c.conCtxMgr.Exist(url) {
// Just trigger the clean up of the stale connection contexts.
c.conCtxMgr.ExclusivelyStore(ctx, url)
c.conCtxMgr.CleanAllAndStore(ctx, url)
return nil
}
if cc != nil {
Expand All @@ -341,7 +341,7 @@ func (c *Cli) tryConnectToTSO(ctx context.Context) error {
err = status.New(codes.Unavailable, "unavailable").Err()
})
if stream != nil && err == nil {
c.conCtxMgr.ExclusivelyStore(ctx, url, stream)
c.conCtxMgr.CleanAllAndStore(ctx, url, stream)
return nil
}

Expand Down Expand Up @@ -386,7 +386,7 @@ func (c *Cli) tryConnectToTSO(ctx context.Context) error {
// the goroutine is used to check the network and change back to the original stream
go c.checkLeader(ctx, cancel, forwardedHostTrim, addr, url)
metrics.RequestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
c.conCtxMgr.ExclusivelyStore(ctx, backupURL, stream)
c.conCtxMgr.CleanAllAndStore(ctx, backupURL, stream)
return nil
}
cancel()
Expand Down Expand Up @@ -431,7 +431,7 @@ func (c *Cli) checkLeader(
stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.Timeout)
if err == nil && stream != nil {
log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("url", url))
c.conCtxMgr.ExclusivelyStore(ctx, url, stream)
c.conCtxMgr.CleanAllAndStore(ctx, url, stream)
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/clients/tso/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ tsoBatchLoop:
// Choose a stream to send the TSO gRPC request.
streamChoosingLoop:
for {
connectionCtx := conCtxMgr.Choose()
connectionCtx := conCtxMgr.GetConnectionCtx()
if connectionCtx != nil {
streamCtx, cancel, streamURL, stream = connectionCtx.Ctx, connectionCtx.Cancel, connectionCtx.StreamURL, connectionCtx.Stream
}
Expand Down
10 changes: 5 additions & 5 deletions client/pkg/connectionctx/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func (c *Manager[T]) storeLocked(ctx context.Context, url string, stream T) {
c.connectionCtxs[url] = &connectionCtx[T]{cctx, cancel, url, stream}
}

// ExclusivelyStore is used to store the connection context exclusively. It will release
// CleanAllAndStore is used to store the connection context exclusively. It will release
// all other connection contexts. `stream` is optional, if it is not provided, all
// connection contexts other than the given `url` will be released.
func (c *Manager[T]) ExclusivelyStore(ctx context.Context, url string, stream ...T) {
func (c *Manager[T]) CleanAllAndStore(ctx context.Context, url string, stream ...T) {
c.Lock()
defer c.Unlock()
// Remove all other `connectionCtx`s.
Expand Down Expand Up @@ -125,9 +125,9 @@ func (c *Manager[T]) releaseLocked(url string) {
delete(c.connectionCtxs, url)
}

// Choose is used to choose a connection context from the connection context map.
// It uses the reservoir sampling algorithm to randomly choose a connection context.
func (c *Manager[T]) Choose() *connectionCtx[T] {
// GetConnectionCtx is used to get a connection context from the connection context map.
// It uses the reservoir sampling algorithm to randomly pick one connection context.
func (c *Manager[T]) GetConnectionCtx() *connectionCtx[T] {
c.RLock()
defer c.RUnlock()
idx := 0
Expand Down
83 changes: 83 additions & 0 deletions client/pkg/connectionctx/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package connectionctx

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestManager(t *testing.T) {
re := require.New(t)
ctx := context.Background()
manager := NewManager[int]()

re.False(manager.Exist("test-url"))
manager.Store(ctx, "test-url", 1)
re.True(manager.Exist("test-url"))

cctx := manager.GetConnectionCtx()
re.Equal("test-url", cctx.StreamURL)
re.Equal(1, cctx.Stream)

manager.Store(ctx, "test-url", 2)
cctx = manager.GetConnectionCtx()
re.Equal("test-url", cctx.StreamURL)
re.Equal(1, cctx.Stream)

manager.Store(ctx, "test-url", 2, true)
cctx = manager.GetConnectionCtx()
re.Equal("test-url", cctx.StreamURL)
re.Equal(2, cctx.Stream)

manager.Store(ctx, "test-another-url", 3)
pickedCount := make(map[string]int)
for range 1000 {
cctx = manager.GetConnectionCtx()
pickedCount[cctx.StreamURL]++
}
re.NotEmpty(pickedCount["test-url"])
re.NotEmpty(pickedCount["test-another-url"])
re.Equal(1000, pickedCount["test-url"]+pickedCount["test-another-url"])

manager.GC(func(url string) bool {
return url == "test-url"
})
re.False(manager.Exist("test-url"))
re.True(manager.Exist("test-another-url"))

manager.CleanAllAndStore(ctx, "test-url", 1)
re.True(manager.Exist("test-url"))
re.False(manager.Exist("test-another-url"))

manager.Store(ctx, "test-another-url", 3)
manager.CleanAllAndStore(ctx, "test-unique-url", 4)
re.True(manager.Exist("test-unique-url"))
re.False(manager.Exist("test-url"))
re.False(manager.Exist("test-another-url"))

manager.Release("test-unique-url")
re.False(manager.Exist("test-unique-url"))

for i := range 1000 {
manager.Store(ctx, fmt.Sprintf("test-url-%d", i), i)
}
re.Len(manager.connectionCtxs, 1000)
manager.ReleaseAll()
re.Empty(manager.connectionCtxs)
}

0 comments on commit f84cf99

Please sign in to comment.