diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index f100ec60d9241..179cd2321db73 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -113,8 +113,9 @@ type SegmentManager struct { allocator allocator.Allocator helper allocHelper - channelLock *lock.KeyLock[string] - channel2Segments *typeutil.ConcurrentMap[string, typeutil.UniqueSet] + channelLock *lock.KeyLock[string] + channel2Growing *typeutil.ConcurrentMap[string, typeutil.UniqueSet] + channel2Sealed *typeutil.ConcurrentMap[string, typeutil.UniqueSet] // Policies estimatePolicy calUpperLimitPolicy @@ -218,7 +219,8 @@ func newSegmentManager(meta *meta, allocator allocator.Allocator, opts ...allocO allocator: allocator, helper: defaultAllocHelper(), channelLock: lock.NewKeyLock[string](), - channel2Segments: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), + channel2Growing: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), + channel2Sealed: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), estimatePolicy: defaultCalUpperLimitPolicy(), allocPolicy: defaultAllocatePolicy(), segmentSealPolicies: defaultSegmentSealPolicy(), @@ -246,15 +248,20 @@ func (s *SegmentManager) loadSegmentsFromMeta(latestTs Timestamp) { return segment.GetInsertChannel() }) for channel, segmentInfos := range channel2Segments { - segments := typeutil.NewUniqueSet() + growing := typeutil.NewUniqueSet() + sealed := typeutil.NewUniqueSet() for _, segment := range segmentInfos { // for all sealed and growing segments, need to reset last expire if segment != nil && segment.GetState() == commonpb.SegmentState_Growing { s.meta.SetLastExpire(segment.GetID(), latestTs) + growing.Insert(segment.GetID()) + } + if segment != nil && segment.GetState() == commonpb.SegmentState_Sealed { + sealed.Insert(segment.GetID()) } - segments.Insert(segment.GetID()) } - s.channel2Segments.Insert(channel, segments) + s.channel2Growing.Insert(channel, growing) + s.channel2Sealed.Insert(channel, sealed) } } @@ -293,16 +300,12 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID // filter segments segmentInfos := make([]*SegmentInfo, 0) - segments, _ := s.channel2Segments.GetOrInsert(channelName, typeutil.NewUniqueSet()) - segments.Range(func(segmentID int64) bool { + growing, _ := s.channel2Growing.Get(channelName) + growing.Range(func(segmentID int64) bool { segment := s.meta.GetHealthySegment(segmentID) if segment == nil { log.Warn("failed to get segment, remove it", zap.String("channel", channelName), zap.Int64("segmentID", segmentID)) - segments.Remove(segmentID) - return true - } - - if !isGrowing(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 { + growing.Remove(segmentID) return true } segmentInfos = append(segmentInfos, segment) @@ -347,10 +350,6 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID return allocations, nil } -func isGrowing(segment *SegmentInfo) bool { - return segment.GetState() == commonpb.SegmentState_Growing -} - func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) { ts, err := s.allocator.AllocTimestamp(ctx) if err != nil { @@ -404,8 +403,8 @@ func (s *SegmentManager) openNewSegmentWithGivenSegmentID(ctx context.Context, c log.Error("failed to add segment to DataCoord", zap.Error(err)) return nil, err } - segments, _ := s.channel2Segments.GetOrInsert(channelName, typeutil.NewUniqueSet()) - segments.Insert(segmentID) + growing, _ := s.channel2Growing.GetOrInsert(channelName, typeutil.NewUniqueSet()) + growing.Insert(segmentID) log.Info("datacoord: estimateTotalRows: ", zap.Int64("CollectionID", segmentInfo.CollectionID), zap.Int64("SegmentID", segmentInfo.ID), @@ -432,8 +431,11 @@ func (s *SegmentManager) DropSegment(ctx context.Context, channel string, segmen s.channelLock.Lock(channel) defer s.channelLock.Unlock(channel) - if segments, ok := s.channel2Segments.Get(channel); ok { - segments.Remove(segmentID) + if growing, ok := s.channel2Growing.Get(channel); ok { + growing.Remove(segmentID) + } + if sealed, ok := s.channel2Sealed.Get(channel); ok { + sealed.Remove(segmentID) } segment := s.meta.GetHealthySegment(segmentID) @@ -455,29 +457,37 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, channel string, se s.channelLock.Lock(channel) defer s.channelLock.Unlock(channel) - segments, ok := s.channel2Segments.Get(channel) + sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet()) + growing, ok := s.channel2Growing.Get(channel) if !ok { - return nil, nil + return sealed.Collect(), nil } - segCandidates := segments.Collect() + + sealedSegments := s.meta.GetSegments(sealed.Collect(), func(segment *SegmentInfo) bool { + return isSegmentHealthy(segment) + }) + growingSegments := s.meta.GetSegments(growing.Collect(), func(segment *SegmentInfo) bool { + return isSegmentHealthy(segment) + }) + if len(segIDs) != 0 { - segCandidates = segIDs + sealedSegments = s.meta.GetSegments(segIDs, func(segment *SegmentInfo) bool { + return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed + }) + growingSegments = s.meta.GetSegments(segIDs, func(segment *SegmentInfo) bool { + return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing + }) } var ret []UniqueID - - sealedSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool { - return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed - }) - growingSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool { - return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing - }) ret = append(ret, sealedSegments...) for _, id := range growingSegments { if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil { return nil, err } + sealed.Insert(id) + growing.Remove(id) ret = append(ret, id) } return ret, nil @@ -499,13 +509,13 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin // TODO: It's too frequent; perhaps each channel could check once per minute instead. s.cleanupSealedSegment(t, channel) - segments, ok := s.channel2Segments.Get(channel) + sealed, ok := s.channel2Sealed.Get(channel) if !ok { return nil, nil } - ret := make([]UniqueID, 0, segments.Len()) - segments.Range(func(segmentID int64) bool { + ret := make([]UniqueID, 0, sealed.Len()) + sealed.Range(func(segmentID int64) bool { info := s.meta.GetHealthySegment(segmentID) if info == nil { return true @@ -524,16 +534,16 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) { s.channelLock.Lock(channel) defer s.channelLock.Unlock(channel) - segments, ok := s.channel2Segments.Get(channel) + growing, ok := s.channel2Growing.Get(channel) if !ok { return } - segments.Range(func(id int64) bool { + growing.Range(func(id int64) bool { segment := s.meta.GetHealthySegment(id) if segment == nil { log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id)) - segments.Remove(id) + growing.Remove(id) return true } allocations := make([]*Allocation, 0, len(segment.allocations)) @@ -551,54 +561,49 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) { } func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) { - segments, ok := s.channel2Segments.Get(channel) + sealed, ok := s.channel2Sealed.Get(channel) if !ok { return } - segments.Range(func(id int64) bool { + sealed.Range(func(id int64) bool { segment := s.meta.GetHealthySegment(id) if segment == nil { log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id)) - segments.Remove(id) + sealed.Remove(id) return true } - if isEmptySealedSegment(segment, ts) { + // Check if segment is empty + if segment.GetLastExpireTime() <= ts && segment.currRows == 0 { log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id)) if err := s.meta.SetState(id, commonpb.SegmentState_Dropped); err != nil { log.Warn("failed to set segment state to dropped", zap.String("channel", channel), zap.Int64("segmentID", id), zap.Error(err)) } else { - segments.Remove(id) + sealed.Remove(id) } } return true }) } -func isEmptySealedSegment(segment *SegmentInfo, ts Timestamp) bool { - return segment.GetState() == commonpb.SegmentState_Sealed && segment.GetLastExpireTime() <= ts && segment.currRows == 0 -} - // tryToSealSegment applies segment & channel seal policies func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { - segments, ok := s.channel2Segments.Get(channel) + growing, ok := s.channel2Growing.Get(channel) if !ok { return nil } + sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet()) - channelInfo := make(map[string][]*SegmentInfo) + channelSegmentInfos := make([]*SegmentInfo, 0, len(growing)) sealedSegments := make(map[int64]struct{}) var setStateErr error - segments.Range(func(id int64) bool { + growing.Range(func(id int64) bool { info := s.meta.GetHealthySegment(id) if info == nil { return true } - channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info) - if info.State != commonpb.SegmentState_Growing { - return true - } + channelSegmentInfos = append(channelSegmentInfos, info) // change shouldSeal to segment seal policy logic for _, policy := range s.segmentSealPolicies { if shouldSeal, reason := policy.ShouldSeal(info, ts); shouldSeal { @@ -608,6 +613,8 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { return false } sealedSegments[id] = struct{}{} + sealed.Insert(id) + growing.Remove(id) break } } @@ -618,23 +625,20 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { return setStateErr } - for channel, segmentInfos := range channelInfo { - for _, policy := range s.channelSealPolicies { - vs, reason := policy(channel, segmentInfos, ts) - for _, info := range vs { - if _, ok := sealedSegments[info.GetID()]; ok { - continue - } - if info.State != commonpb.SegmentState_Growing { - continue - } - if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil { - return err - } - log.Info("seal segment for channel seal policy matched", - zap.Int64("segmentID", info.GetID()), zap.String("channel", channel), zap.String("reason", reason)) - sealedSegments[info.GetID()] = struct{}{} + for _, policy := range s.channelSealPolicies { + vs, reason := policy(channel, channelSegmentInfos, ts) + for _, info := range vs { + if _, ok := sealedSegments[info.GetID()]; ok { + continue + } + if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil { + return err } + log.Info("seal segment for channel seal policy matched", + zap.Int64("segmentID", info.GetID()), zap.String("channel", channel), zap.String("reason", reason)) + sealedSegments[info.GetID()] = struct{}{} + sealed.Insert(info.GetID()) + growing.Remove(info.GetID()) } } return nil @@ -645,15 +649,15 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri s.channelLock.Lock(channel) defer s.channelLock.Unlock(channel) - segments, ok := s.channel2Segments.Get(channel) + growing, ok := s.channel2Growing.Get(channel) if !ok { return } - segments.Range(func(sid int64) bool { + growing.Range(func(sid int64) bool { segment := s.meta.GetHealthySegment(sid) if segment == nil { log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", sid)) - segments.Remove(sid) + growing.Remove(sid) return true } s.meta.SetAllocations(sid, nil) @@ -662,5 +666,6 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri } return true }) - s.channel2Segments.Remove(channel) + s.channel2Growing.Remove(channel) + s.channel2Sealed.Remove(channel) } diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 1c2377134e03d..80ba0257dc388 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -152,7 +152,7 @@ func TestAllocSegment(t *testing.T) { allocations1, err := segmentManager.AllocSegment(ctx, collID, partitionID, vchannel, 100) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations1)) - segments, ok := segmentManager.channel2Segments.Get(vchannel) + segments, ok := segmentManager.channel2Growing.Get(vchannel) assert.True(t, ok) assert.EqualValues(t, 1, segments.Len()) @@ -163,7 +163,7 @@ func TestAllocSegment(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations2)) // clear old healthy and alloc new - segments, ok = segmentManager.channel2Segments.Get(vchannel) + segments, ok = segmentManager.channel2Growing.Get(vchannel) assert.True(t, ok) assert.EqualValues(t, 1, segments.Len()) assert.NotEqual(t, allocations1[0].SegmentID, allocations2[0].SegmentID) @@ -321,9 +321,12 @@ func TestLoadSegmentsFromMeta(t *testing.T) { segmentManager, err := newSegmentManager(meta, mockAllocator) assert.NoError(t, err) - segments, ok := segmentManager.channel2Segments.Get(vchannel) + growing, ok := segmentManager.channel2Growing.Get(vchannel) assert.True(t, ok) - assert.EqualValues(t, 2, segments.Len()) + assert.EqualValues(t, 1, growing.Len()) + sealed, ok := segmentManager.channel2Sealed.Get(vchannel) + assert.True(t, ok) + assert.EqualValues(t, 1, sealed.Len()) } func TestSaveSegmentsToMeta(t *testing.T) { @@ -791,7 +794,7 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) { ID: 1, PartitionID: partitionID, InsertChannel: "ch1", - State: commonpb.SegmentState_Flushed, + State: commonpb.SegmentState_Sealed, }, }, 2: { @@ -799,7 +802,7 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) { ID: 2, PartitionID: partitionID, InsertChannel: "ch2", - State: commonpb.SegmentState_Flushed, + State: commonpb.SegmentState_Growing, }, }, }, @@ -823,7 +826,7 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) { ID: 1, PartitionID: partitionID, InsertChannel: "ch1", - State: commonpb.SegmentState_Dropped, + State: commonpb.SegmentState_Sealed, }, }, 2: { @@ -848,9 +851,10 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &SegmentManager{ - meta: tt.fields.meta, - channelLock: lock.NewKeyLock[string](), - channel2Segments: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), + meta: tt.fields.meta, + channelLock: lock.NewKeyLock[string](), + channel2Growing: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), + channel2Sealed: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), } for _, segmentID := range tt.fields.segments { segmentInfo := tt.fields.meta.GetSegment(segmentID) @@ -858,16 +862,23 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) { if segmentInfo != nil { channel = segmentInfo.GetInsertChannel() } - segments, _ := s.channel2Segments.GetOrInsert(channel, typeutil.NewUniqueSet()) - segments.Insert(segmentID) + if segmentInfo.GetState() == commonpb.SegmentState_Sealed { + sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet()) + sealed.Insert(segmentID) + } + if segmentInfo.GetState() == commonpb.SegmentState_Sealed { + growing, _ := s.channel2Growing.GetOrInsert(channel, typeutil.NewUniqueSet()) + growing.Insert(segmentID) + } } s.DropSegmentsOfChannel(context.TODO(), tt.args.channel) all := make([]int64, 0) - s.channel2Segments.Range(func(_ string, segments typeutil.UniqueSet) bool { - segments.Range(func(segmentID int64) bool { - all = append(all, segmentID) - return true - }) + s.channel2Sealed.Range(func(_ string, segments typeutil.UniqueSet) bool { + all = append(all, segments.Collect()...) + return true + }) + s.channel2Growing.Range(func(_ string, segments typeutil.UniqueSet) bool { + all = append(all, segments.Collect()...) return true }) assert.ElementsMatch(t, tt.want, all)