Skip to content

Commit

Permalink
Wait for everyone to close (LevelDB)
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed May 6, 2024
1 parent cfdff4d commit f5abe97
Showing 1 changed file with 30 additions and 4 deletions.
34 changes: 30 additions & 4 deletions pkg/database/keyvalue/leveldb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package leveldb

import (
"os"
"sync"
"sync/atomic"

"github.com/syndtr/goleveldb/leveldb"
"gitlab.com/accumulatenetwork/accumulate/pkg/database"
Expand All @@ -20,6 +22,8 @@ import (
type Database struct {
opts
leveldb *leveldb.DB
closing atomic.Bool
open *sync.WaitGroup
}

type opts struct {
Expand Down Expand Up @@ -58,7 +62,15 @@ func (d *Database) key(key *record.Key) []byte {

// Begin begins a change set.
func (d *Database) Begin(prefix *record.Key, writable bool) keyvalue.ChangeSet {
snap, err := d.leveldb.GetSnapshot()
var snap *leveldb.Snapshot
var err error

if d.closing.Load() {
err = errors.Conflict.With("closed")
} else {
d.open.Add(1)
snap, err = d.leveldb.GetSnapshot()
}

// Read from the transaction
get := func(key *record.Key) ([]byte, error) {
Expand All @@ -75,6 +87,14 @@ func (d *Database) Begin(prefix *record.Key, writable bool) keyvalue.ChangeSet {
return d.forEach(snap, err, fn)
}

discard := func() {}
if err == nil {
discard = func() {
defer d.open.Done()
snap.Release()
}
}

// The memory changeset caches entries in a map so Get will see values
// updated with Put, regardless of the underlying transaction and write
// batch behavior
Expand All @@ -83,7 +103,7 @@ func (d *Database) Begin(prefix *record.Key, writable bool) keyvalue.ChangeSet {
Get: get,
Commit: commit,
ForEach: forEach,
Discard: snap.Release,
Discard: discard,
})
}

Expand Down Expand Up @@ -138,8 +158,14 @@ func (d *Database) forEach(snap *leveldb.Snapshot, err error, fn func(*record.Ke
return it.Error()
}

// Close
// Close the underlying database
// Close the database.
func (d *Database) Close() error {
// Stop new batches
d.closing.Store(true)

// Wait for existing batches to resolve
d.open.Wait()

// Close the database
return d.leveldb.Close()
}

0 comments on commit f5abe97

Please sign in to comment.