diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 126cf8a0a613..040f2c299166 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -45,7 +45,11 @@ import ( "google.golang.org/grpc" ) -const ttlSeconds = 3 +const ( + ttlSeconds = 3 + keyspaceGroupPrefix = "keyspace_group" + pdPrefix = "pd" +) // Allocator is a Timestamp Oracle allocator. type Allocator interface { @@ -540,6 +544,9 @@ func (gta *GlobalTSOAllocator) getCurrentTSO(ctx context.Context) (*pdpb.Timesta func (gta *GlobalTSOAllocator) Reset() { gta.tsoAllocatorRoleGauge.Set(0) gta.timestampOracle.ResetTimestamp() + if err := gta.deregisterAllocator(); err != nil { + log.Warn("deregister tso allocator failed", zap.String("key", gta.am.allocatorKey), errs.ZapError(err)) + } } // primaryElectionLoop is used to maintain the TSO primary election and TSO's @@ -803,11 +810,15 @@ func (gta *GlobalTSOAllocator) register() (<-chan *clientv3.LeaseKeepAliveRespon if err != nil { return nil, true } + allocatorKey := strings.TrimLeft(gta.am.allocatorKey, gta.am.allocatorKeyPrefix) // wait for the previous allocator with different mode to be deregistered if len(resp.Kvs) > 0 { for _, kv := range resp.Kvs { - key := string(kv.Key) - if !strings.Contains(key, gta.am.allocatorKeyPrefix) { + key := strings.TrimLeft(string(kv.Key), gta.am.allocatorKeyPrefix) + if strings.Contains(allocatorKey, keyspaceGroupPrefix) && strings.Contains(key, pdPrefix) { + return nil, true + } + if strings.Contains(allocatorKey, pdPrefix) && strings.Contains(key, keyspaceGroupPrefix) { return nil, true } } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 0993e94fb679..df34b9658749 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -1103,6 +1103,9 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( if err != nil { return err } + if allocator.IsInitialize() { + return nil + } // TODO: support the Local TSO Allocator. return allocator.Initialize(0) }) diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 029dd79d8eb0..6fa355b8310b 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -33,7 +33,9 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils/constant" + "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/syncutil" @@ -43,6 +45,7 @@ import ( "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/embed" "go.uber.org/goleak" ) @@ -59,6 +62,7 @@ type keyspaceGroupManagerTestSuite struct { etcdClient *clientv3.Client clean func() cfg *TestServiceConfig + servers []*embed.Etcd } func TestKeyspaceGroupManagerTestSuite(t *testing.T) { @@ -69,8 +73,8 @@ func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { t := suite.T() suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.ClusterID = rand.Uint64() - servers, client, clean := etcdutil.NewTestEtcdCluster(t, 1) - suite.backendEndpoints, suite.etcdClient, suite.clean = servers[0].Config().ListenClientUrls[0].String(), client, clean + suite.servers, suite.etcdClient, suite.clean = etcdutil.NewTestEtcdCluster(t, 1) + suite.backendEndpoints = suite.servers[0].Config().ListenClientUrls[0].String() suite.cfg = suite.createConfig() } @@ -1215,3 +1219,46 @@ func waitForPrimariesServing( return true }, testutil.WithWaitFor(10*time.Second), testutil.WithTickInterval(50*time.Millisecond)) } + +func (suite *keyspaceGroupManagerTestSuite) TestRegisterAllocatorConflict() { + re := suite.Require() + + kgm := suite.newKeyspaceGroupManager(0, suite.ClusterID, suite.cfg) + re.NotNil(kgm) + defer kgm.Close() + kgm.Initialize() + participant, err := kgm.GetElectionMember(0, 0) + re.NoError(err) + + legacySvcRootPath := keypath.LegacyRootPath(suite.ClusterID) + allocatorKeyPrefix := keypath.GlobalTSOAllocatorsPrefix(suite.ClusterID) + legacySvcStorage := endpoint.NewStorageEndpoint(kv.NewEtcdKVBase(suite.etcdClient, legacySvcRootPath), nil) + member := member.NewMember(suite.servers[0], suite.etcdClient, uint64(suite.servers[0].Server.ID())) + am := NewAllocatorManager(suite.ctx, suite.etcdClient, constant.DefaultKeyspaceGroupID, member, legacySvcRootPath, legacySvcStorage, kgm.cfg, allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "pd")) + gta := NewGlobalTSOAllocator(suite.ctx, am) + err = gta.Initialize(0) + re.NoError(err) + + kam := NewAllocatorManager(kgm.ctx, kgm.etcdClient, constant.DefaultKeyspaceGroupID, participant, kgm.legacySvcRootPath, kgm.legacySvcStorage, kgm.cfg, allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "tso", fmt.Sprintf("keyspace_group_%d", constant.DefaultKeyspaceGroupID))) + kgta := NewGlobalTSOAllocator(suite.ctx, kam) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err = kgta.Initialize(0) + re.NoError(err) + }() + + gta.Reset() + wg.Wait() + + var wg1 sync.WaitGroup + wg1.Add(1) + go func() { + defer wg1.Done() + err = gta.Initialize(0) + re.NoError(err) + }() + kgta.Reset() + wg1.Wait() +}