Skip to content

Commit

Permalink
add growing&sealed indexes
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper committed Nov 25, 2024
1 parent 9b742ad commit 89d0be4
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 91 deletions.
153 changes: 79 additions & 74 deletions internal/datacoord/segment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ type SegmentManager struct {
allocator allocator.Allocator
helper allocHelper

channelLock *lock.KeyLock[string]
channel2Segments *typeutil.ConcurrentMap[string, typeutil.UniqueSet]
channelLock *lock.KeyLock[string]
channel2Growing *typeutil.ConcurrentMap[string, typeutil.UniqueSet]
channel2Sealed *typeutil.ConcurrentMap[string, typeutil.UniqueSet]

// Policies
estimatePolicy calUpperLimitPolicy
Expand Down Expand Up @@ -218,7 +219,8 @@ func newSegmentManager(meta *meta, allocator allocator.Allocator, opts ...allocO
allocator: allocator,
helper: defaultAllocHelper(),
channelLock: lock.NewKeyLock[string](),
channel2Segments: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
channel2Growing: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
channel2Sealed: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAllocatePolicy(),
segmentSealPolicies: defaultSegmentSealPolicy(),
Expand Down Expand Up @@ -246,15 +248,20 @@ func (s *SegmentManager) loadSegmentsFromMeta(latestTs Timestamp) {
return segment.GetInsertChannel()
})
for channel, segmentInfos := range channel2Segments {
segments := typeutil.NewUniqueSet()
growing := typeutil.NewUniqueSet()
sealed := typeutil.NewUniqueSet()
for _, segment := range segmentInfos {
// for all sealed and growing segments, need to reset last expire
if segment != nil && segment.GetState() == commonpb.SegmentState_Growing {
s.meta.SetLastExpire(segment.GetID(), latestTs)
growing.Insert(segment.GetID())
}
if segment != nil && segment.GetState() == commonpb.SegmentState_Sealed {
sealed.Insert(segment.GetID())
}
segments.Insert(segment.GetID())
}
s.channel2Segments.Insert(channel, segments)
s.channel2Growing.Insert(channel, growing)
s.channel2Sealed.Insert(channel, sealed)
}
}

Expand Down Expand Up @@ -293,16 +300,12 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID

// filter segments
segmentInfos := make([]*SegmentInfo, 0)
segments, _ := s.channel2Segments.GetOrInsert(channelName, typeutil.NewUniqueSet())
segments.Range(func(segmentID int64) bool {
growing, _ := s.channel2Growing.Get(channelName)
growing.Range(func(segmentID int64) bool {
segment := s.meta.GetHealthySegment(segmentID)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channelName), zap.Int64("segmentID", segmentID))
segments.Remove(segmentID)
return true
}

if !isGrowing(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 {
growing.Remove(segmentID)
return true
}
segmentInfos = append(segmentInfos, segment)
Expand Down Expand Up @@ -347,10 +350,6 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
return allocations, nil
}

func isGrowing(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Growing
}

func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) {
ts, err := s.allocator.AllocTimestamp(ctx)
if err != nil {
Expand Down Expand Up @@ -404,8 +403,8 @@ func (s *SegmentManager) openNewSegmentWithGivenSegmentID(ctx context.Context, c
log.Error("failed to add segment to DataCoord", zap.Error(err))
return nil, err
}
segments, _ := s.channel2Segments.GetOrInsert(channelName, typeutil.NewUniqueSet())
segments.Insert(segmentID)
growing, _ := s.channel2Growing.GetOrInsert(channelName, typeutil.NewUniqueSet())
growing.Insert(segmentID)
log.Info("datacoord: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
Expand All @@ -432,8 +431,11 @@ func (s *SegmentManager) DropSegment(ctx context.Context, channel string, segmen
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)

if segments, ok := s.channel2Segments.Get(channel); ok {
segments.Remove(segmentID)
if growing, ok := s.channel2Growing.Get(channel); ok {
growing.Remove(segmentID)
}
if sealed, ok := s.channel2Sealed.Get(channel); ok {
sealed.Remove(segmentID)
}

segment := s.meta.GetHealthySegment(segmentID)
Expand All @@ -455,29 +457,37 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, channel string, se
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)

segments, ok := s.channel2Segments.Get(channel)
sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet())
growing, ok := s.channel2Growing.Get(channel)
if !ok {
return nil, nil
return sealed.Collect(), nil
}
segCandidates := segments.Collect()

sealedSegments := s.meta.GetSegments(sealed.Collect(), func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment)
})
growingSegments := s.meta.GetSegments(growing.Collect(), func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment)
})

if len(segIDs) != 0 {
segCandidates = segIDs
sealedSegments = s.meta.GetSegments(segIDs, func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed
})
growingSegments = s.meta.GetSegments(segIDs, func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing
})
}

var ret []UniqueID

sealedSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed
})
growingSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing
})
ret = append(ret, sealedSegments...)

for _, id := range growingSegments {
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
return nil, err
}
sealed.Insert(id)
growing.Remove(id)
ret = append(ret, id)
}
return ret, nil
Expand All @@ -499,13 +509,13 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
// TODO: It's too frequent; perhaps each channel could check once per minute instead.
s.cleanupSealedSegment(t, channel)

