diff --git a/pkg/database/keyvalue/leveldb/database.go b/pkg/database/keyvalue/leveldb/database.go index ba4e3d23d..e0b2e2ed0 100644 --- a/pkg/database/keyvalue/leveldb/database.go +++ b/pkg/database/keyvalue/leveldb/database.go @@ -8,6 +8,8 @@ package leveldb import ( "os" + "sync" + "sync/atomic" "github.com/syndtr/goleveldb/leveldb" "gitlab.com/accumulatenetwork/accumulate/pkg/database" @@ -20,6 +22,8 @@ import ( type Database struct { opts leveldb *leveldb.DB + closing atomic.Bool + open *sync.WaitGroup } type opts struct { @@ -58,7 +62,15 @@ func (d *Database) key(key *record.Key) []byte { // Begin begins a change set. func (d *Database) Begin(prefix *record.Key, writable bool) keyvalue.ChangeSet { - snap, err := d.leveldb.GetSnapshot() + var snap *leveldb.Snapshot + var err error + + if d.closing.Load() { + err = errors.Conflict.With("closed") + } else { + d.open.Add(1) + snap, err = d.leveldb.GetSnapshot() + } // Read from the transaction get := func(key *record.Key) ([]byte, error) { @@ -75,6 +87,14 @@ func (d *Database) Begin(prefix *record.Key, writable bool) keyvalue.ChangeSet { return d.forEach(snap, err, fn) } + discard := func() {} + if err == nil { + discard = func() { + defer d.open.Done() + snap.Release() + } + } + // The memory changeset caches entries in a map so Get will see values // updated with Put, regardless of the underlying transaction and write // batch behavior @@ -83,7 +103,7 @@ func (d *Database) Begin(prefix *record.Key, writable bool) keyvalue.ChangeSet { Get: get, Commit: commit, ForEach: forEach, - Discard: snap.Release, + Discard: discard, }) } @@ -138,8 +158,14 @@ func (d *Database) forEach(snap *leveldb.Snapshot, err error, fn func(*record.Ke return it.Error() } -// Close -// Close the underlying database +// Close the database. func (d *Database) Close() error { + // Stop new batches + d.closing.Store(true) + + // Wait for existing batches to resolve + d.open.Wait() + + // Close the database return d.leveldb.Close() }