Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Oct 31, 2024
1 parent dfca43e commit 3b5b4a1
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 8 deletions.
20 changes: 14 additions & 6 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ import (
"google.golang.org/grpc"
)

const ttlSeconds = 3
const (
ttlSeconds = 3
keyspaceGroupPrefix = "keyspace_group"
pdPrefix = "pd"
)

// Allocator is a Timestamp Oracle allocator.
type Allocator interface {
Expand Down Expand Up @@ -135,9 +139,6 @@ func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle {
// close is used to shutdown the primary election loop.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (gta *GlobalTSOAllocator) close() {
if err := gta.deregisterAllocator(); err != nil {
log.Warn("deregister tso allocator failed", zap.String("key", gta.am.allocatorKey), errs.ZapError(err))
}
gta.cancel()
gta.wg.Wait()
}
Expand Down Expand Up @@ -540,6 +541,9 @@ func (gta *GlobalTSOAllocator) getCurrentTSO(ctx context.Context) (*pdpb.Timesta
func (gta *GlobalTSOAllocator) Reset() {
gta.tsoAllocatorRoleGauge.Set(0)
gta.timestampOracle.ResetTimestamp()
if err := gta.deregisterAllocator(); err != nil {
log.Warn("deregister tso allocator failed", zap.String("key", gta.am.allocatorKey), errs.ZapError(err))
}
}

// primaryElectionLoop is used to maintain the TSO primary election and TSO's
Expand Down Expand Up @@ -803,11 +807,15 @@ func (gta *GlobalTSOAllocator) register() (<-chan *clientv3.LeaseKeepAliveRespon
if err != nil {
return nil, true
}
allocatorKey := strings.TrimLeft(gta.am.allocatorKey, gta.am.allocatorKeyPrefix)
// wait for the previous allocator with different mode to be deregistered
if len(resp.Kvs) > 0 {
for _, kv := range resp.Kvs {
key := string(kv.Key)
if !strings.Contains(key, gta.am.allocatorKeyPrefix) {
key := strings.TrimLeft(string(kv.Key), gta.am.allocatorKeyPrefix)
if strings.Contains(allocatorKey, keyspaceGroupPrefix) && strings.Contains(key, pdPrefix) {
return nil, true
}
if strings.Contains(allocatorKey, pdPrefix) && strings.Contains(key, keyspaceGroupPrefix) {
return nil, true
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,9 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err != nil {
return err
}
if allocator.IsInitialize() {
return nil
}
// TODO: support the Local TSO Allocator.
return allocator.Initialize(0)
})
Expand Down
51 changes: 49 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/syncutil"
Expand All @@ -43,6 +45,7 @@ import (
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/goleak"
)

Expand All @@ -59,6 +62,7 @@ type keyspaceGroupManagerTestSuite struct {
etcdClient *clientv3.Client
clean func()
cfg *TestServiceConfig
servers []*embed.Etcd
}

func TestKeyspaceGroupManagerTestSuite(t *testing.T) {
Expand All @@ -69,8 +73,8 @@ func (suite *keyspaceGroupManagerTestSuite) SetupSuite() {
t := suite.T()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.ClusterID = rand.Uint64()
servers, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
suite.backendEndpoints, suite.etcdClient, suite.clean = servers[0].Config().ListenClientUrls[0].String(), client, clean
suite.servers, suite.etcdClient, suite.clean = etcdutil.NewTestEtcdCluster(t, 1)
suite.backendEndpoints = suite.servers[0].Config().ListenClientUrls[0].String()
suite.cfg = suite.createConfig()
}

Expand Down Expand Up @@ -1217,3 +1221,46 @@ func waitForPrimariesServing(
return true
}, testutil.WithWaitFor(10*time.Second), testutil.WithTickInterval(50*time.Millisecond))
}

func (suite *keyspaceGroupManagerTestSuite) TestRegisterAllocatorConflict() {
re := suite.Require()

kgm := suite.newKeyspaceGroupManager(0, suite.ClusterID, suite.cfg)
re.NotNil(kgm)
defer kgm.Close()
kgm.Initialize()
participant, err := kgm.GetElectionMember(0, 0)
re.NoError(err)

legacySvcRootPath := keypath.LegacyRootPath(suite.ClusterID)
allocatorKeyPrefix := keypath.GlobalTSOAllocatorsPrefix(suite.ClusterID)
legacySvcStorage := endpoint.NewStorageEndpoint(kv.NewEtcdKVBase(suite.etcdClient, legacySvcRootPath), nil)
member := member.NewMember(suite.servers[0], suite.etcdClient, uint64(suite.servers[0].Server.ID()))
am := NewAllocatorManager(suite.ctx, suite.etcdClient, constant.DefaultKeyspaceGroupID, member, legacySvcRootPath, legacySvcStorage, kgm.cfg, allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "pd"))
gta := NewGlobalTSOAllocator(suite.ctx, am)
err = gta.Initialize(0)
re.NoError(err)

kam := NewAllocatorManager(kgm.ctx, kgm.etcdClient, constant.DefaultKeyspaceGroupID, participant, kgm.legacySvcRootPath, kgm.legacySvcStorage, kgm.cfg, allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "tso", fmt.Sprintf("keyspace_group_%d", constant.DefaultKeyspaceGroupID)))
kgta := NewGlobalTSOAllocator(suite.ctx, kam)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err = kgta.Initialize(0)
re.NoError(err)
}()

gta.Reset()
wg.Wait()

var wg1 sync.WaitGroup
wg1.Add(1)
go func() {
defer wg1.Done()
err = gta.Initialize(0)
re.NoError(err)
}()
kgta.Reset()
wg1.Wait()
}

0 comments on commit 3b5b4a1

Please sign in to comment.