AutoCompact & CompactWithDeprecateTable Support #165

Merged on Oct 14, 2024 (62 commits)
Changes from 49 commits
96b9c54
fix the deadlock when closing db
KANIOYH Jul 1, 2024
d6d166b
Add deprecatedtable and test, import google.uuid.
KANIOYH Jul 3, 2024
34cdba4
Add vlog&bptree uuid support, and update some related tests.
KANIOYH Jul 6, 2024
9acf701
Merge branch 'lotusdblabs:main' into autoCompaction
KANIOYH Jul 6, 2024
504614d
Merge branch 'lotusdblabs:main' into main
KANIOYH Jul 11, 2024
1cb5953
Add deprecatedtable and test, import google.uuid.
KANIOYH Jul 11, 2024
8fcb7dd
Add vlog&bptree uuid support, and update some related tests.
KANIOYH Jul 6, 2024
7ee80ca
deprecatedtable add lower and upper threshold, which used for notify …
KANIOYH Jul 10, 2024
9c554b9
Add autocompact framwork, this commit just for merge TestComapct bug.
KANIOYH Jul 11, 2024
f8e4d1a
This commit just for comparison, output:
KANIOYH Jul 17, 2024
cce252a
add iostate support
KANIOYH Jul 24, 2024
fe64281
merge commit
KANIOYH Jul 24, 2024
88a11c7
This commit can be fully tested by adding uuid and deprecateable tabl…
KANIOYH Jul 24, 2024
8fadeb5
this commit for pr.
KANIOYH Aug 21, 2024
f54b54d
fix some formats
KANIOYH Aug 21, 2024
b0e8c6d
fix format
KANIOYH Aug 21, 2024
28bc922
fix format for lint
KANIOYH Aug 21, 2024
39af1a2
fix format
KANIOYH Aug 21, 2024
a77fda5
golint disable gosec
KANIOYH Aug 21, 2024
78be1f7
gomat files
KANIOYH Aug 21, 2024
efa3a45
bptree put return old uuid.
KANIOYH Aug 29, 2024
0a1d520
bptree delete return old entry.
KANIOYH Aug 30, 2024
7931399
format golint.
KANIOYH Aug 30, 2024
b9278b4
fix test db path bug.
KANIOYH Aug 30, 2024
78cbdb0
fix test filename bug.
KANIOYH Aug 30, 2024
0f3f828
Merge branch 'autoCompact' of https://github.com/KANIOYH/lotusdb into…
KANIOYH Aug 30, 2024
b5738fc
support diskio monitor.
KANIOYH Sep 4, 2024
09dcc11
format.
KANIOYH Sep 4, 2024
6178b72
IO monitoring set threshold by collecting IO bandwith in flushmemtable.
KANIOYH Sep 12, 2024
cf81d1d
fmt
KANIOYH Sep 12, 2024
2c5b5eb
IO monitor uses IoTime rate.
KANIOYH Sep 12, 2024
862d564
use rate to check deprecatedtable, persist total number in vlog.
KANIOYH Sep 20, 2024
543c9c0
fmt
KANIOYH Sep 20, 2024
2bc2594
fix testdb name
KANIOYH Sep 20, 2024
678888f
fix deadlock in autocompact
KANIOYH Sep 20, 2024
787a05b
reduce test size
KANIOYH Sep 20, 2024
05dc5ce
autocompact coroutine graceful exit
KANIOYH Sep 20, 2024
649aed3
try fix err:segment file is closed when close db
KANIOYH Sep 20, 2024
5421156
fmt
KANIOYH Sep 20, 2024
407af0d
send compact msg with db.mu
KANIOYH Sep 20, 2024
c8c6100
fmt
KANIOYH Sep 20, 2024
4c06433
without busyIO
KANIOYH Sep 20, 2024
573885d
Merge branch 'autoCompact' of https://github.com/KANIOYH/lotusdb into…
KANIOYH Sep 20, 2024
03db08f
diskio only work in linux
KANIOYH Sep 20, 2024
0955590
Merge branch 'autoCompact' of https://github.com/KANIOYH/lotusdb into…
KANIOYH Sep 20, 2024
249981f
golint
KANIOYH Sep 20, 2024
a0c3efc
fmt
KANIOYH Sep 20, 2024
1e66b30
fix deadlock in flushmemtable
KANIOYH Sep 20, 2024
c1ce11a
fmt
KANIOYH Sep 20, 2024
5f0cda5
Encapsulate meta load/store functions. Use struct{} as dptable value.
KANIOYH Sep 22, 2024
8d65b4e
support longer testkey.
KANIOYH Sep 24, 2024
424df04
fix deadlock in close
KANIOYH Sep 24, 2024
72e9951
fmt
KANIOYH Sep 24, 2024
fb2b6e6
change compactChan buffer
KANIOYH Sep 24, 2024
b01bedd
remove test log
KANIOYH Sep 24, 2024
5085612
change flushmemtable lock range
KANIOYH Sep 24, 2024
7d1ad08
compactChan noblocking
KANIOYH Sep 24, 2024
f1cfdc5
rebase for main
KANIOYH Oct 7, 2024
9946ad8
fmt
KANIOYH Oct 7, 2024
c27e9d9
sync go.sum
KANIOYH Oct 7, 2024
97c6275
add compact optimization, using BatchCapacity to submit rewriting.
KANIOYH Oct 8, 2024
7588260
go fmt and golint.
KANIOYH Oct 8, 2024
4 changes: 2 additions & 2 deletions .golangci.yml
@@ -227,10 +227,10 @@ linters:
- godot # checks if comments end in a period
# - goimports # in addition to fixing imports, goimports also formats your code in the same style as gofmt
- gomnd # detects magic numbers
- gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod
# - gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod
- gomodguard # allow and block lists linter for direct Go module dependencies. This is different from depguard where there are different block types for example version constraints and module recommendations
- goprintffuncname # checks that printf-like functions are named with f at the end
- gosec # inspects source code for security problems
# - gosec # inspects source code for security problems
- intrange # finds places where for loops could make use of an integer range
- lll # reports long lines
- loggercheck # checks key value pairs for common logger libraries (kitlog,klog,logr,zap)
96 changes: 85 additions & 11 deletions bptree.go
@@ -8,6 +8,7 @@ import (
"os"
"path/filepath"

"github.com/google/uuid"
"github.com/rosedblabs/diskhash"
"github.com/rosedblabs/wal"
"go.etcd.io/bbolt"
@@ -81,7 +82,11 @@ func (bt *BPTree) Get(key []byte, _ ...diskhash.MatchKeyFunc) (*KeyPosition, err
if len(value) != 0 {
keyPos = new(KeyPosition)
keyPos.key, keyPos.partition = key, uint32(p)
keyPos.position = wal.DecodeChunkPosition(value)
err := keyPos.uid.UnmarshalBinary(value[:len(keyPos.uid)])
if err != nil {
return err
}
keyPos.position = wal.DecodeChunkPosition(value[len(keyPos.uid):])
}
return nil
}); err != nil {
@@ -92,9 +97,11 @@ func (bt *BPTree) Get(key []byte, _ ...diskhash.MatchKeyFunc) (*KeyPosition, err
}

// PutBatch puts the specified key positions into the index.
func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc) error {
//
//nolint:gocognit
func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) {
if len(positions) == 0 {
return nil
return nil, nil
}

// group positions by partition
@@ -104,7 +111,10 @@ func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc)
partitionRecords[p] = append(partitionRecords[p], pos)
}

// create chan to collect deprecated entry
deprecatedChan := make(chan []*KeyPosition, len(partitionRecords))
g, ctx := errgroup.WithContext(context.Background())

for i := range partitionRecords {
partition := i
if len(partitionRecords[partition]) == 0 {
@@ -113,34 +123,68 @@ func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc)
g.Go(func() error {
// get the bolt db instance for this partition
tree := bt.trees[partition]
return tree.Update(func(tx *bbolt.Tx) error {
partitionDeprecatedKeyPosition := make([]*KeyPosition, 0)
err := tree.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(indexBucketName)
// put each record into the bucket
for _, record := range partitionRecords[partition] {
select {
case <-ctx.Done():
return ctx.Err()
default:
uidBytes, _ := record.uid.MarshalBinary()
encPos := record.position.Encode()
if err := bucket.Put(record.key, encPos); err != nil {
//nolint:gocritic // Need to combine uidbytes with encPos and place them in bptree
valueBytes := append(uidBytes, encPos...)
if err, oldValue := bucket.Put(record.key, valueBytes); err != nil {
if errors.Is(err, bbolt.ErrKeyRequired) {
return ErrKeyIsEmpty
}
return err
} else if oldValue != nil {
keyPos := new(KeyPosition)
keyPos.key, keyPos.partition = record.key, record.partition
err = keyPos.uid.UnmarshalBinary(oldValue[:len(keyPos.uid)])
if err != nil {
return err
}
keyPos.position = wal.DecodeChunkPosition(oldValue[len(keyPos.uid):])
partitionDeprecatedKeyPosition = append(partitionDeprecatedKeyPosition, keyPos)
}
}
}
return nil
})
// send deprecateduuid uuid slice to chan
deprecatedChan <- partitionDeprecatedKeyPosition
return err
})
}
return g.Wait()
// Close the channel after all goroutines are done
go func() {
_ = g.Wait()
close(deprecatedChan)
}()

var deprecatedKeyPosition []*KeyPosition
for partitionDeprecatedKeyPosition := range deprecatedChan {
deprecatedKeyPosition = append(deprecatedKeyPosition, partitionDeprecatedKeyPosition...)
}

// Wait for all goroutines to finish
if err := g.Wait(); err != nil {
return nil, err
}

return deprecatedKeyPosition, nil
}

// DeleteBatch deletes the specified keys from the index.
func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error {
//
//nolint:gocognit
func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) {
if len(keys) == 0 {
return nil
return nil, nil
}

// group keys by partition
@@ -150,6 +194,9 @@ func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error {
partitionKeys[p] = append(partitionKeys[p], key)
}

// create chan to collect deprecated entry
deprecatedChan := make(chan []*KeyPosition, len(partitionKeys))

// delete keys from each partition
g, ctx := errgroup.WithContext(context.Background())
for i := range partitionKeys {
@@ -159,6 +206,7 @@ func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error {
}
g.Go(func() error {
tree := bt.trees[partition]
partitionDeprecatedKeyPosition := make([]*KeyPosition, 0)
return tree.Update(func(tx *bbolt.Tx) error {
// get the bolt db instance for this partition
bucket := tx.Bucket(indexBucketName)
@@ -171,16 +219,41 @@ func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
if err := bucket.Delete(key); err != nil {
if err, oldValue := bucket.Delete(key); err != nil {
return err
} else if oldValue != nil {
keyPos := new(KeyPosition)
keyPos.key, keyPos.partition = key, uint32(partition)
err = keyPos.uid.UnmarshalBinary(oldValue[:len(keyPos.uid)])
if err != nil {
return err
}
keyPos.position = wal.DecodeChunkPosition(oldValue[len(keyPos.uid):])
partitionDeprecatedKeyPosition = append(partitionDeprecatedKeyPosition, keyPos)
}
}
}
return nil
})
})
}
return g.Wait()
// Close the channel after all goroutines are done
go func() {
_ = g.Wait()
close(deprecatedChan)
}()

var deprecatedKeyPosition []*KeyPosition
for partitionDeprecatedKeyPosition := range deprecatedChan {
deprecatedKeyPosition = append(deprecatedKeyPosition, partitionDeprecatedKeyPosition...)
}

// Wait for all goroutines to finish
if err := g.Wait(); err != nil {
return nil, err
}

return deprecatedKeyPosition, nil
}

// Close releases all boltdb database resources.
@@ -291,7 +364,8 @@ func (bi *bptreeIterator) Key() []byte {

// Value get the current value.
func (bi *bptreeIterator) Value() any {
return bi.value
var uid uuid.UUID
return bi.value[len(uid):]
}

// Valid returns whether the iterator is exhausted.
104 changes: 98 additions & 6 deletions bptree_test.go
@@ -2,12 +2,14 @@ package lotusdb

import (
"bytes"
"log"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/cespare/xxhash/v2"
"github.com/google/uuid"
"github.com/rosedblabs/wal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -108,7 +110,7 @@ func testbptreeGet(t *testing.T, partitionNum int) {
partition: uint32(bt.options.getKeyPartition([]byte("exist"))),
position: &wal.ChunkPosition{},
})
err = bt.PutBatch(keyPositions)
_, err = bt.PutBatch(keyPositions)
require.NoError(t, err)

tests := []struct {
@@ -164,14 +166,17 @@ func testbptreePutbatch(t *testing.T, partitionNum int) {
keyPositions = append(keyPositions, &KeyPosition{
key: nil,
partition: 0,
uid: uuid.New(),
position: &wal.ChunkPosition{},
}, &KeyPosition{
key: []byte("normal"),
partition: uint32(bt.options.getKeyPartition([]byte("normal"))),
uid: uuid.New(),
position: &wal.ChunkPosition{},
}, &KeyPosition{
key: []byte(""),
partition: uint32(bt.options.getKeyPartition([]byte(""))),
uid: uuid.New(),
position: &wal.ChunkPosition{},
},
)
@@ -186,15 +191,102 @@ func testbptreePutbatch(t *testing.T, partitionNum int) {
{"normal", keyPositions[1:2], false},
{"len(key)=0", keyPositions[2:3], true},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err = bt.PutBatch(tt.positions); (err != nil) != tt.wantErr {
if _, err = bt.PutBatch(tt.positions); (err != nil) != tt.wantErr {
t.Errorf("BPTree.PutBatch() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func TestBPtreePutbatchOldUUID(t *testing.T) {
partitionNum := 3
options := indexOptions{
indexType: BTree,
dirPath: filepath.Join(os.TempDir(), "bptree-putBatch-olduuid"+strconv.Itoa(partitionNum)),
partitionNum: partitionNum,
keyHashFunction: xxhash.Sum64,
}

err := os.MkdirAll(options.dirPath, os.ModePerm)
require.NoError(t, err)
defer func() {
_ = os.RemoveAll(options.dirPath)
}()

bt, err := openBTreeIndex(options)
require.NoError(t, err)

var keyPositions []*KeyPosition
keyPositions = append(keyPositions, &KeyPosition{
key: []byte("123"),
partition: uint32(bt.options.getKeyPartition([]byte("123"))),
uid: uuid.New(),
position: &wal.ChunkPosition{},
}, &KeyPosition{
key: []byte("456"),
partition: uint32(bt.options.getKeyPartition([]byte("456"))),
uid: uuid.New(),
position: &wal.ChunkPosition{},
}, &KeyPosition{
key: []byte("789"),
partition: uint32(bt.options.getKeyPartition([]byte("789"))),
uid: uuid.New(),
position: &wal.ChunkPosition{},
},
)

var coverKeyPositions []*KeyPosition
coverKeyPositions = append(coverKeyPositions, &KeyPosition{
key: []byte("123"),
partition: uint32(bt.options.getKeyPartition([]byte("123"))),
uid: uuid.New(),
position: &wal.ChunkPosition{},
}, &KeyPosition{
key: []byte("456"),
partition: uint32(bt.options.getKeyPartition([]byte("456"))),
uid: uuid.New(),
position: &wal.ChunkPosition{},
}, &KeyPosition{
key: []byte("789"),
partition: uint32(bt.options.getKeyPartition([]byte("789"))),
uid: uuid.New(),
position: &wal.ChunkPosition{},
},
)

t.Run("check old uuid", func(t *testing.T) {
_, err = bt.PutBatch(keyPositions)
if err != nil {
t.Errorf("put error = %v", err)
}
var oldKeyPostions []*KeyPosition
oldKeyPostions, err = bt.PutBatch(coverKeyPositions)
if err != nil {
t.Errorf("put error = %v", err)
}
uidMap := make(map[uuid.UUID]struct{})
for _, oldKeyPostion := range oldKeyPostions {
uidMap[oldKeyPostion.uid] = struct{}{}
}
for _, position := range keyPositions {
log.Println("keyPositions", position.uid)
}
for _, position := range coverKeyPositions {
log.Println("coverkeyPositions", position.uid)
}

for _, position := range keyPositions {
if _, exists := uidMap[position.uid]; !exists {
log.Println("now:", position.uid)
t.Errorf("uuid not exist!")
}
}
})
}
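
Review note: the check in TestBPtreePutbatchOldUUID comes down to set membership. Every uid written by the first PutBatch should appear among the entries returned by the second. The sketch below isolates that check as a helper that reports which uids are missing, which would make a failure message more specific than "uuid not exist!". It is an illustration only: `missing` and the local `uid` type are hypothetical, with `[16]byte` standing in for `uuid.UUID`.

```go
package main

import "fmt"

// uid is a stand-in for uuid.UUID, which is also a 16-byte array and
// therefore usable as a map key.
type uid [16]byte

// missing returns the uids from want that do not appear in got.
func missing(want, got []uid) []uid {
	seen := make(map[uid]struct{}, len(got))
	for _, u := range got {
		seen[u] = struct{}{}
	}
	var out []uid
	for _, u := range want {
		if _, ok := seen[u]; !ok {
			out = append(out, u)
		}
	}
	return out
}

func main() {
	a, b, c := uid{1}, uid{2}, uid{3}
	// b was written first but is absent from the returned old entries.
	lost := missing([]uid{a, b, c}, []uid{c, a})
	fmt.Println(len(lost), lost[0] == b)
}
```

Since the test file already imports testify, comparing the two uid lists with `assert.ElementsMatch` would be another option, and it would also remove the need for the `log.Println` calls.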

func TestBPTree_DeleteBatch_1(t *testing.T) {
testbptreeDeletebatch(t, 1)
}
@@ -228,7 +320,7 @@ func testbptreeDeletebatch(t *testing.T, partitionNum int) {
position: &wal.ChunkPosition{},
})

err = bt.PutBatch(keyPositions)
_, err = bt.PutBatch(keyPositions)
require.NoError(t, err)

tests := []struct {
@@ -243,7 +335,7 @@ func testbptreeDeletebatch(t *testing.T, partitionNum int) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err = bt.DeleteBatch(tt.keys); (err != nil) != tt.wantErr {
if _, err = bt.DeleteBatch(tt.keys); (err != nil) != tt.wantErr {
t.Errorf("BPTree.DeleteBatch() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -373,7 +465,7 @@ func Test_bptreeIterator(t *testing.T) {
position: &wal.ChunkPosition{SegmentId: 4, BlockNumber: 4, ChunkOffset: 4, ChunkSize: 4},
})

err = bt.PutBatch(keyPositions)
_, err = bt.PutBatch(keyPositions)
require.NoError(t, err)

tree := bt.trees[0]
@@ -454,7 +546,7 @@ func Test_bptreeIterator(t *testing.T) {
require.NoError(t, err)

// prefix
err = bt.PutBatch(keyPositions2)
_, err = bt.PutBatch(keyPositions2)
require.NoError(t, err)

tx, err = tree.Begin(true)