Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Oct 28, 2024
1 parent 5eda5be commit 89bed1f
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 5 deletions.
3 changes: 2 additions & 1 deletion pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func NewAllocatorManager(
storage endpoint.TSOStorage,
cfg Config,
allocatorKeyPrefix string,
allocatorKey string,
) *AllocatorManager {
ctx, cancel := context.WithCancel(ctx)
am := &AllocatorManager{
Expand All @@ -224,8 +225,8 @@ func NewAllocatorManager(
leaderLease: cfg.GetLeaderLease(),
maxResetTSGap: cfg.GetMaxResetTSGap,
securityConfig: cfg.GetTLSConfig(),
allocatorKey: path.Join(allocatorKeyPrefix, fmt.Sprintf("keyspace_group_%d", keyspaceGroupID)),
allocatorKeyPrefix: allocatorKeyPrefix,
allocatorKey: allocatorKey,
}
am.mu.allocatorGroups = make(map[string]*allocatorGroup)
am.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
Expand Down
6 changes: 4 additions & 2 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"google.golang.org/grpc"
)

const ttlSeconds = 3

// Allocator is a Timestamp Oracle allocator.
type Allocator interface {
// Initialize is used to initialize a TSO allocator.
Expand Down Expand Up @@ -732,7 +734,7 @@ func (gta *GlobalTSOAllocator) registerAllocator() error {
}

func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse {
t := time.NewTicker(time.Duration(3) * time.Second / 2)
t := time.NewTicker(time.Duration(ttlSeconds) * time.Second / 2)
defer t.Stop()
for {
select {
Expand All @@ -748,7 +750,7 @@ func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveR
func (gta *GlobalTSOAllocator) txnWithTTL(key, value string) (clientv3.LeaseID, error) {
ctx, cancel := context.WithTimeout(gta.ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
grantResp, err := gta.am.etcdClient.Grant(ctx, 3)
grantResp, err := gta.am.etcdClient.Grant(ctx, ttlSeconds)
if err != nil {
return 0, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
storage = kgm.tsoSvcStorage
}
// Initialize all kinds of maps.
am := NewAllocatorManager(kgm.ctx, kgm.etcdClient, group.ID, participant, tsRootPath, storage, kgm.cfg, path.Join(keypath.GlobalTSOAllocatorsPrefix(kgm.clusterID), "tso"))
allocatorKeyPrefix := keypath.GlobalTSOAllocatorsPrefix(kgm.clusterID)
am := NewAllocatorManager(kgm.ctx, kgm.etcdClient, group.ID, participant, tsRootPath, storage, kgm.cfg, allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "tso", fmt.Sprintf("keyspace_group_%d", group.ID)))
am.startGlobalAllocatorLoop()
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
Expand Down
3 changes: 2 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize)
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, s.client, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, path.Join(keypath.GlobalTSOAllocatorsPrefix(s.clusterID.Load()), "pd"))
allocatorKeyPrefix := keypath.GlobalTSOAllocatorsPrefix(s.clusterID.Load())
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, s.client, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "pd"))
// When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists.
if !s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {
Expand Down

0 comments on commit 89bed1f

Please sign in to comment.