Skip to content

Commit

Permalink
Better scanning
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 17, 2024
1 parent 7756199 commit b393e97
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 34 deletions.
28 changes: 0 additions & 28 deletions pkg/database/indexing/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,31 +130,3 @@ func (x *Log[V]) append2(record values.Value[*Block[V]], e *Entry[V]) (*Block[V]
}
return new, x.getBlock(new.Level, new.Index).Put(new)
}

func (x *Log[V]) All(yield func(QueryResult[V]) bool) {
x.all(x.getHead(), yield)
}

func (x *Log[V]) all(block values.Value[*Block[V]], yield func(QueryResult[V]) bool) bool {
b, err := block.Get()
if err != nil {
yield(errResult[V](err))
return false
}

if b.Level == 0 {
for _, e := range b.Entries {
if !yield(entry(e)) {
return false
}
}
return true
}

for _, e := range b.Entries {
if !x.all(x.getBlock(b.Level-1, e.Index), yield) {
return false
}
}
return true
}
93 changes: 93 additions & 0 deletions pkg/database/indexing/log_scan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package indexing

import (
"iter"
"slices"

"gitlab.com/accumulatenetwork/accumulate/internal/database/record"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/values"
)

type logScanner[V any] Log[V]

type logScanFrom[V any] struct {
log *Log[V]
from *record.Key
}

type logScanFromTo[V any] struct {
log *Log[V]
from *record.Key
to *record.Key
}

func (x *Log[V]) Scan() *logScanner[V] {
return (*logScanner[V])(x)
}

func (x *logScanner[V]) All() iter.Seq[QueryResult[V]] {
return x.FromFirst().ToLast()
}

func (x *logScanner[V]) From(from *record.Key) *logScanFrom[V] {
return &logScanFrom[V]{log: (*Log[V])(x), from: from}
}

func (x *logScanner[V]) FromFirst() *logScanFrom[V] {
return x.From(nil)
}

func (x *logScanFrom[V]) To(to *record.Key) iter.Seq[QueryResult[V]] {
s := &logScanFromTo[V]{log: x.log, from: x.from, to: to}
return func(yield func(QueryResult[V]) bool) { s.scan(x.log.getHead(), true, yield) }
}

func (x *logScanFrom[V]) ToLast() iter.Seq[QueryResult[V]] {
return x.To(nil)
}

func (s *logScanFromTo[V]) scan(block values.Value[*Block[V]], checkFrom bool, yield func(QueryResult[V]) bool) bool {
b, err := block.Get()
if err != nil {
yield(errResult[V](err))
return false
}

var i int
if checkFrom && s.from != nil {
var exact bool
i, exact = slices.BinarySearchFunc(b.Entries, s.from, func(e *Entry[V], t *record.Key) int {
return e.Key.Compare(t)
})
if i > 0 && !exact {
i--
}
}

if i >= len(b.Entries) {
return true
}

for i, e := range b.Entries[i:] {
if s.to != nil && e.Key.Compare(s.to) >= 0 {
return true
}
if b.Level == 0 {
if !yield(entry(e)) {
return false
}
} else {
if !s.scan(s.log.getBlock(b.Level-1, e.Index), i == 0, yield) {
return false
}
}
}

return true
}
42 changes: 36 additions & 6 deletions pkg/database/indexing/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package indexing

import (
"fmt"
"iter"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -44,14 +45,14 @@ func TestLog(t *testing.T) {
}

// Verify the sequence is correct
require.Equal(t, strs, getAll(t, x))
require.Equal(t, strs, get(t, x.Scan().All()))

// Verify everything is persisted correctly
require.NoError(t, x.Commit())
x = new(Log[string])
x.store = keyvalue.RecordStore{Store: store}
x.blockSize = 1 << P
require.Equal(t, strs, getAll(t, x))
require.Equal(t, strs, get(t, x.Scan().All()))

// Verify each element can be found
for _, v := range values {
Expand Down Expand Up @@ -99,6 +100,35 @@ func TestLog(t *testing.T) {
}
}

func TestLog_Scan(t *testing.T) {
const P = 2
const N = 1 << (P * 3)

store := memory.New(nil).Begin(nil, true)
x := new(Log[string])
x.store = keyvalue.RecordStore{Store: store}
x.blockSize = 1 << P

var rand lxrand.Sequence
var v uint64 = 1
var keys []*record.Key
var strs []string
for i := 0; i < N; i++ {
v = uint64(i)
k, s := record.NewKey(i), fmt.Sprint(i)
require.NoError(t, x.Append(k, s), "insert %d", i)
keys = append(keys, k)
strs = append(strs, s)
v += uint64(rand.Byte()%16 + 1)
}

// Verify
require.Equal(t, strs, get(t, x.Scan().All()))
require.Equal(t, strs[10:], get(t, x.Scan().From(keys[10]).ToLast()))
require.Equal(t, strs[:50], get(t, x.Scan().FromFirst().To(keys[50])))
require.Equal(t, strs[10:50], get(t, x.Scan().From(keys[10]).To(keys[50])))
}

func BenchmarkLog_Append(b *testing.B) {
for _, P := range []int{8, 10, 12, 14, 16} {
b.Run(fmt.Sprint(1<<P), func(b *testing.B) {
Expand Down Expand Up @@ -144,14 +174,14 @@ func BenchmarkLog_Find(b *testing.B) {
})
}
}
func getAll[V any](t testing.TB, x *Log[V]) []V {

func get[V any](t testing.TB, seq iter.Seq[QueryResult[V]]) []V {
var all []V
x.All(func(r QueryResult[V]) bool {
for r := range seq {
_, v, err := r.Get()
require.NoError(t, err)
all = append(all, v)
return true
})
}
return all
}

Expand Down

0 comments on commit b393e97

Please sign in to comment.