Skip to content

Commit

Permalink
bt index: read value if key matches straight away (#12786)
Browse files Browse the repository at this point in the history
Another attempt to improve bt indices.
 + keep cursors in pool
+ untie Cursor from BtIndex, now it shares ptr to pool for cursor
disposal after .Close() and ptr to eliasFano mapping of `di ->
offset_to_key`
 + reduce allocations to zero during seek 
+ Changed Seek semantics. Previously we returned true if the key matched
`seekKey` exactly. Now the caller should perform that check if needed. Seek
guarantees to return either an exact match OR the first key bigger than `seekKey`.
+ Changed Get semantics: it now returns the value for the given `key`, with only
one exception: a nil key returns the value for the first key.
+ Read the value immediately if the key matches during Get (skips another ef.Get
and avoids decoding the key twice).
 
 
Reduces load on GC and slightly improves performance

Tried to use `MatchCmp` as well, but this causes a gas mismatch (I assume
it happens while reading from a compressed file, since local tests are
green).
  • Loading branch information
awskii authored Nov 27, 2024
1 parent f1ec92d commit f00d0f1
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 149 deletions.
7 changes: 7 additions & 0 deletions erigon-lib/seg/seg_auto_rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ func (g *Reader) MatchPrefix(prefix []byte) bool {
return g.Getter.MatchPrefixUncompressed(prefix)
}

// MatchCmp forwards prefix to the underlying Getter and returns its integer
// result unchanged. When the CompressKeys bit is set in g.c the compressed
// variant (Getter.MatchCmp) is used; otherwise the uncompressed variant
// (Getter.MatchCmpUncompressed) is used.
func (g *Reader) MatchCmp(prefix []byte) int {
	keysCompressed := g.c&CompressKeys != 0
	if !keysCompressed {
		return g.Getter.MatchCmpUncompressed(prefix)
	}
	return g.Getter.MatchCmp(prefix)
}

func (g *Reader) Next(buf []byte) ([]byte, uint64) {
fl := CompressKeys
if g.nextValue {
Expand Down
4 changes: 3 additions & 1 deletion erigon-lib/state/aggregator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func Benchmark_BtreeIndex_Search(b *testing.B) {
require.NoErrorf(b, err, "i=%d", i)
require.EqualValues(b, keys[p], cur.Key())
require.NotEmptyf(b, cur.Value(), "i=%d", i)
cur.Close()
}
}

Expand Down Expand Up @@ -201,6 +202,7 @@ func Benchmark_BTree_Seek(b *testing.B) {
require.NoError(b, err)

require.EqualValues(b, keys[p], cur.key)
cur.Close()
}
})

