Skip to content

Commit

Permalink
register tso allocator
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Mar 7, 2024
1 parent 32bba44 commit cc960b1
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 8 deletions.
7 changes: 7 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const (
keyspaceGroupsMembershipKey = "membership"
keyspaceGroupsElectionKey = "election"

tsoAllocatorsPrefix = "tso_allocators"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
)
Expand Down Expand Up @@ -415,3 +417,8 @@ func FullTimestampPath(clusterID uint64, groupID uint32) string {
}
return path.Join(rootPath, tsPath)
}

// GlobalTSOAllocatorsPrefix returns the global TSO allocators prefix.
func GlobalTSOAllocatorsPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), tsoAllocatorsPrefix)
}
15 changes: 12 additions & 3 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ type AllocatorManager struct {

ctx context.Context
cancel context.CancelFunc

etcdClient *clientv3.Client
// kgID is the keyspace group ID
kgID uint32
// member is for election use
Expand All @@ -184,9 +186,11 @@ type AllocatorManager struct {
// leaderLease defines the time within which a TSO primary/leader must update its TTL
// in etcd, otherwise etcd will expire the leader key and other servers can campaign
// the primary/leader again. Etcd only supports seconds TTL, so here is second too.
leaderLease int64
maxResetTSGap func() time.Duration
securityConfig *grpcutil.TLSConfig
leaderLease int64
maxResetTSGap func() time.Duration
securityConfig *grpcutil.TLSConfig
allocatorKeyPrefix string
allocatorKey string
// for gRPC use
localAllocatorConn struct {
syncutil.RWMutex
Expand All @@ -197,17 +201,20 @@ type AllocatorManager struct {
// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(
ctx context.Context,
etcdClient *clientv3.Client,
keyspaceGroupID uint32,
member ElectionMember,
rootPath string,
storage endpoint.TSOStorage,
cfg Config,
startGlobalLeaderLoop bool,
allocatorKeyPrefix string,
) *AllocatorManager {
ctx, cancel := context.WithCancel(ctx)
am := &AllocatorManager{
ctx: ctx,
cancel: cancel,
etcdClient: etcdClient,
kgID: keyspaceGroupID,
member: member,
rootPath: rootPath,
Expand All @@ -218,6 +225,8 @@ func NewAllocatorManager(
leaderLease: cfg.GetLeaderLease(),
maxResetTSGap: cfg.GetMaxResetTSGap,
securityConfig: cfg.GetTLSConfig(),
allocatorKey: path.Join(allocatorKeyPrefix, fmt.Sprintf("keyspace_group_%d", keyspaceGroupID)),
allocatorKeyPrefix: allocatorKeyPrefix,
}
am.mu.allocatorGroups = make(map[string]*allocatorGroup)
am.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
Expand Down
116 changes: 116 additions & 0 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"runtime/trace"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -32,9 +33,12 @@ import (
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -131,6 +135,9 @@ func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle {
// close is used to shutdown the primary election loop.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (gta *GlobalTSOAllocator) close() {
if err := gta.deregisterAllocator(); err != nil {
log.Warn("deregister tso allocator failed", zap.String("key", gta.am.allocatorKey), errs.ZapError(err))
}
gta.cancel()
gta.wg.Wait()
}
Expand Down Expand Up @@ -184,6 +191,9 @@ func (gta *GlobalTSOAllocator) estimateMaxTS(ctx context.Context, count uint32,

// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize(int) error {
if err := gta.registerAllocator(); err != nil {
return err
}
gta.tsoAllocatorRoleGauge.Set(1)
// The suffix of a Global TSO should always be 0.
gta.timestampOracle.suffix = 0
Expand Down Expand Up @@ -658,3 +668,109 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics {
return gta.timestampOracle.metrics
}

// registerAllocator registers the tso allocator to etcd.
// It is used when switching mode between pd and ms. We need to make sure only one TSO allocator is working at the same time.
func (gta *GlobalTSOAllocator) registerAllocator() error {
log.Info("register tso allocator", zap.String("key", gta.am.allocatorKey))
kresp := gta.tryRegister()
if kresp == nil {
return errors.New("context canceled")
}
go func() {
defer logutil.LogPanic()
for {
select {
case <-gta.ctx.Done():
log.Info("exit register tso allocator", zap.String("key", gta.am.allocatorKey))
return
case _, ok := <-kresp:
if !ok {
log.Error("keep alive tso allocator failed", zap.String("key", gta.am.allocatorKey))
kresp = gta.renewKeepalive()
}
}
}
}()
return nil
}

func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse {
t := time.NewTicker(time.Duration(3) * time.Second / 2)
defer t.Stop()
for {
select {
case <-gta.ctx.Done():
log.Info("exit register tso allocator", zap.String("key", gta.am.allocatorKey))
return nil
case <-t.C:
return gta.tryRegister()
}
}
}

func (gta *GlobalTSOAllocator) txnWithTTL(key, value string) (clientv3.LeaseID, error) {
ctx, cancel := context.WithTimeout(gta.ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
grantResp, err := gta.am.etcdClient.Grant(ctx, 3)
if err != nil {
return 0, err
}
resp, err := kv.NewSlowLogTxn(gta.am.etcdClient).
Then(clientv3.OpPut(key, value, clientv3.WithLease(grantResp.ID))).
Commit()
if err != nil {
return 0, errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByArgs()
}
if !resp.Succeeded {
return 0, errs.ErrEtcdTxnConflict.FastGenByArgs()
}

return grantResp.ID, nil
}

// deregisterAllocator deregisters the tso allocator from etcd.
func (gta *GlobalTSOAllocator) deregisterAllocator() error {
log.Info("deregister tso allocator", zap.String("key", gta.am.allocatorKey))
ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second)
defer cancel()
_, err := gta.am.etcdClient.Delete(ctx, gta.am.allocatorKey)
return err
}

func (gta *GlobalTSOAllocator) tryRegister() <-chan *clientv3.LeaseKeepAliveResponse {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
outerLoop:
for {
select {
case <-gta.ctx.Done():
return nil
case <-ticker.C:
ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second)
resp, err := gta.am.etcdClient.Get(ctx, gta.am.allocatorKeyPrefix, clientv3.WithPrefix())
cancel()
if err != nil {
continue
}
// wait for the previous allocator with different mode to be deregistered
if len(resp.Kvs) > 0 {
for _, kv := range resp.Kvs {
key := string(kv.Key)
if !strings.Contains(key, gta.am.allocatorKeyPrefix) {
continue outerLoop
}
}
}
id, err := gta.txnWithTTL(gta.am.allocatorKey, "")
if err != nil {
continue
}
kresp, err := gta.am.etcdClient.KeepAlive(gta.ctx, id)
if err != nil {
continue
}
return kresp
}
}
}
6 changes: 5 additions & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"net/http"
"path"
"regexp"
"sort"
"sync"
Expand Down Expand Up @@ -319,6 +320,7 @@ type KeyspaceGroupManager struct {

// tsoServiceID is the service ID of the TSO service, registered in the service discovery
tsoServiceID *discovery.ServiceRegistryEntry
clusterID uint64
etcdClient *clientv3.Client
httpClient *http.Client
// electionNamePrefix is the name prefix to generate the unique name of a participant,
Expand Down Expand Up @@ -414,6 +416,7 @@ func NewKeyspaceGroupManager(
ctx: ctx,
cancel: cancel,
tsoServiceID: tsoServiceID,
clusterID: clusterID,
etcdClient: etcdClient,
httpClient: httpClient,
electionNamePrefix: electionNamePrefix,
Expand Down Expand Up @@ -768,7 +771,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
storage = kgm.tsoSvcStorage
}
// Initialize all kinds of maps.
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)
am := NewAllocatorManager(kgm.ctx, kgm.etcdClient, group.ID, participant, tsRootPath, storage, kgm.cfg, true,
path.Join(endpoint.GlobalTSOAllocatorsPrefix(kgm.clusterID), "tso"))
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("timestamp-path", am.GetTimestampPath("")))
Expand Down
4 changes: 1 addition & 3 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() {
tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr}
clusterID := rand.Uint64()
clusterIDStr := strconv.FormatUint(clusterID, 10)

legacySvcRootPath := path.Join("/pd", clusterIDStr)
tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, clusterIDStr, "tso")
electionNamePrefix := "tso-server-" + clusterIDStr
Expand Down Expand Up @@ -1046,7 +1045,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() {
defaultPriority := mcsutils.DefaultKeyspaceGroupReplicaPriority
clusterID := rand.Uint64()
clusterIDStr := strconv.FormatUint(clusterID, 10)

rootPath := path.Join("/pd", clusterIDStr)
cfg1 := suite.createConfig()
cfg2 := suite.createConfig()
Expand Down Expand Up @@ -1212,5 +1210,5 @@ func waitForPrimariesServing(
}
}
return true
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))
}, testutil.WithWaitFor(10*time.Second), testutil.WithTickInterval(100*time.Millisecond))
}
4 changes: 3 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
Expand Down Expand Up @@ -464,7 +465,8 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
if !s.IsAPIServiceMode() {
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false)
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, s.client, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false,
path.Join(endpoint.GlobalTSOAllocatorsPrefix(s.clusterID.Load()), "pd"))
// When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists.
if !s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {
Expand Down

0 comments on commit cc960b1

Please sign in to comment.