segments, ok := s.channel2Segments.Get(channel)
sealed, ok := s.channel2Sealed.Get(channel)
if !ok {
return nil, nil
}

ret := make([]UniqueID, 0, segments.Len())
segments.Range(func(segmentID int64) bool {
ret := make([]UniqueID, 0, sealed.Len())
sealed.Range(func(segmentID int64) bool {
info := s.meta.GetHealthySegment(segmentID)
if info == nil {
return true
Expand All @@ -524,16 +534,16 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) {
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)

segments, ok := s.channel2Segments.Get(channel)
growing, ok := s.channel2Growing.Get(channel)
if !ok {
return
}

segments.Range(func(id int64) bool {
growing.Range(func(id int64) bool {
segment := s.meta.GetHealthySegment(id)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id))
segments.Remove(id)
growing.Remove(id)
return true
}
allocations := make([]*Allocation, 0, len(segment.allocations))
Expand All @@ -551,54 +561,49 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) {
}

func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) {
segments, ok := s.channel2Segments.Get(channel)
sealed, ok := s.channel2Sealed.Get(channel)
if !ok {
return
}
segments.Range(func(id int64) bool {
sealed.Range(func(id int64) bool {
segment := s.meta.GetHealthySegment(id)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id))
segments.Remove(id)
sealed.Remove(id)
return true
}
if isEmptySealedSegment(segment, ts) {
// Check if segment is empty
if segment.GetLastExpireTime() <= ts && segment.currRows == 0 {
log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id))
if err := s.meta.SetState(id, commonpb.SegmentState_Dropped); err != nil {
log.Warn("failed to set segment state to dropped", zap.String("channel", channel),
zap.Int64("segmentID", id), zap.Error(err))
} else {
segments.Remove(id)
sealed.Remove(id)
}
}
return true
})
}

func isEmptySealedSegment(segment *SegmentInfo, ts Timestamp) bool {
return segment.GetState() == commonpb.SegmentState_Sealed && segment.GetLastExpireTime() <= ts && segment.currRows == 0
}

// tryToSealSegment applies segment & channel seal policies
func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
segments, ok := s.channel2Segments.Get(channel)
growing, ok := s.channel2Growing.Get(channel)
if !ok {
return nil
}
sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet())

channelInfo := make(map[string][]*SegmentInfo)
channelSegmentInfos := make([]*SegmentInfo, 0, len(growing))
sealedSegments := make(map[int64]struct{})

var setStateErr error
segments.Range(func(id int64) bool {
growing.Range(func(id int64) bool {
info := s.meta.GetHealthySegment(id)
if info == nil {
return true
}
channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info)
if info.State != commonpb.SegmentState_Growing {
return true
}
channelSegmentInfos = append(channelSegmentInfos, info)
// change shouldSeal to segment seal policy logic
for _, policy := range s.segmentSealPolicies {
if shouldSeal, reason := policy.ShouldSeal(info, ts); shouldSeal {
Expand All @@ -608,6 +613,8 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
return false
}
sealedSegments[id] = struct{}{}
sealed.Insert(id)
growing.Remove(id)
break
}
}
Expand All @@ -618,23 +625,20 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
return setStateErr
}

for channel, segmentInfos := range channelInfo {
for _, policy := range s.channelSealPolicies {
vs, reason := policy(channel, segmentInfos, ts)
for _, info := range vs {
if _, ok := sealedSegments[info.GetID()]; ok {
continue
}
if info.State != commonpb.SegmentState_Growing {
continue
}
if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil {
return err
}
log.Info("seal segment for channel seal policy matched",
zap.Int64("segmentID", info.GetID()), zap.String("channel", channel), zap.String("reason", reason))
sealedSegments[info.GetID()] = struct{}{}
for _, policy := range s.channelSealPolicies {
vs, reason := policy(channel, channelSegmentInfos, ts)
for _, info := range vs {
if _, ok := sealedSegments[info.GetID()]; ok {
continue
}
if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil {
return err
}
log.Info("seal segment for channel seal policy matched",
zap.Int64("segmentID", info.GetID()), zap.String("channel", channel), zap.String("reason", reason))
sealedSegments[info.GetID()] = struct{}{}
sealed.Insert(info.GetID())
growing.Remove(info.GetID())
}
}
return nil
Expand All @@ -645,15 +649,15 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)

segments, ok := s.channel2Segments.Get(channel)
growing, ok := s.channel2Growing.Get(channel)
if !ok {
return
}
segments.Range(func(sid int64) bool {
growing.Range(func(sid int64) bool {
segment := s.meta.GetHealthySegment(sid)
if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", sid))
segments.Remove(sid)
growing.Remove(sid)
return true
}
s.meta.SetAllocations(sid, nil)
Expand All @@ -662,5 +666,6 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri
}
return true
})
s.channel2Segments.Remove(channel)
s.channel2Growing.Remove(channel)
s.channel2Sealed.Remove(channel)
}
Loading

0 comments on commit 89d0be4

Please sign in to comment.