Fix measure merging issue (#549)
hanahmily authored Oct 7, 2024
1 parent d14d599 commit c92ee3a
Showing 5 changed files with 49 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -48,6 +48,7 @@ Release Notes.
- Fix the bug that the etcd client doesn't reconnect when facing the context timeout in the startup phase.
- Fix the bug that the long running query doesn't stop when the context is canceled.
- Fix the bug when merging blocks with different tags or fields.
- Fix the bug that the pending measure block is not released when a full block is merged.

### Documentation

4 changes: 2 additions & 2 deletions banyand/measure/block.go
@@ -776,7 +776,7 @@ func (bi *blockPointer) updateMetadata() {
}

func (bi *blockPointer) copyFrom(src *blockPointer) {
bi.idx = 0
bi.reset()
bi.bm.copyFrom(&src.bm)
bi.appendAll(src)
}
@@ -1006,7 +1006,7 @@ func (bi *blockPointer) isFull() bool {
func (bi *blockPointer) reset() {
bi.idx = 0
bi.block.reset()
bi.bm = blockMetadata{}
bi.bm.reset()
}

func generateBlockPointer() *blockPointer {
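To see why `copyFrom` now performs a full reset, here is a minimal, self-contained Go sketch; the types are simplified stand-ins for the real `blockPointer`, `block`, and `blockMetadata` in `banyand/measure` (the real `reset` also clears the metadata via `bm.reset()`, so pooled pointers carry no stale state):

```go
package main

import "fmt"

// Simplified stand-ins for the real types in banyand/measure/block.go.
type block struct{ timestamps []int64 }

func (b *block) reset() { b.timestamps = b.timestamps[:0] }

type blockPointer struct {
	idx   int
	block block
}

func (bi *blockPointer) reset() {
	bi.idx = 0
	bi.block.reset()
}

// Old behavior: only idx is cleared, so data appended in an earlier
// merge round survives into the next copy.
func (bi *blockPointer) copyFromBuggy(src *blockPointer) {
	bi.idx = 0
	bi.block.timestamps = append(bi.block.timestamps, src.block.timestamps...)
}

// New behavior mirroring the patch: clear everything before copying.
func (bi *blockPointer) copyFrom(src *blockPointer) {
	bi.reset()
	bi.block.timestamps = append(bi.block.timestamps, src.block.timestamps...)
}

func main() {
	var dst blockPointer
	a := &blockPointer{block: block{timestamps: []int64{1, 2}}}
	b := &blockPointer{block: block{timestamps: []int64{3}}}

	dst.copyFromBuggy(a)
	dst.copyFromBuggy(b)
	fmt.Println(dst.block.timestamps) // [1 2 3] — stale points leaked in

	dst.reset()
	dst.copyFrom(a)
	dst.copyFrom(b)
	fmt.Println(dst.block.timestamps) // [3] — a fresh copy every time
}
```

Swapping `bi.bm = blockMetadata{}` for `bi.bm.reset()` has the same clearing effect while likely letting the metadata reuse any allocated buffers across merge rounds.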
3 changes: 2 additions & 1 deletion banyand/measure/merger.go
@@ -292,7 +292,6 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa
(pendingBlock.isFull() && pendingBlock.bm.timestamps.max <= b.bm.timestamps.min) {
bw.mustWriteBlock(pendingBlock.bm.seriesID, &pendingBlock.block)
releaseDecoder()
pendingBlock.reset()
br.loadBlockData(getDecoder())
pendingBlock.copyFrom(b)
continue
@@ -316,6 +315,8 @@

if len(tmpBlock.timestamps) <= maxBlockLength {
bw.mustWriteBlock(tmpBlock.bm.seriesID, &tmpBlock.block)
pendingBlock.reset()
pendingBlockIsEmpty = true
releaseDecoder()
continue
}
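The two added lines enforce a "write, then release" discipline: once the merged `tmpBlock` fits within `maxBlockLength` and is written out, the pending block is reset immediately and flagged empty rather than waiting for a later branch; the `pendingBlock.reset()` deleted from the earlier branch is now redundant because `copyFrom` resets first. A toy sketch of that discipline, with assumed types:

```go
package main

import "fmt"

// Toy stand-in (all names assumed) for the merge loop's pending buffer.
type pendingBlock struct {
	points []int64
}

func (p *pendingBlock) reset() { p.points = p.points[:0] }

func main() {
	var pb pendingBlock
	pendingBlockIsEmpty := true

	for _, part := range [][]int64{{1, 2}, {3}} {
		pb.points = append(pb.points, part...)
		pendingBlockIsEmpty = false

		fmt.Println("write:", pb.points)

		// Mirrors the two added lines: release the pending block right
		// after the merged block is written and mark it empty, so a full
		// block that was just flushed no longer pins its buffers.
		pb.reset()
		pendingBlockIsEmpty = true
	}
	fmt.Println("pending empty at end:", pendingBlockIsEmpty)
}
```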
15 changes: 12 additions & 3 deletions banyand/measure/merger_test.go
@@ -291,14 +291,23 @@ func Test_mergeParts(t *testing.T) {
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
{seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480},
{seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480},
{seriesID: 1, count: 2470, uncompressedSizeBytes: 4159480},
{seriesID: 1, count: 2410, uncompressedSizeBytes: 4058440},
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
{seriesID: 1, count: 1265, uncompressedSizeBytes: 2130260},
{seriesID: 1, count: 1205, uncompressedSizeBytes: 2029220},
{seriesID: 2, count: 2, uncompressedSizeBytes: 126},
{seriesID: 3, count: 2, uncompressedSizeBytes: 64},
},
},
{
name: "Test with multiple parts with a large small quantity of different ts",
dpsList: []*dataPoints{generateHugeSmallDps(1, 5000, 1), generateHugeSmallDps(5001, 10000, 2)},
want: []blockMetadata{
{seriesID: 1, count: 8192, uncompressedSizeBytes: 262144},
{seriesID: 1, count: 1808, uncompressedSizeBytes: 57856},
{seriesID: 2, count: 2, uncompressedSizeBytes: 32},
{seriesID: 3, count: 2, uncompressedSizeBytes: 64},
},
},
}

for _, tt := range tests {
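The new expectation can be checked arithmetically: the two generated parts cover timestamps 1..5000 and 5001..10000 for series 1, i.e. 10000 points, and a maximum block length of 8192 (assumed from the expected counts) splits them into one full block plus a remainder:

```go
package main

import "fmt"

func main() {
	const points = 10000        // series 1: ts 1..5000 plus 5001..10000
	const maxBlockLength = 8192 // assumed from the expected counts
	const bytesPerPoint = 32    // 262144 / 8192 in the expectation

	full := maxBlockLength
	rest := points - full
	fmt.Println(full, full*bytesPerPoint) // 8192 262144
	fmt.Println(rest, rest*bytesPerPoint) // 1808 57856
}
```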
32 changes: 32 additions & 0 deletions banyand/measure/tstable_test.go
@@ -582,3 +582,35 @@ func generateHugeDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints
}}...)
return hugeDps
}

func generateHugeSmallDps(startTimestamp, endTimestamp, timestamp int64) *dataPoints {
hugeDps := &dataPoints{
seriesIDs: []common.SeriesID{},
timestamps: []int64{},
versions: []int64{},
tagFamilies: [][]nameValues{},
fields: []nameValues{},
}
now := time.Now().UnixNano()
for i := startTimestamp; i <= endTimestamp; i++ {
hugeDps.seriesIDs = append(hugeDps.seriesIDs, 1)
hugeDps.timestamps = append(hugeDps.timestamps, i)
hugeDps.versions = append(hugeDps.versions, now+i)
hugeDps.tagFamilies = append(hugeDps.tagFamilies, []nameValues{})
hugeDps.fields = append(hugeDps.fields, nameValues{
name: "skipped", values: []*nameValue{
{name: "intField", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(3330), valueArr: nil},
},
})
}
hugeDps.seriesIDs = append(hugeDps.seriesIDs, []common.SeriesID{2, 3}...)
hugeDps.timestamps = append(hugeDps.timestamps, []int64{timestamp, timestamp}...)
hugeDps.versions = append(hugeDps.versions, []int64{now + timestamp, now + timestamp}...)
hugeDps.tagFamilies = append(hugeDps.tagFamilies, [][]nameValues{{}, {}}...)
hugeDps.fields = append(hugeDps.fields, []nameValues{{}, {
name: "onlyFields", values: []*nameValue{
{name: "intField", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(4440), valueArr: nil},
},
}}...)
return hugeDps
}
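For context, a standalone sketch of the shape this helper produces, with the `pbv1`/`convert`/`common` dependencies replaced by plain assumed types: one point per timestamp in `[start, end]` for series 1 (each carrying a single int64 field in the real helper), then one trailing point each for series 2 and 3:

```go
package main

import (
	"fmt"
	"time"
)

// Plain stand-in for one generated data point.
type dp struct {
	seriesID uint64
	ts       int64
	version  int64
}

// generate mirrors generateHugeSmallDps's shape: dense series-1 points
// over [start, end], then single points for series 2 and 3 at ts.
func generate(start, end, ts int64) []dp {
	now := time.Now().UnixNano()
	dps := make([]dp, 0, end-start+3)
	for i := start; i <= end; i++ {
		dps = append(dps, dp{seriesID: 1, ts: i, version: now + i})
	}
	dps = append(dps,
		dp{seriesID: 2, ts: ts, version: now + ts},
		dp{seriesID: 3, ts: ts, version: now + ts},
	)
	return dps
}

func main() {
	dps := generate(1, 5000, 1)
	fmt.Println(len(dps)) // 5002: 5000 series-1 points plus one each for 2 and 3
}
```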
