diff --git a/dm/syncer/opt_sharding_group.go b/dm/syncer/opt_sharding_group.go index d87514371ea..ce4060de642 100644 --- a/dm/syncer/opt_sharding_group.go +++ b/dm/syncer/opt_sharding_group.go @@ -233,3 +233,11 @@ func (k *OptShardingGroupKeeper) RemoveSchema(schema string) { } } } + +// Reset resets the keeper. +func (k *OptShardingGroupKeeper) Reset() { + k.Lock() + defer k.Unlock() + k.groups = make(map[string]*OptShardingGroup) + k.shardingReSyncs = make(map[string]binlog.Location) +} diff --git a/dm/syncer/opt_sharding_group_test.go b/dm/syncer/opt_sharding_group_test.go index 88ad02cb873..9cd4eec7e6e 100644 --- a/dm/syncer/opt_sharding_group_test.go +++ b/dm/syncer/opt_sharding_group_test.go @@ -73,6 +73,11 @@ func (s *optShardingGroupSuite) TestLowestFirstPosInOptGroups() { k.removeShardingReSync(&ShardingReSync{targetTable: utils.UnpackTableID(db2tbl)}) // should be pos11 now, pos21 is totally resolved require.Equal(s.T(), pos11.Position, k.lowestFirstLocationInGroups().Position) + + // reset + k.Reset() + require.Len(s.T(), k.groups, 0) + require.Len(s.T(), k.shardingReSyncs, 0) } func (s *optShardingGroupSuite) TestSync() { diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 4545832ff24..d306f151372 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -649,6 +649,7 @@ func (s *Syncer) reset() { s.sgk.ResetGroups() s.pessimist.Reset() case config.ShardOptimistic: + s.osgk.Reset() s.optimist.Reset() } }