From c4adbfda147110b813c8621aa9b10674e4ddd815 Mon Sep 17 00:00:00 2001 From: Ethan Reesor Date: Tue, 25 Jun 2024 14:51:56 -0500 Subject: [PATCH] Reduce memory consumption --- pkg/database/keyvalue/block/database.go | 19 ++++++++-------- pkg/database/keyvalue/block/database_view.go | 23 ++++++++++++-------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/pkg/database/keyvalue/block/database.go b/pkg/database/keyvalue/block/database.go index 2499be043..53fb4313f 100644 --- a/pkg/database/keyvalue/block/database.go +++ b/pkg/database/keyvalue/block/database.go @@ -26,10 +26,11 @@ var poolBuffer = binary.NewPointerPool[bytes.Buffer]() type Database struct { config - commitMu sync.Mutex - records *recordFileSet - indexFiles *indexFileTree - index recordIndex + commitMu sync.Mutex + records *recordFileSet + indexFiles *indexFileTree + blockIndex vmap[blockID, int] + recordLocation vmap[[32]byte, *recordLocation] } type config struct { @@ -88,8 +89,8 @@ func Open(path string, options ...Option) (_ *Database, err error) { return nil, err } - db.index.records.fn.forEach = db.indexFiles.ForEach - db.index.records.fn.commit = db.indexFiles.Commit + db.recordLocation.fn.forEach = db.indexFiles.ForEach + db.recordLocation.fn.commit = db.indexFiles.Commit // Determine the next block number if len(db.records.files) > 0 { @@ -109,7 +110,7 @@ func Open(path string, options ...Option) (_ *Database, err error) { } // Index blocks - err = db.index.indexBlocks(db.records) + err = db.indexBlocks() if err != nil { return nil, err } @@ -135,8 +136,8 @@ func (d *Database) Begin(prefix *record.Key, writable bool) keyvalue.ChangeSet { view := &databaseView{ recordFiles: d.records, indexFiles: d.indexFiles, - blocks: d.index.blocks.View(), - records: d.index.records.View(), + blocks: d.blockIndex.View(), + records: d.recordLocation.View(), } get := func(key *record.Key) ([]byte, error) { diff --git a/pkg/database/keyvalue/block/database_view.go b/pkg/database/keyvalue/block/database_view.go index b7b6b6781..708025043 100644 --- a/pkg/database/keyvalue/block/database_view.go +++ b/pkg/database/keyvalue/block/database_view.go @@ -16,11 +16,6 @@ import ( "golang.org/x/exp/slog" ) -type recordIndex struct { - blocks vmap[blockID, int] - records vmap[[32]byte, *recordLocation] -} - type databaseView struct { recordFiles *recordFileSet indexFiles *indexFileTree @@ -28,9 +23,9 @@ type databaseView struct { records *vmapView[[32]byte, *recordLocation] } -func (r *recordIndex) indexBlocks(records *recordFileSet) error { - blocks := r.blocks.View() - it := records.entries(func(typ entryType) bool { +func (db *Database) indexBlocks() error { + blocks := db.blockIndex.View() + it := db.records.entries(func(typ entryType) bool { return typ == entryTypeStartBlock }) var prev *recordFile @@ -62,7 +57,7 @@ func (db *Database) indexRecords() error { return nil } - records := db.index.records.View() + records := db.recordLocation.View() it := db.records.entries(nil) var prev *recordFile @@ -81,6 +76,7 @@ func (db *Database) indexRecords() error { return false } block = &e.blockID + slog.Info("Indexing block", "file", filepath.Base(item.File.file.Name()), "block", block, "module", "database") case *endBlockEntry: if block == nil { @@ -89,6 +85,15 @@ func (db *Database) indexRecords() error { } block = nil + // Commit at the end of each block to keep the memory usage under + // control for large databases + err := records.Commit() + if err != nil { + it.err = err + return false + } + records = db.recordLocation.View() + case *recordEntry: if block == nil { it.err = fmt.Errorf("%v is corrupted", filepath.Base(item.File.file.Name()))