Skip to content

Commit

Permalink
add compact optimization, using BatchCapacity to submit rewriting.
Browse files Browse the repository at this point in the history
  • Loading branch information
KANIOYH committed Oct 8, 2024
1 parent c27e9d9 commit 97c6275
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 23 deletions.
34 changes: 21 additions & 13 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"reflect"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -117,7 +118,7 @@ func Open(options Options) (*DB, error) {
segmentSize: options.ValueLogFileSize,
partitionNum: uint32(options.PartitionNum),
hashKeyFunction: options.KeyHashFunction,
compactBatchCount: options.CompactBatchCount,
compactBatchCapacity: options.CompactBatchCapacity,
deprecatedtableNumber: deprecatedNumber,
totalNumber: totalEntryNumber,
})
Expand Down Expand Up @@ -671,18 +672,19 @@ func (db *DB) Compact() error {
}

g, _ := errgroup.WithContext(context.Background())
var capacity int64 = 0

Check warning on line 675 in db.go

View workflow job for this annotation

GitHub Actions / ubuntu-test

var-declaration: should drop = 0 from declaration of var capacity; it is the zero value (revive)

Check warning on line 675 in db.go

View workflow job for this annotation

GitHub Actions / windows-test

var-declaration: should drop = 0 from declaration of var capacity; it is the zero value (revive)
var capacityList []int64 = make([]int64, db.options.PartitionNum)

Check warning on line 676 in db.go

View workflow job for this annotation

GitHub Actions / ubuntu-test

var-declaration: should omit type []int64 from declaration of var capacityList; it will be inferred from the right-hand side (revive)

Check warning on line 676 in db.go

View workflow job for this annotation

GitHub Actions / windows-test

