Skip to content

Commit

Permalink
merge: branch '3632-database-improvements' into 'main'
Browse files Browse the repository at this point in the history
Database improvements [#3632]

Closes #3632

See merge request accumulatenetwork/accumulate!1096
  • Loading branch information
firelizzard18 committed Oct 16, 2024
2 parents ee154aa + 5b33a32 commit a9a6885
Show file tree
Hide file tree
Showing 13 changed files with 346 additions and 55 deletions.
38 changes: 15 additions & 23 deletions pkg/database/keyvalue/kvtest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestDatabase(t *testing.T, open Opener) {
db := openDb(t, open)

// Read when nothing exists
doBatch(t, db, func(batch keyvalue.ChangeSet) {
doBatch(t, db, nil, func(batch keyvalue.ChangeSet) {
_, err := batch.Get(record.NewKey("answer", 0))
require.Error(t, err)
require.ErrorAs(t, err, new(*database.NotFoundError))
Expand All @@ -63,34 +63,26 @@ func TestDatabase(t *testing.T, open Opener) {
// Write
values := map[record.KeyHash]string{}
for i := range N {
if i > 0 {
doBatch(t, db, func(batch keyvalue.ChangeSet) {
_, err := batch.Get(record.NewKey("answer", i-1, 0))
require.NoError(t, err)
})
}
doBatch(t, db, func(batch keyvalue.ChangeSet) {
prefix := record.NewKey("answer", i)
doBatch(t, db, prefix, func(batch keyvalue.ChangeSet) {
for j := range M {
key := record.NewKey("answer", i, j)
value := fmt.Sprintf("%x/%x this much data ", i, j)
values[key.Hash()] = value
err := batch.Put(key, []byte(value))
values[record.NewKey("answer", i, j).Hash()] = value
err := batch.Put(record.NewKey(j), []byte(value))
require.NoError(t, err, "Put")
}
})
if i > 0 {
doBatch(t, db, func(batch keyvalue.ChangeSet) {
_, err := batch.Get(record.NewKey("answer", i-1, 0))
require.NoError(t, err)
})
}
doBatch(t, db, prefix, func(batch keyvalue.ChangeSet) {
_, err := batch.Get(record.NewKey(0))
require.NoError(t, err)
})
}

// Verify with a new batch
doBatch(t, db, func(batch keyvalue.ChangeSet) {
doBatch(t, db, record.NewKey("answer"), func(batch keyvalue.ChangeSet) {
for i := range N {
for j := range M {
val, err := batch.Get(record.NewKey("answer", i, j))
val, err := batch.Get(record.NewKey(i, j))
require.NoError(t, err, "Get")
require.Equal(t, fmt.Sprintf("%x/%x this much data ", i, j), string(val))
}
Expand All @@ -101,7 +93,7 @@ func TestDatabase(t *testing.T, open Opener) {
db.Close()
db = openDb(t, open)

doBatch(t, db, func(batch keyvalue.ChangeSet) {
doBatch(t, db, nil, func(batch keyvalue.ChangeSet) {
for i := range N {
for j := range M {
val, err := batch.Get(record.NewKey("answer", i, j))
Expand All @@ -112,7 +104,7 @@ func TestDatabase(t *testing.T, open Opener) {
})

// Verify ForEach
doBatch(t, db, func(batch keyvalue.ChangeSet) {
doBatch(t, db, nil, func(batch keyvalue.ChangeSet) {
require.NoError(t, batch.ForEach(func(key *record.Key, value []byte) error {
expect, ok := values[key.Hash()]
require.Truef(t, ok, "%v should exist", key)
Expand Down Expand Up @@ -242,9 +234,9 @@ func TestDelete(t *testing.T, open Opener) {
require.ErrorIs(t, err, errors.NotFound)
}

func doBatch(t testing.TB, db keyvalue.Beginner, fn func(batch keyvalue.ChangeSet)) {
func doBatch(t testing.TB, db keyvalue.Beginner, prefix *record.Key, fn func(batch keyvalue.ChangeSet)) {
t.Helper()
batch := db.Begin(nil, true)
batch := db.Begin(prefix, true)
defer batch.Discard()
fn(batch)
require.NoError(t, batch.Commit())
Expand Down
101 changes: 101 additions & 0 deletions pkg/database/keyvalue/overlay/overlay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package overlay

import (
"gitlab.com/accumulatenetwork/accumulate/pkg/database"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue/memory"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/record"
)

func Open(a, b keyvalue.Beginner) keyvalue.Beginner {
return &Database{a, b}
}

type Database struct {
a, b keyvalue.Beginner
}

func (d *Database) Begin(prefix *database.Key, writable bool) keyvalue.ChangeSet {
a := d.a.Begin(prefix, writable)
b := d.b.Begin(prefix, false)

return memory.NewChangeSet(memory.ChangeSetOptions{
Get: func(key *record.Key) ([]byte, error) { return get(a, b, key) },
ForEach: func(fn func(*record.Key, []byte) error) error { return forEach(a, b, fn) },
Discard: func() { a.Discard(); b.Discard() },
Commit: func(m map[[32]byte]memory.Entry) error { return commit(a, b, m) },
})
}

func get(a, b keyvalue.ChangeSet, key *record.Key) ([]byte, error) {
// Get from a
v, err := a.Get(key)
switch {
case err == nil:
return v, nil
case !errors.Is(err, errors.NotFound):
return nil, err
}

// Get from b
v, err = b.Get(key)
switch {
case err == nil:
return v, nil
case !errors.Is(err, errors.NotFound):
return nil, err
}

return nil, (*database.NotFoundError)(key)
}

func forEach(a, b keyvalue.ChangeSet, fn func(*record.Key, []byte) error) error {
seen := map[[32]byte]bool{}

err := a.ForEach(func(key *record.Key, value []byte) error {
seen[key.Hash()] = true
return fn(key, value)
})
if err != nil {
return err
}
return b.ForEach(func(key *record.Key, value []byte) error {
if seen[key.Hash()] {
return nil
}
return fn(key, value)
})
}

func commit(a, b keyvalue.ChangeSet, m map[[32]byte]memory.Entry) error {
for _, entry := range m {
if !entry.Delete {
err := a.Put(entry.Key, entry.Value)
if err != nil {
return err
}
continue
}

_, err := b.Get(entry.Key)
switch {
case err == nil:
return errors.NotAllowed.WithFormat("cannot delete %v: it is present in the underlying database", entry.Key)
case !errors.Is(err, errors.NotFound):
return err
}

err = a.Delete(entry.Key)
if err != nil {
return err
}
}
return a.Commit()
}
43 changes: 43 additions & 0 deletions pkg/database/keyvalue/overlay/overlay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2024 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package overlay

import (
"testing"

"gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue/kvtest"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue/memory"
)

func open(*testing.T) kvtest.Opener {
db := Open(memory.New(nil), memory.New(nil))
return func() (keyvalue.Beginner, error) {
return db, nil
}
}

func TestDatabase(t *testing.T) {
kvtest.TestDatabase(t, open(t))
}

func TestIsolation(t *testing.T) {
t.Skip("Not supported by the underlying databases")
kvtest.TestIsolation(t, open(t))
}

func TestSubBatch(t *testing.T) {
kvtest.TestSubBatch(t, open(t))
}

func TestPrefix(t *testing.T) {
kvtest.TestPrefix(t, open(t))
}

func TestDelete(t *testing.T) {
kvtest.TestDelete(t, open(t))
}
77 changes: 77 additions & 0 deletions pkg/database/keyvalue/overlay/simulator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2024 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package overlay_test

import (
"testing"

"github.com/stretchr/testify/require"
"gitlab.com/accumulatenetwork/accumulate/pkg/build"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
. "gitlab.com/accumulatenetwork/accumulate/protocol"
. "gitlab.com/accumulatenetwork/accumulate/test/harness"
"gitlab.com/accumulatenetwork/accumulate/test/simulator"
)

func TestOverlay(t *testing.T) {
// Setup
alice := build.
Identity("alice").Create("book").
Tokens("tokens").Create("ACME").Add(1e9).Identity().
Book("book").Page(1).Create().AddCredits(1e9).Book().Identity()
aliceKey := alice.Book("book").Page(1).
GenerateKey(SignatureTypeED25519)

badger := simulator.BadgerDbOpener(t.TempDir(), func(err error) { require.NoError(t, err) })
simOpts := []simulator.Option{
simulator.SimpleNetwork(t.Name(), 1, 1),
simulator.Genesis(GenesisTime).With(alice).WithVersion(ExecutorVersionV2Vandenberg),
}

// Execute a transaction with the original database (timestamp = 1)
sim := NewSim(t, append(simOpts,
simulator.WithDatabase(badger),
)...)

st := sim.BuildAndSubmitTxnSuccessfully(
build.Transaction().For(alice, "book", "1").
BurnCredits(1).
SignWith(alice, "book", "1").Version(1).Timestamp(1).PrivateKey(aliceKey))
sim.StepUntil(
Txn(st.TxID).Completes())

// Execute a transaction in an overlay (timestamp = 2)
sim = NewSim(t, append(simOpts,
simulator.OverlayDatabase(simulator.MemoryDbOpener, badger),
)...)

st = sim.BuildAndSubmitTxnSuccessfully(
build.Transaction().For(alice, "book", "1").
BurnCredits(1).
SignWith(alice, "book", "1").Version(1).Timestamp(2).PrivateKey(aliceKey))
sim.StepUntil(
Txn(st.TxID).Completes())

// Verify that the same timestamp fails
st = sim.BuildAndSubmit(
build.Transaction().For(alice, "book", "1").
BurnCredits(1).
SignWith(alice, "book", "1").Version(1).Timestamp(2).PrivateKey(aliceKey))[1]
require.ErrorIs(t, st.AsError(), errors.BadTimestamp)

// Executing the transaction with the same timestamp in another overlay succeeds
sim = NewSim(t, append(simOpts,
simulator.OverlayDatabase(simulator.MemoryDbOpener, badger),
)...)

st = sim.BuildAndSubmitTxnSuccessfully(
build.Transaction().For(alice, "book", "1").
BurnCredits(1).
SignWith(alice, "book", "1").Version(1).Timestamp(2).PrivateKey(aliceKey))
sim.StepUntil(
Txn(st.TxID).Completes())
}
38 changes: 31 additions & 7 deletions pkg/database/keyvalue/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,26 @@ import (

// Serve opens a batch and serves it over the connection. Serve returns once the
// connection is closed or the remote side calls Commit. See [Connect].
func Serve(db keyvalue.Beginner, conn io.ReadWriteCloser, prefix *record.Key, writable bool) error {
defer conn.Close()
func Serve(db keyvalue.Beginner, conn io.ReadWriteCloser, prefix *record.Key, writable bool) <-chan error {
batch := db.Begin(prefix, writable)
defer batch.Discard()
ch := make(chan error)
go func() {
defer conn.Close()
defer batch.Discard()
defer close(ch)
ch <- serve(batch, conn)
}()
return ch
}

func serve(batch keyvalue.ChangeSet, conn io.ReadWriteCloser) error {
rd := bufio.NewReader(conn)
run := true
for run {
for {
if !run {
return nil
}

c, err := read(rd, unmarshalCall)
switch {
case err == nil:
Expand All @@ -41,6 +53,11 @@ func Serve(db keyvalue.Beginner, conn io.ReadWriteCloser, prefix *record.Key, wr
switch c := c.(type) {
case *commitCall:
run = false
if c.Discard {
batch.Discard()
return new(okResponse)
}

err := batch.Commit()
if err != nil {
return errResp(err)
Expand All @@ -58,7 +75,6 @@ func Serve(db keyvalue.Beginner, conn io.ReadWriteCloser, prefix *record.Key, wr
return err
}
}
return nil
}

// DB is a remote key-value database client that creates a connection to the
Expand Down Expand Up @@ -105,9 +121,12 @@ func (c *DB) Begin(prefix *database.Key, writable bool) keyvalue.ChangeSet {
}

discard := func() {
if err == nil {
_ = conn.Close()
if err != nil {
return
}

_ = c.discard(rd, conn)
_ = conn.Close()
}

return memory.NewChangeSet(memory.ChangeSetOptions{
Expand All @@ -127,6 +146,11 @@ func (c *DB) get(rd *bufio.Reader, wr io.Writer, key *record.Key) ([]byte, error
return r.Value, nil
}

func (c *DB) discard(rd *bufio.Reader, wr io.WriteCloser) error {
_, err := roundTrip[*okResponse](rd, wr, &commitCall{Discard: true})
return err
}

func (c *DB) commit(rd *bufio.Reader, wr io.WriteCloser, entries map[[32]byte]memory.Entry) error {
var err error
for _, e := range entries {
Expand Down
Loading

0 comments on commit a9a6885

Please sign in to comment.