Expand Down Expand Up @@ -233,7 +235,7 @@ func Benchmark_BTree_Seek(b *testing.B) {
if i%1000 == 0 {
fmt.Printf("next_access_last[of %d keys] %v\n", nextKeys, ntimer/time.Duration(nextKeys))
}

cur.Close()
}
})
}
Expand Down
3 changes: 2 additions & 1 deletion erigon-lib/state/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestNewBtIndex(t *testing.T) {
defer kv.Close()
require.NotNil(t, kv)
require.NotNil(t, bt)
require.Len(t, bt.bplus.mx, keyCount/int(DefaultBtreeM))
require.True(t, len(bt.bplus.mx) >= keyCount/int(DefaultBtreeM))

for i := 1; i < len(bt.bplus.mx); i++ {
require.NotZero(t, bt.bplus.mx[i].di)
Expand Down Expand Up @@ -652,6 +652,7 @@ func TestAggregatorV3_RestartOnFiles(t *testing.T) {
storedV, _, found, err := ac.GetLatest(kv.StorageDomain, key[:length.Addr], key[length.Addr:], newTx)
require.NoError(t, err)
require.True(t, found)
require.NotEmpty(t, storedV)
_ = key[0]
_ = storedV[0]
require.EqualValues(t, key[0], storedV[0])
Expand Down
163 changes: 97 additions & 66 deletions erigon-lib/state/bps_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewBpsTree(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64, dataLooku
// "assert key behind offset == to stored key in bt"
var envAssertBTKeys = dbg.EnvBool("BT_ASSERT_OFFSETS", false)

func NewBpsTreeWithNodes(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64, dataLookup dataLookupFunc, keyCmp keyCmpFunc, nodes []Node) *BpsTree {
func NewBpsTreeWithNodes(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64, dataLookup dataLookupFunc, keyCmp keyCmpFunc, nodes []*Node) *BpsTree {
bt := &BpsTree{M: M, offt: offt, dataLookupFunc: dataLookup, keyCmpFunc: keyCmp, mx: nodes}

nsz := uint64(unsafe.Sizeof(Node{}))
Expand All @@ -80,6 +80,7 @@ func NewBpsTreeWithNodes(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64,
}
}
cachedBytes += nsz + uint64(len(nodes[i].key))

nodes[i].off = offt.Get(nodes[i].di)
}

Expand All @@ -88,14 +89,17 @@ func NewBpsTreeWithNodes(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64,

type BpsTree struct {
offt *eliasfano32.EliasFano // ef with offsets to key/vals
mx []Node
mx []*Node
M uint64 // limit on amount of 'children' for node
trace bool

dataLookupFunc dataLookupFunc
keyCmpFunc keyCmpFunc
cursorGetter cursorGetter
}

// cursorGetter is the function type held in BpsTree's cursorGetter field.
// It receives two byte slices (k, v), a di index and a reader, and returns
// a *Cursor.
// NOTE(review): k and v are presumably the key and value the cursor starts
// with, and the cursor presumably comes from a pool — confirm against the
// implementation that is assigned to the field.
type cursorGetter func(k, v []byte, di uint64, g *seg.Reader) *Cursor

type BpsTreeIterator struct {
t *BpsTree
i uint64
Expand Down Expand Up @@ -178,15 +182,17 @@ func encodeListNodes(nodes []Node, w io.Writer) error {
return nil
}

func decodeListNodes(data []byte) ([]Node, error) {
func decodeListNodes(data []byte) ([]*Node, error) {
count := binary.BigEndian.Uint64(data[:8])
nodes := make([]Node, count)
nodes := make([]*Node, count)
pos := 8
for ni := 0; ni < int(count); ni++ {
dp, err := (&nodes[ni]).Decode(data[pos:])
node := new(Node)
dp, err := node.Decode(data[pos:])
if err != nil {
return nil, fmt.Errorf("decode node %d: %w", ni, err)
}
nodes[ni] = node
pos += int(dp)
}
return nodes, nil
Expand Down Expand Up @@ -220,7 +226,7 @@ func (b *BpsTree) WarmUp(kv *seg.Reader) (err error) {
if N == 0 {
return nil
}
b.mx = make([]Node, 0, N/b.M)
b.mx = make([]*Node, 0, N/b.M)
if b.trace {
fmt.Printf("mx cap %d N=%d M=%d\n", cap(b.mx), N, b.M)
}
Expand All @@ -240,7 +246,7 @@ func (b *BpsTree) WarmUp(kv *seg.Reader) (err error) {
if err != nil {
return err
}
b.mx = append(b.mx, Node{off: b.offt.Get(di), key: common.Copy(key), di: di})
b.mx = append(b.mx, &Node{off: b.offt.Get(di), key: common.Copy(key), di: di})
cachedBytes += nsz + uint64(len(key))
}

Expand All @@ -252,9 +258,10 @@ func (b *BpsTree) WarmUp(kv *seg.Reader) (err error) {
}

// bs performs pre-search over warmed-up list of nodes to figure out left and right bounds on di for key
func (b *BpsTree) bs(x []byte) (n Node, dl, dr uint64) {
func (b *BpsTree) bs(x []byte) (n *Node, dl, dr uint64) {
dr = b.offt.Count()
m, l, r := 0, 0, len(b.mx) //nolint

for l < r {
m = (l + r) >> 1
n = b.mx[m]
Expand All @@ -271,94 +278,91 @@ func (b *BpsTree) bs(x []byte) (n Node, dl, dr uint64) {
case -1:
l = m + 1
dl = n.di
if dl < dr {
dl++
}
}
}
return n, dl, dr
}

// Seek returns first key which is >= key.
// Found is true iff exact key match is found.
// If key is nil, returns first key and found=true
// If found item.key has a prefix of key, returns found=false and item.key
// if key is greater than all keys, returns nil, found=false
func (b *BpsTree) Seek(g *seg.Reader, seekKey []byte) (key, value []byte, di uint64, found bool, err error) {
// Seek returns cursor pointing at first key which is >= seekKey.
// If key is nil, returns cursor with first key
// If found item.key has a prefix of key, returns item.key
// if key is greater than all keys, returns nil
func (b *BpsTree) Seek(g *seg.Reader, seekKey []byte) (cur *Cursor, err error) {
//b.trace = true
if b.trace {
fmt.Printf("seek %x\n", seekKey)
}
cur = b.cursorGetter(nil, nil, 0, g)
if len(seekKey) == 0 && b.offt.Count() > 0 {
key, value, _, err = b.dataLookupFunc(0, g)
if err != nil {
return nil, nil, 0, false, err
}
//return key, value, 0, bytes.Compare(key, seekKey) >= 0, nil
return key, value, 0, bytes.Equal(key, seekKey), nil
cur.Reset(0, g)
return cur, nil
}

// check cached nodes and narrow roi
n, l, r := b.bs(seekKey) // l===r when key is found
if b.trace {
fmt.Printf("pivot di:%d di(LR): [%d %d] k: %x found: %t\n", n.di, l, r, n.key, l == r)
defer func() { fmt.Printf("found=%t %x [%d %d]\n", bytes.Equal(key, seekKey), seekKey, l, r) }()
if l == r {
cur.Reset(n.di, g)
return cur, nil
}

// if b.trace {
// fmt.Printf("pivot di:%d di(LR): [%d %d] k: %x found: %t\n", n.di, l, r, n.key, l == r)
// defer func() { fmt.Printf("found=%t %x [%d %d]\n", bytes.Equal(key, seekKey), seekKey, l, r) }()
// }
var m uint64
var cmp int
for l < r {
m = (l + r) >> 1
if r-l <= DefaultBtreeStartSkip { // found small range, faster to scan now
cmp, key, err = b.keyCmpFunc(seekKey, l, g, key[:0])
if err != nil {
return nil, nil, 0, false, err
}
if b.trace {
fmt.Printf("fs di:[%d %d] k: %x\n", l, r, key)
// m = l
if cur.d == 0 {
cur.Reset(l, g)
} else {
cur.Next()
}
//fmt.Printf("N %d l %d cmp %d (found %x want %x)\n", b.offt.Count(), l, cmp, key, seekKey)
if cmp == 0 {
r = l
break
} else if cmp < 0 { //found key is greater than seekKey
if l+1 < b.offt.Count() {
l++
continue
}

if cmp = bytes.Compare(cur.key, seekKey); cmp < 0 {
l++
continue
}
r = l
break
return cur, err
}

m = (l + r) >> 1
cmp, key, err = b.keyCmpFunc(seekKey, m, g, key[:0])
cmp, cur.key, err = b.keyCmpFunc(seekKey, m, g, cur.key[:0])
if err != nil {
return nil, nil, 0, false, err
return nil, err
}
if b.trace {
fmt.Printf("fs di:[%d %d] k: %x\n", l, r, key)
fmt.Printf("[%d %d] k: %x\n", l, r, cur.key)
}

if cmp == 0 {
l, r = m, m
break
} else if cmp > 0 {
r = m
} else {
l = m + 1
}

}

if l == r {
m = l
}
key, value, _, err = b.dataLookupFunc(m, g)
if err != nil {
return nil, nil, 0, false, err

err = cur.Reset(m, g)
if err != nil || bytes.Compare(cur.Key(), seekKey) < 0 {
return nil, err
}
return key, value, l, bytes.Equal(key, seekKey), nil
return cur, nil
}

// returns first key which is >= key.
// If key is nil, returns first key
// if key is greater than all keys, returns nil
func (b *BpsTree) Get(g *seg.Reader, key []byte) (k []byte, ok bool, i uint64, err error) {
// Get returns the value for the exact given key and the offset in the file where the key starts.
// If the given key is nil, returns the value of the first key.
// If no exact match is found, returns nil values.
func (b *BpsTree) Get(g *seg.Reader, key []byte) (v []byte, ok bool, offset uint64, err error) {
if b.trace {
fmt.Printf("get %x\n", key)
}
Expand All @@ -369,6 +373,7 @@ func (b *BpsTree) Get(g *seg.Reader, key []byte) (k []byte, ok bool, i uint64, e
}
return v0, true, 0, nil
}

n, l, r := b.bs(key) // l===r when key is found
if b.trace {
fmt.Printf("pivot di: %d di(LR): [%d %d] k: %x found: %t\n", n.di, l, r, n.key, l == r)
Expand All @@ -379,29 +384,55 @@ func (b *BpsTree) Get(g *seg.Reader, key []byte) (k []byte, ok bool, i uint64, e
var m uint64
for l < r {
m = (l + r) >> 1
cmp, k, err = b.keyCmpFunc(key, m, g, k[:0])
if r-l <= DefaultBtreeStartSkip {
m = l
if offset == 0 {
offset = b.offt.Get(m)
g.Reset(offset)
}
v, _ = g.Next(v[:0])
if cmp = bytes.Compare(v, key); cmp > 0 {
return nil, false, 0, err
} else if cmp < 0 {
g.Skip()
l++
continue
}
v, _ = g.Next(nil)
offset = b.offt.Get(m)
return v, true, offset, nil
}

cmp, _, err = b.keyCmpFunc(key, m, g, v[:0])
if err != nil {
return nil, false, 0, err
}
if b.trace {
fmt.Printf("fs [%d %d]\n", l, r)
}

switch cmp {
case 0:
return k, true, m, nil
case 1:
if cmp == 0 {
offset = b.offt.Get(m)
if !g.HasNext() {
return nil, false, 0, fmt.Errorf("pair %d/%d key not found in %s", m, b.offt.Count(), g.FileName())
}
v, _ = g.Next(nil)
return v, true, offset, nil
} else if cmp > 0 {
r = m
case -1:
} else {
l = m + 1
}
if b.trace {
fmt.Printf("narrow [%d %d]\n", l, r)
}
}

cmp, k, err = b.keyCmpFunc(key, l, g, k[:0])
cmp, _, err = b.keyCmpFunc(key, l, g, v[:0])
if err != nil || cmp != 0 {
return nil, false, 0, err
}
return k, true, l, nil
if !g.HasNext() {
return nil, false, 0, fmt.Errorf("pair %d/%d key not found in %s", l, b.offt.Count(), g.FileName())
}
v, _ = g.Next(nil)
return v, true, b.offt.Get(l), nil
}

// Offsets returns the Elias-Fano structure stored in the tree's offt field
// (the offsets to keys/values).
func (b *BpsTree) Offsets() *eliasfano32.EliasFano {
	return b.offt
}
Expand Down
3 changes: 2 additions & 1 deletion erigon-lib/state/bpstree_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ func BenchmarkBpsTreeSeek(t *testing.B) {
key, _ = getter.Next(key[:0])
getter.Skip()
//_, err := bt.Seek(getter, keys[r.Intn(len(keys))])
_, err := bt.Seek(getter, key)
c, err := bt.Seek(getter, key)
require.NoError(t, err)
c.Close()
}
t.ReportAllocs()
}
Loading

0 comments on commit f00d0f1

Please sign in to comment.