var-declaration: should omit type []int64 from declaration of var capacityList; it will be inferred from the right-hand side (revive)
for i := 0; i < int(db.vlog.options.partitionNum); i++ {
part := i
g.Go(func() error {
newVlogFile := openVlogFile(part, tempValueLogFileExt)

validRecords := make([]*ValueLogRecord, 0, db.vlog.options.compactBatchCount)
validRecords := make([]*ValueLogRecord, 0)
reader := db.vlog.walFiles[part].NewReader()
count := 0
// iterate all records in wal, find the valid records
for {
count++
chunk, pos, err := reader.Next()
atomic.AddInt64(&capacity, int64(len(chunk)))
capacityList[part] += int64(len(chunk))
if err != nil {
if errors.Is(err, io.EOF) {
break
Expand Down Expand Up @@ -713,13 +715,16 @@ func (db *DB) Compact() error {
if keyPos.partition == uint32(part) && reflect.DeepEqual(keyPos.position, pos) {
validRecords = append(validRecords, record)
}
if count%db.vlog.options.compactBatchCount == 0 {
err = db.rewriteValidRecords(newVlogFile, validRecords, part)

if capacity >= int64(db.vlog.options.compactBatchCapacity) {
err := db.rewriteValidRecords(newVlogFile, validRecords, part)

Check failure on line 720 in db.go

View workflow job for this annotation

GitHub Actions / ubuntu-test

shadow: declaration of "err" shadows declaration at line 685 (govet)

Check failure on line 720 in db.go

View workflow job for this annotation

GitHub Actions / windows-test

shadow: declaration of "err" shadows declaration at line 685 (govet)
if err != nil {
_ = newVlogFile.Delete()
return err
}
validRecords = validRecords[:0]
atomic.AddInt64(&capacity, -capacityList[part])
capacityList[part] = 0
}
}

Expand Down Expand Up @@ -775,17 +780,19 @@ func (db *DB) CompactWithDeprecatedtable() error {
}

g, _ := errgroup.WithContext(context.Background())
var capacity int64 = 0

Check warning on line 783 in db.go

View workflow job for this annotation

GitHub Actions / ubuntu-test

var-declaration: should drop = 0 from declaration of var capacity; it is the zero value (revive)

Check warning on line 783 in db.go

View workflow job for this annotation

GitHub Actions / windows-test

var-declaration: should drop = 0 from declaration of var capacity; it is the zero value (revive)
var capacityList []int64 = make([]int64, db.options.PartitionNum)

Check warning on line 784 in db.go

View workflow job for this annotation

GitHub Actions / ubuntu-test

var-declaration: should omit type []int64 from declaration of var capacityList; it will be inferred from the right-hand side (revive)

Check warning on line 784 in db.go

View workflow job for this annotation

GitHub Actions / windows-test

var-declaration: should omit type []int64 from declaration of var capacityList; it will be inferred from the right-hand side (revive)
for i := 0; i < int(db.vlog.options.partitionNum); i++ {
part := i
g.Go(func() error {
newVlogFile := openVlogFile(part, tempValueLogFileExt)
validRecords := make([]*ValueLogRecord, 0, db.vlog.options.compactBatchCount)
validRecords := make([]*ValueLogRecord, 0)
reader := db.vlog.walFiles[part].NewReader()
count := 0
// iterate all records in wal, find the valid records
for {
count++
chunk, pos, err := reader.Next()
atomic.AddInt64(&capacity, int64(len(chunk)))
capacityList[part] += int64(len(chunk))
if err != nil {
if errors.Is(err, io.EOF) {
break
Expand Down Expand Up @@ -822,14 +829,15 @@ func (db *DB) CompactWithDeprecatedtable() error {
}
}

if count%db.vlog.options.compactBatchCount == 0 {
err = db.rewriteValidRecords(newVlogFile, validRecords, part)

if capacity >= int64(db.vlog.options.compactBatchCapacity) {
err := db.rewriteValidRecords(newVlogFile, validRecords, part)

Check failure on line 833 in db.go

View workflow job for this annotation

GitHub Actions / ubuntu-test

shadow: declaration of "err" shadows declaration at line 793 (govet)

Check failure on line 833 in db.go

View workflow job for this annotation

GitHub Actions / windows-test

shadow: declaration of "err" shadows declaration at line 793 (govet)
if err != nil {
_ = newVlogFile.Delete()
return err
}
validRecords = validRecords[:0]
atomic.AddInt64(&capacity, -capacityList[part])
capacityList[part] = 0
}
}
if len(validRecords) > 0 {
Expand Down
10 changes: 5 additions & 5 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func TestDBCompact(t *testing.T) {
path, err := os.MkdirTemp("", "db-test-compact")
require.NoError(t, err)
options.DirPath = path
options.CompactBatchCount = 2 << 5
options.CompactBatchCapacity = 1 << 6

db, err := Open(options)
require.NoError(t, err)
Expand Down Expand Up @@ -489,7 +489,7 @@ func TestDBCompactWitchDeprecatetable(t *testing.T) {
path, err := os.MkdirTemp("", "db-test-CompactWitchDeprecatetable")
require.NoError(t, err)
options.DirPath = path
options.CompactBatchCount = 2 << 5
options.CompactBatchCapacity = 1 << 6

db, err := Open(options)
require.NoError(t, err)
Expand Down Expand Up @@ -542,7 +542,7 @@ func TestDBAutoCompact(t *testing.T) {
path, err := os.MkdirTemp("", "db-test-AutoCompact")
require.NoError(t, err)
options.DirPath = path
options.CompactBatchCount = 2 << 5
options.CompactBatchCapacity = 1 << 6

db, err := Open(options)
require.NoError(t, err)
Expand Down Expand Up @@ -605,7 +605,7 @@ func TestDBAutoCompactWithBusyIO(t *testing.T) {
path, err := os.MkdirTemp("", "db-test-AutoCompactWithBusyIO")
require.NoError(t, err)
options.DirPath = path
options.CompactBatchCount = 2 << 5
options.CompactBatchCapacity = 1 << 6

db, err := Open(options)
require.NoError(t, err)
Expand Down Expand Up @@ -1062,7 +1062,7 @@ func TestDeprecatetableMetaPersist(t *testing.T) {
path, err := os.MkdirTemp("", "db-test-DeprecatetableMetaPersist")
require.NoError(t, err)
options.DirPath = path
options.CompactBatchCount = 2 << 5
options.CompactBatchCapacity = 1 << 6

db, err := Open(options)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type Options struct {
// default value is bptree.
IndexType IndexType

// writing entries to disk after reading the specified number of entries.
CompactBatchCount int
// writing entries to disk after reading the specified memory capacity of entries.
CompactBatchCapacity int

// deprecatedtable recommend compaction rate
AdvisedCompactionRate float32
Expand Down Expand Up @@ -140,7 +140,7 @@ var DefaultOptions = Options{
ValueLogFileSize: 1 * GB,
IndexType: BTree,
//nolint:gomnd // default
CompactBatchCount: 10000,
CompactBatchCapacity: 1 << 30,
//nolint:gomnd // default
AdvisedCompactionRate: 0.3,
//nolint:gomnd // default
Expand Down
4 changes: 2 additions & 2 deletions vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type valueLogOptions struct {
// hash function for sharding
hashKeyFunction func([]byte) uint64

// writing validEntries to disk after reading the specified number of entries.
compactBatchCount int
// writing validEntries to disk after reading the specified memory capacity of entries.
compactBatchCapacity int

// deprecated number
deprecatedtableNumber uint32
Expand Down

0 comments on commit 97c6275

Please sign in to comment.