From f00d0f11c461ad940b9816b665ad1bbfbd8ef984 Mon Sep 17 00:00:00 2001
From: awskii
Date: Wed, 27 Nov 2024 14:35:16 +0000
Subject: [PATCH] bt index: read value if key matches straight away (#12786)

Another attempt to improve bt indices.

+ keep cursors in a pool
+ untie Cursor from BtIndex: a cursor now holds a pointer to the pool (for disposal after .Close()) and a pointer to the eliasFano mapping of `di -> offset_to_key`
+ reduce allocations to zero during seek
+ changed Seek semantics. Previously we returned true only when the key matched `seekKey` exactly; now the caller should do that check if needed. Seek guarantees to return the exact match OR the first key bigger than `seekKey`
+ changed Get semantics - it now returns the value for the given `key`, with one exception: a nil key returns the value for the first key
+ read the value immediately if the key matches during Get (skips another ef.Get and decoding the key twice)

Reduces load on GC and slightly improves performance.

Tried to use `MatchCmp` as well, but this causes a gas mismatch (I assume it happens when reading from the compressed file, because local tests are green).
---
 erigon-lib/seg/seg_auto_rw.go             |   7 +
 erigon-lib/state/aggregator_bench_test.go |   4 +-
 erigon-lib/state/aggregator_test.go       |   3 +-
 erigon-lib/state/bps_tree.go              | 163 +++++++++++++---
 erigon-lib/state/bpstree_bench_test.go    |   3 +-
 erigon-lib/state/btree_index.go           | 144 ++++++++++++-------
 erigon-lib/state/btree_index_test.go      |  83 +++++++----
 erigon-lib/state/domain.go                |  19 ++-
 erigon-lib/state/domain_shared.go         |   7 +
 erigon-lib/state/domain_stream.go         |   6 +
 erigon-lib/state/domain_test.go           |   4 +-
 erigon-lib/state/merge.go                 |   6 +-
 turbo/app/snapshots_cmd.go                |   1 +
 13 files changed, 301 insertions(+), 149 deletions(-)

diff --git a/erigon-lib/seg/seg_auto_rw.go b/erigon-lib/seg/seg_auto_rw.go
index f16dc073f3e..92136c8afc4 100644
--- a/erigon-lib/seg/seg_auto_rw.go
+++ b/erigon-lib/seg/seg_auto_rw.go
@@ -81,6 +81,13 @@ func (g *Reader) MatchPrefix(prefix []byte) bool {
 	return g.Getter.MatchPrefixUncompressed(prefix)
 }
 
+func (g *Reader) MatchCmp(prefix []byte) int {
+	if g.c&CompressKeys != 0 {
+		return g.Getter.MatchCmp(prefix)
+	}
+	return g.Getter.MatchCmpUncompressed(prefix)
+}
+
 func (g *Reader) Next(buf []byte) ([]byte, uint64) {
 	fl := CompressKeys
 	if g.nextValue {
diff --git a/erigon-lib/state/aggregator_bench_test.go b/erigon-lib/state/aggregator_bench_test.go
index 4c2b673caff..63fa225af81 100644
--- a/erigon-lib/state/aggregator_bench_test.go
+++ b/erigon-lib/state/aggregator_bench_test.go
@@ -161,6 +161,7 @@ func Benchmark_BtreeIndex_Search(b *testing.B) {
 		require.NoErrorf(b, err, "i=%d", i)
 		require.EqualValues(b, keys[p], cur.Key())
 		require.NotEmptyf(b, cur.Value(), "i=%d", i)
+		cur.Close()
 	}
 }
 
@@ -201,6 +202,7 @@ func Benchmark_BTree_Seek(b *testing.B) {
 			require.NoError(b, err)
 
 			require.EqualValues(b, keys[p], cur.key)
+			cur.Close()
 		}
 	})
 
@@ -233,7 +235,7 @@ func Benchmark_BTree_Seek(b *testing.B) {
 			if i%1000 == 0 {
 				fmt.Printf("next_access_last[of %d keys] %v\n", nextKeys, ntimer/time.Duration(nextKeys))
 			}
-
+			cur.Close()
 		}
 	})
 }
diff --git a/erigon-lib/state/aggregator_test.go b/erigon-lib/state/aggregator_test.go
index fc94048090b..6c0c9d34558 100644
--- a/erigon-lib/state/aggregator_test.go
+++ b/erigon-lib/state/aggregator_test.go
@@ -213,7 +213,7 @@ func TestNewBtIndex(t *testing.T) {
 	defer kv.Close()
 	require.NotNil(t, kv)
 	require.NotNil(t, bt)
-	require.Len(t, bt.bplus.mx, keyCount/int(DefaultBtreeM))
+	require.True(t, len(bt.bplus.mx) >= keyCount/int(DefaultBtreeM))
 	for i := 1; i < len(bt.bplus.mx); i++ {
 		require.NotZero(t, 
bt.bplus.mx[i].di) @@ -652,6 +652,7 @@ func TestAggregatorV3_RestartOnFiles(t *testing.T) { storedV, _, found, err := ac.GetLatest(kv.StorageDomain, key[:length.Addr], key[length.Addr:], newTx) require.NoError(t, err) require.True(t, found) + require.NotEmpty(t, storedV) _ = key[0] _ = storedV[0] require.EqualValues(t, key[0], storedV[0]) diff --git a/erigon-lib/state/bps_tree.go b/erigon-lib/state/bps_tree.go index 188d7723590..85434ada326 100644 --- a/erigon-lib/state/bps_tree.go +++ b/erigon-lib/state/bps_tree.go @@ -64,7 +64,7 @@ func NewBpsTree(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64, dataLooku // "assert key behind offset == to stored key in bt" var envAssertBTKeys = dbg.EnvBool("BT_ASSERT_OFFSETS", false) -func NewBpsTreeWithNodes(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64, dataLookup dataLookupFunc, keyCmp keyCmpFunc, nodes []Node) *BpsTree { +func NewBpsTreeWithNodes(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64, dataLookup dataLookupFunc, keyCmp keyCmpFunc, nodes []*Node) *BpsTree { bt := &BpsTree{M: M, offt: offt, dataLookupFunc: dataLookup, keyCmpFunc: keyCmp, mx: nodes} nsz := uint64(unsafe.Sizeof(Node{})) @@ -80,6 +80,7 @@ func NewBpsTreeWithNodes(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64, } } cachedBytes += nsz + uint64(len(nodes[i].key)) + nodes[i].off = offt.Get(nodes[i].di) } @@ -88,14 +89,17 @@ func NewBpsTreeWithNodes(kv *seg.Reader, offt *eliasfano32.EliasFano, M uint64, type BpsTree struct { offt *eliasfano32.EliasFano // ef with offsets to key/vals - mx []Node + mx []*Node M uint64 // limit on amount of 'children' for node trace bool dataLookupFunc dataLookupFunc keyCmpFunc keyCmpFunc + cursorGetter cursorGetter } +type cursorGetter func(k, v []byte, di uint64, g *seg.Reader) *Cursor + type BpsTreeIterator struct { t *BpsTree i uint64 @@ -178,15 +182,17 @@ func encodeListNodes(nodes []Node, w io.Writer) error { return nil } -func decodeListNodes(data []byte) ([]Node, error) { +func decodeListNodes(data []byte) ([]*Node, error) { count := binary.BigEndian.Uint64(data[:8]) - nodes := make([]Node, count) + nodes := make([]*Node, count) pos := 8 for ni := 0; ni < int(count); ni++ { - dp, err := (&nodes[ni]).Decode(data[pos:]) + node := new(Node) + dp, err := node.Decode(data[pos:]) if err != nil { return nil, fmt.Errorf("decode node %d: %w", ni, err) } + nodes[ni] = node pos += int(dp) } return nodes, nil @@ -220,7 +226,7 @@ func (b *BpsTree) WarmUp(kv *seg.Reader) (err error) { if N == 0 { return nil } - b.mx = make([]Node, 0, N/b.M) + b.mx = make([]*Node, 0, N/b.M) if b.trace { fmt.Printf("mx cap %d N=%d M=%d\n", cap(b.mx), N, b.M) } @@ -240,7 +246,7 @@ func (b *BpsTree) WarmUp(kv *seg.Reader) (err error) { if err != nil { return err } - b.mx = append(b.mx, Node{off: b.offt.Get(di), key: common.Copy(key), di: di}) + b.mx = append(b.mx, &Node{off: b.offt.Get(di), key: common.Copy(key), di: di}) cachedBytes += nsz + uint64(len(key)) } @@ -252,9 +258,10 @@ func (b *BpsTree) WarmUp(kv *seg.Reader) (err error) { } // bs performs pre-seach over warmed-up list of nodes to figure out left and right bounds on di for key -func (b *BpsTree) bs(x []byte) (n Node, dl, dr uint64) { +func (b *BpsTree) bs(x []byte) (n *Node, dl, dr uint64) { dr = b.offt.Count() m, l, r := 0, 0, len(b.mx) //nolint + for l < r { m = (l + r) >> 1 n = b.mx[m] @@ -271,94 +278,91 @@ func (b *BpsTree) bs(x []byte) (n Node, dl, dr uint64) { case -1: l = m + 1 dl = n.di + if dl < dr { + dl++ + } } } return n, dl, dr } -// Seek returns first key which is >= key. 
-// Found is true iff exact key match is found. -// If key is nil, returns first key and found=true -// If found item.key has a prefix of key, returns found=false and item.key -// if key is greater than all keys, returns nil, found=false -func (b *BpsTree) Seek(g *seg.Reader, seekKey []byte) (key, value []byte, di uint64, found bool, err error) { +// Seek returns cursor pointing at first key which is >= seekKey. +// If key is nil, returns cursor with first key +// If found item.key has a prefix of key, returns item.key +// if key is greater than all keys, returns nil +func (b *BpsTree) Seek(g *seg.Reader, seekKey []byte) (cur *Cursor, err error) { //b.trace = true if b.trace { fmt.Printf("seek %x\n", seekKey) } + cur = b.cursorGetter(nil, nil, 0, g) if len(seekKey) == 0 && b.offt.Count() > 0 { - key, value, _, err = b.dataLookupFunc(0, g) - if err != nil { - return nil, nil, 0, false, err - } - //return key, value, 0, bytes.Compare(key, seekKey) >= 0, nil - return key, value, 0, bytes.Equal(key, seekKey), nil + cur.Reset(0, g) + return cur, nil } + // check cached nodes and narrow roi n, l, r := b.bs(seekKey) // l===r when key is found - if b.trace { - fmt.Printf("pivot di:%d di(LR): [%d %d] k: %x found: %t\n", n.di, l, r, n.key, l == r) - defer func() { fmt.Printf("found=%t %x [%d %d]\n", bytes.Equal(key, seekKey), seekKey, l, r) }() + if l == r { + cur.Reset(n.di, g) + return cur, nil } + + // if b.trace { + // fmt.Printf("pivot di:%d di(LR): [%d %d] k: %x found: %t\n", n.di, l, r, n.key, l == r) + // defer func() { fmt.Printf("found=%t %x [%d %d]\n", bytes.Equal(key, seekKey), seekKey, l, r) }() + // } var m uint64 var cmp int for l < r { + m = (l + r) >> 1 if r-l <= DefaultBtreeStartSkip { // found small range, faster to scan now - cmp, key, err = b.keyCmpFunc(seekKey, l, g, key[:0]) - if err != nil { - return nil, nil, 0, false, err - } - if b.trace { - fmt.Printf("fs di:[%d %d] k: %x\n", l, r, key) + // m = l + if cur.d == 0 { + cur.Reset(l, g) + } else { + cur.Next() } - //fmt.Printf("N %d l %d cmp %d (found %x want %x)\n", b.offt.Count(), l, cmp, key, seekKey) - if cmp == 0 { - r = l - break - } else if cmp < 0 { //found key is greater than seekKey - if l+1 < b.offt.Count() { - l++ - continue - } + + if cmp = bytes.Compare(cur.key, seekKey); cmp < 0 { + l++ + continue } - r = l - break + return cur, err } - m = (l + r) >> 1 - cmp, key, err = b.keyCmpFunc(seekKey, m, g, key[:0]) + cmp, cur.key, err = b.keyCmpFunc(seekKey, m, g, cur.key[:0]) if err != nil { - return nil, nil, 0, false, err + return nil, err } if b.trace { - fmt.Printf("fs di:[%d %d] k: %x\n", l, r, key) + fmt.Printf("[%d %d] k: %x\n", l, r, cur.key) } if cmp == 0 { - l, r = m, m break } else if cmp > 0 { r = m } else { l = m + 1 } - } if l == r { m = l } - key, value, _, err = b.dataLookupFunc(m, g) - if err != nil { - return nil, nil, 0, false, err + + err = cur.Reset(m, g) + if err != nil || bytes.Compare(cur.Key(), seekKey) < 0 { + return nil, err } - return key, value, l, bytes.Equal(key, seekKey), nil + return cur, nil } -// returns first key which is >= key. 
-// If key is nil, returns first key -// if key is greater than all keys, returns nil -func (b *BpsTree) Get(g *seg.Reader, key []byte) (k []byte, ok bool, i uint64, err error) { +// Get: returns for exact given key, value and offset in file where key starts +// If given key is nil, returns first key +// If no exact match found, returns nil values +func (b *BpsTree) Get(g *seg.Reader, key []byte) (v []byte, ok bool, offset uint64, err error) { if b.trace { fmt.Printf("get %x\n", key) } @@ -369,6 +373,7 @@ func (b *BpsTree) Get(g *seg.Reader, key []byte) (k []byte, ok bool, i uint64, e } return v0, true, 0, nil } + n, l, r := b.bs(key) // l===r when key is found if b.trace { fmt.Printf("pivot di: %d di(LR): [%d %d] k: %x found: %t\n", n.di, l, r, n.key, l == r) @@ -379,29 +384,55 @@ func (b *BpsTree) Get(g *seg.Reader, key []byte) (k []byte, ok bool, i uint64, e var m uint64 for l < r { m = (l + r) >> 1 - cmp, k, err = b.keyCmpFunc(key, m, g, k[:0]) + if r-l <= DefaultBtreeStartSkip { + m = l + if offset == 0 { + offset = b.offt.Get(m) + g.Reset(offset) + } + v, _ = g.Next(v[:0]) + if cmp = bytes.Compare(v, key); cmp > 0 { + return nil, false, 0, err + } else if cmp < 0 { + g.Skip() + l++ + continue + } + v, _ = g.Next(nil) + offset = b.offt.Get(m) + return v, true, offset, nil + } + + cmp, _, err = b.keyCmpFunc(key, m, g, v[:0]) if err != nil { return nil, false, 0, err } - if b.trace { - fmt.Printf("fs [%d %d]\n", l, r) - } - - switch cmp { - case 0: - return k, true, m, nil - case 1: + if cmp == 0 { + offset = b.offt.Get(m) + if !g.HasNext() { + return nil, false, 0, fmt.Errorf("pair %d/%d key not found in %s", m, b.offt.Count(), g.FileName()) + } + v, _ = g.Next(nil) + return v, true, offset, nil + } else if cmp > 0 { r = m - case -1: + } else { l = m + 1 } + if b.trace { + fmt.Printf("narrow [%d %d]\n", l, r) + } } - cmp, k, err = b.keyCmpFunc(key, l, g, k[:0]) + cmp, _, err = b.keyCmpFunc(key, l, g, v[:0]) if err != nil || cmp != 0 { return nil, false, 0, err } - return k, true, l, nil + if !g.HasNext() { + return nil, false, 0, fmt.Errorf("pair %d/%d key not found in %s", l, b.offt.Count(), g.FileName()) + } + v, _ = g.Next(nil) + return v, true, b.offt.Get(l), nil } func (b *BpsTree) Offsets() *eliasfano32.EliasFano { return b.offt } diff --git a/erigon-lib/state/bpstree_bench_test.go b/erigon-lib/state/bpstree_bench_test.go index 4af09ee48a4..c0ac5a3f2a6 100644 --- a/erigon-lib/state/bpstree_bench_test.go +++ b/erigon-lib/state/bpstree_bench_test.go @@ -44,8 +44,9 @@ func BenchmarkBpsTreeSeek(t *testing.B) { key, _ = getter.Next(key[:0]) getter.Skip() //_, err := bt.Seek(getter, keys[r.Intn(len(keys))]) - _, err := bt.Seek(getter, key) + c, err := bt.Seek(getter, key) require.NoError(t, err) + c.Close() } t.ReportAllocs() } diff --git a/erigon-lib/state/btree_index.go b/erigon-lib/state/btree_index.go index d35e051cd48..72906690bfa 100644 --- a/erigon-lib/state/btree_index.go +++ b/erigon-lib/state/btree_index.go @@ -19,7 +19,6 @@ package state import ( "bufio" "bytes" - "context" "encoding/binary" "errors" "fmt" @@ -29,6 +28,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/c2h5oh/datasize" @@ -75,26 +75,29 @@ type node struct { } type Cursor struct { - btt *BtIndex - ctx context.Context - getter *seg.Reader - key []byte - value []byte - d uint64 + ef *eliasfano32.EliasFano + returnInto *sync.Pool + getter *seg.Reader + key []byte + value []byte + d uint64 } -//getter should be alive all the time of cursor usage -//Key and value is valid until cursor.Next 
is called -//func NewCursor(ctx context.Context, k, v []byte, d uint64, g ArchiveGetter) *Cursor { -// return &Cursor{ -// ctx: ctx, -// getter: g, -// key: common.Copy(k), -// value: common.Copy(v), -// d: d, -// } -//} +func (c *Cursor) Close() { + if c == nil { + return + } + c.key = c.key[:0] + c.value = c.value[:0] + c.d = 0 + c.getter = nil + if c.returnInto != nil { + c.returnInto.Put(c) + } +} +// getter should be alive all the time of cursor usage +// Key and value is valid until cursor.Next is called func (c *Cursor) Key() []byte { return c.key } @@ -107,29 +110,56 @@ func (c *Cursor) Value() []byte { return c.value } -func (c *Cursor) Next() bool { +func (c *Cursor) Next() bool { // could return error instead if !c.next() { + // c.Close() return false } - key, value, _, err := c.btt.dataLookup(c.d, c.getter) - if err != nil { + if err := c.readKV(); err != nil { + fmt.Printf("nextKV error %v\n", err) return false } - c.key, c.value = key, value return true } // next returns if another key/value pair is available int that index. // moves pointer d to next element if successful func (c *Cursor) next() bool { - if c.d+1 == c.btt.ef.Count() { + if c.d+1 == c.ef.Count() { return false } c.d++ return true } +func (c *Cursor) Reset(di uint64, g *seg.Reader) error { + c.d = di + c.getter = g + return c.readKV() +} + +func (c *Cursor) readKV() error { + if c.d >= c.ef.Count() { + return fmt.Errorf("%w %d/%d", ErrBtIndexLookupBounds, c.d, c.ef.Count()) + } + if c.getter == nil { + return fmt.Errorf("getter is nil") + } + + offset := c.ef.Get(c.d) + c.getter.Reset(offset) + if !c.getter.HasNext() { + return fmt.Errorf("pair %d/%d key not found, file: %s/%s", c.d, c.ef.Count(), c.getter.FileName(), c.getter.FileName()) + } + c.key, _ = c.getter.Next(nil) + if !c.getter.HasNext() { + return fmt.Errorf("pair %d/%d val not found, file: %s/%s", c.d, c.ef.Count(), c.getter.FileName(), c.getter.FileName()) + } + c.value, _ = c.getter.Next(nil) // if value is not compressed, we getting ptr to slice from mmap, may need to copy + return nil +} + type btAlloc struct { d uint64 // depth M uint64 // child limit of any node @@ -607,7 +637,7 @@ func NewBtIndexWriter(args BtIndexWriterArgs, logger log.Logger) (*BtIndexWriter return btw, nil } -func (btw *BtIndexWriter) AddKey(key []byte, offset uint64) error { +func (btw *BtIndexWriter) AddKey(key []byte, offset uint64, keep bool) error { if btw.built { return errors.New("cannot add keys after perfect hash function had been built") } @@ -617,7 +647,7 @@ func (btw *BtIndexWriter) AddKey(key []byte, offset uint64) error { btw.maxOffset = offset } - keepKey := false + keepKey := keep if btw.keysWritten > 0 { delta := offset - btw.prevOffset if btw.keysWritten == 1 || delta < btw.minDelta { @@ -738,6 +768,7 @@ type BtIndex struct { size int64 modTime time.Time filePath string + pool sync.Pool } // Decompressor should be managed by caller (could be closed after index is built). 
When index is built, external getter should be passed to seekInFiles function @@ -798,9 +829,15 @@ func BuildBtreeIndexWithDecompressor(indexPath string, kv *seg.Decompressor, com key := make([]byte, 0, 64) var pos uint64 + var b0 [256]bool for getter.HasNext() { key, _ = getter.Next(key[:0]) - err = iw.AddKey(key, pos) + keep := false + if !b0[key[0]] { + b0[key[0]] = true + keep = true + } + err = iw.AddKey(key, pos, keep) if err != nil { return err } @@ -868,6 +905,10 @@ func OpenBtreeIndexWithDecompressor(indexPath string, M uint64, kv *seg.Decompre } idx.ef, pos = eliasfano32.ReadEliasFano(idx.data[pos:]) + idx.pool = sync.Pool{} + idx.pool.New = func() any { + return &Cursor{ef: idx.ef, returnInto: &idx.pool} + } defer kv.EnableMadvNormal().DisableReadAhead() kvGetter := seg.NewReader(kv.MakeGetter(), compress) @@ -879,6 +920,7 @@ func OpenBtreeIndexWithDecompressor(indexPath string, M uint64, kv *seg.Decompre case true: if len(idx.data[pos:]) == 0 { idx.bplus = NewBpsTree(kvGetter, idx.ef, M, idx.dataLookup, idx.keyCmp) + idx.bplus.cursorGetter = idx.newCursor // fallback for files without nodes encoded } else { nodes, err := decodeListNodes(idx.data[pos:]) @@ -886,6 +928,7 @@ func OpenBtreeIndexWithDecompressor(indexPath string, M uint64, kv *seg.Decompre return nil, err } idx.bplus = NewBpsTreeWithNodes(kvGetter, idx.ef, M, idx.dataLookup, idx.keyCmp, nodes) + idx.bplus.cursorGetter = idx.newCursor } default: idx.alloc = newBtAlloc(idx.ef.Count(), M, false, idx.dataLookup, idx.keyCmp) @@ -940,15 +983,15 @@ func (b *BtIndex) keyCmp(k []byte, di uint64, g *seg.Reader, resBuf []byte) (int // getter should be alive all the time of cursor usage // Key and value is valid until cursor.Next is called -func (b *BtIndex) newCursor(ctx context.Context, k, v []byte, d uint64, g *seg.Reader) *Cursor { - return &Cursor{ - ctx: ctx, - getter: g, - key: common.Copy(k), - value: common.Copy(v), - d: d, - btt: b, - } +func (b *BtIndex) newCursor(k, v []byte, d uint64, g *seg.Reader) *Cursor { + c := b.pool.Get().(*Cursor) + c.ef = b.ef + c.returnInto = &b.pool + + c.d, c.getter = d, g + c.key = append(c.key[:0], k...) + c.value = append(c.value[:0], v...) + return c } func (b *BtIndex) Size() int64 { return b.size } @@ -1009,13 +1052,18 @@ func (b *BtIndex) Get(lookup []byte, gr *seg.Reader) (k, v []byte, offsetInFile if b.bplus == nil { panic(fmt.Errorf("Get: `b.bplus` is nil: %s", gr.FileName())) } - // v is actual value, not offset. - // weak assumption that k will be ignored and used lookup instead. // since fetching k and v from data file is required to use Getter. // Why to do Getter.Reset twice when we can get kv right there. 
- k, found, index, err = b.bplus.Get(gr, lookup) + v, found, offsetInFile, err = b.bplus.Get(gr, lookup) + if err != nil { + if errors.Is(err, ErrBtIndexLookupBounds) { + return k, v, offsetInFile, false, nil + } + return lookup, v, offsetInFile, false, err + } + return lookup, v, offsetInFile, found, nil } else { if b.alloc == nil { return k, v, 0, false, err @@ -1029,10 +1077,6 @@ func (b *BtIndex) Get(lookup []byte, gr *seg.Reader) (k, v []byte, offsetInFile return nil, nil, 0, false, err } - // this comparation should be done by index get method, and in case of mismatch, key is not found - //if !bytes.Equal(k, lookup) { - // return k, v, false, nil - //} k, v, offsetInFile, err = b.dataLookup(index, gr) if err != nil { if errors.Is(err, ErrBtIndexLookupBounds) { @@ -1047,24 +1091,22 @@ func (b *BtIndex) Get(lookup []byte, gr *seg.Reader) (k, v []byte, offsetInFile // Then if x == nil - first key returned // // if x is larger than any other key in index, nil cursor is returned. +// +// Caller should close cursor after use. func (b *BtIndex) Seek(g *seg.Reader, x []byte) (*Cursor, error) { if b.Empty() { return nil, nil } if b.useBplus { - k, v, dt, _, err := b.bplus.Seek(g, x) - if err != nil /*|| !found*/ { + c, err := b.bplus.Seek(g, x) + if err != nil || c == nil { if errors.Is(err, ErrBtIndexLookupBounds) { return nil, nil } return nil, err } - if bytes.Compare(k, x) >= 0 { - return b.newCursor(context.Background(), k, v, dt, g), nil - } - return nil, nil + return c, nil } - _, dt, found, err := b.alloc.Seek(g, x) if err != nil || !found { if errors.Is(err, ErrBtIndexLookupBounds) { @@ -1080,7 +1122,7 @@ func (b *BtIndex) Seek(g *seg.Reader, x []byte) (*Cursor, error) { } return nil, err } - return b.newCursor(context.Background(), k, v, dt, g), nil + return b.newCursor(k, v, dt, g), nil } func (b *BtIndex) OrdinalLookup(getter *seg.Reader, i uint64) *Cursor { @@ -1088,7 +1130,7 @@ func (b *BtIndex) OrdinalLookup(getter *seg.Reader, i uint64) *Cursor { if err != nil { return nil } - return b.newCursor(context.Background(), k, v, i, getter) + return b.newCursor(k, v, i, getter) } func (b *BtIndex) Offsets() *eliasfano32.EliasFano { return b.bplus.Offsets() } func (b *BtIndex) Distances() (map[int]int, error) { return b.bplus.Distances() } diff --git a/erigon-lib/state/btree_index_test.go b/erigon-lib/state/btree_index_test.go index 9e2dcc0fe02..c916cc3366e 100644 --- a/erigon-lib/state/btree_index_test.go +++ b/erigon-lib/state/btree_index_test.go @@ -122,24 +122,24 @@ func Test_BtreeIndex_Seek(t *testing.T) { cur, err := bt.Seek(getter, common.FromHex("0xffffffffffffff")) //seek beyeon the last key require.NoError(t, err) require.Nil(t, cur) + cur.Close() }) c, err := bt.Seek(getter, nil) require.NoError(t, err) for i := 0; i < len(keys); i++ { k := c.Key() - //if !bytes.Equal(keys[i], k) { - // fmt.Printf("\tinvalid, want %x, got %x\n", keys[i], k) - //} require.EqualValues(t, keys[i], k) c.Next() } + c.Close() for i := 0; i < len(keys); i++ { cur, err := bt.Seek(getter, keys[i]) require.NoErrorf(t, err, "i=%d", i) require.EqualValuesf(t, keys[i], cur.key, "i=%d", i) require.NotEmptyf(t, cur.Value(), "i=%d", i) + cur.Close() // require.EqualValues(t, uint64(i), cur.Value()) } for i := 1; i < len(keys); i++ { @@ -153,6 +153,7 @@ func Test_BtreeIndex_Seek(t *testing.T) { cur, err := bt.Seek(getter, keys[i]) require.NoError(t, err) require.EqualValues(t, keys[i], cur.Key()) + cur.Close() } } @@ -190,10 +191,13 @@ func Test_BtreeIndex_Build(t *testing.T) { } c.Next() } + c.Close() + 
for i := 0; i < 10000; i++ { c, err := bt.Seek(getter, keys[i]) require.NoError(t, err) require.EqualValues(t, keys[i], c.Key()) + c.Close() } } @@ -246,25 +250,47 @@ func Test_BtreeIndex_Seek2(t *testing.T) { cur, err := bt.Seek(getter, common.FromHex("0xffffffffffffff")) //seek beyeon the last key require.NoError(t, err) require.Nil(t, cur) + cur.Close() }) - c, err := bt.Seek(getter, nil) - require.NoError(t, err) - for i := 0; i < len(keys); i++ { - k := c.Key() - if !bytes.Equal(keys[i], k) { - fmt.Printf("\tinvalid, want %x\n", keys[i]) + t.Run("checkNextAgainstGetter", func(t *testing.T) { + cur, err := bt.Seek(getter, nil) + require.NoError(t, err) + defer cur.Close() + + require.NoError(t, err) + require.EqualValues(t, keys[0], cur.Key()) + require.NotEmptyf(t, cur.Value(), "i=%d", 0) + + k, v, _, err := bt.dataLookup(0, getter) + require.NoError(t, err) + cur.Reset(0, getter) + + require.EqualValues(t, k, cur.Key()) + require.EqualValues(t, v, cur.Value()) + + totalKeys := kv.Count() / 2 + + for i := 1; i < totalKeys; i++ { + k, v, _, err = bt.dataLookup(uint64(i), getter) + require.NoError(t, err) + + b := cur.Next() + require.True(t, b) + + require.EqualValuesf(t, k, cur.Key(), "i=%d", i) + require.EqualValuesf(t, v, cur.Value(), "i=%d", i) + + curS, err := bt.Seek(getter, cur.Key()) + require.NoError(t, err) + + require.EqualValuesf(t, cur.Key(), curS.Key(), "i=%d", i) + require.EqualValuesf(t, cur.Value(), curS.Value(), "i=%d", i) + require.EqualValues(t, cur.d, curS.d) + require.EqualValues(t, cur.getter, curS.getter) } - c.Next() - } + }) - for i := 0; i < len(keys); i++ { - cur, err := bt.Seek(getter, keys[i]) - require.NoErrorf(t, err, "i=%d", i) - require.EqualValues(t, keys[i], cur.key) - require.NotEmptyf(t, cur.Value(), "i=%d", i) - // require.EqualValues(t, uint64(i), cur.Value()) - } for i := 1; i < len(keys); i++ { alt := common.Copy(keys[i]) for j := len(alt) - 1; j >= 0; j-- { @@ -323,20 +349,19 @@ func TestBpsTree_Seek(t *testing.T) { ir := NewMockIndexReader(efi) bp := NewBpsTree(g, efi, uint64(M), ir.dataLookup, ir.keyCmp) + bp.cursorGetter = ir.newCursor bp.trace = false for i := 0; i < len(keys); i++ { sk := keys[i] - k, _, di, found, err := bp.Seek(g, sk[:len(sk)/2]) - _ = di - _ = found + c, err := bp.Seek(g, sk[:len(sk)/2]) require.NoError(t, err) - require.NotNil(t, k) - require.False(t, found) // we are looking up by half of key, while FOUND=true when exact match found. + require.NotNil(t, c) + require.NotNil(t, c.Key()) //k, _, err := it.KVFromGetter(g) //require.NoError(t, err) - require.EqualValues(t, keys[i], k) + require.EqualValues(t, keys[i], c.Key()) } } @@ -348,6 +373,16 @@ type mockIndexReader struct { ef *eliasfano32.EliasFano } +func (b *mockIndexReader) newCursor(k, v []byte, di uint64, g *seg.Reader) *Cursor { + return &Cursor{ + ef: b.ef, + getter: g, + key: common.Copy(k), + value: common.Copy(v), + d: di, + } +} + func (b *mockIndexReader) dataLookup(di uint64, g *seg.Reader) (k, v []byte, offset uint64, err error) { if di >= b.ef.Count() { return nil, nil, 0, fmt.Errorf("%w: keyCount=%d, but key %d requested. 
file: %s", ErrBtIndexLookupBounds, b.ef.Count(), di, g.FileName()) diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index 8a8a5bc53d1..93f8569f0bd 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -384,7 +384,11 @@ func (d *Domain) openDirtyFiles() (err error) { d.logger.Warn("[agg] Domain.openDirtyFiles", "err", err, "f", fName) } if exists { - if item.bindex, err = OpenBtreeIndexWithDecompressor(fPath, DefaultBtreeM, item.decompressor, d.compression); err != nil { + btM := DefaultBtreeM + if toStep == 0 && d.filenameBase == "commitment" { + btM = 128 + } + if item.bindex, err = OpenBtreeIndexWithDecompressor(fPath, btM, item.decompressor, d.compression); err != nil { _, fName := filepath.Split(fPath) d.logger.Warn("[agg] Domain.openDirtyFiles", "err", err, "f", fName) // don't interrupt on error. other files may be good @@ -1122,7 +1126,12 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co if d.indexList&withBTree != 0 { btPath := d.kvBtFilePath(stepFrom, stepTo) - bt, err = CreateBtreeIndexWithDecompressor(btPath, DefaultBtreeM, valuesDecomp, d.compression, *d.salt, ps, d.dirs.Tmp, d.logger, d.noFsync) + btM := DefaultBtreeM + if stepFrom == 0 && d.filenameBase == "commitment" { + btM = 128 + } + + bt, err = CreateBtreeIndexWithDecompressor(btPath, btM, valuesDecomp, d.compression, *d.salt, ps, d.dirs.Tmp, d.logger, d.noFsync) if err != nil { return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err) } @@ -1220,7 +1229,11 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio if d.indexList&withBTree != 0 { btPath := d.kvBtFilePath(step, step+1) - bt, err = CreateBtreeIndexWithDecompressor(btPath, DefaultBtreeM, valuesDecomp, d.compression, *d.salt, ps, d.dirs.Tmp, d.logger, d.noFsync) + btM := DefaultBtreeM + if step == 0 && d.filenameBase == "commitment" { + btM = 128 + } + bt, err = CreateBtreeIndexWithDecompressor(btPath, btM, valuesDecomp, d.compression, *d.salt, ps, d.dirs.Tmp, d.logger, d.noFsync) if err != nil { return StaticFiles{}, fmt.Errorf("build %s .bt idx: %w", d.filenameBase, err) } diff --git a/erigon-lib/state/domain_shared.go b/erigon-lib/state/domain_shared.go index c4f0fe0a679..251c8eaa0da 100644 --- a/erigon-lib/state/domain_shared.go +++ b/erigon-lib/state/domain_shared.go @@ -835,6 +835,8 @@ func (sd *SharedDomains) IterateStoragePrefix(prefix []byte, it func(k []byte, v ci1.val = ci1.btCursor.Value() heap.Push(cpPtr, ci1) } + } else { + ci1.btCursor.Close() } } if indexList&withHashMap != 0 { @@ -847,6 +849,8 @@ func (sd *SharedDomains) IterateStoragePrefix(prefix []byte, it func(k []byte, v ci1.key = key ci1.val, ci1.latestOffset = ci1.dg.Next(nil) heap.Push(cpPtr, ci1) + } else { + ci1.dg = nil } } case DB_CURSOR: @@ -860,12 +864,15 @@ func (sd *SharedDomains) IterateStoragePrefix(prefix []byte, it func(k []byte, v step := ^binary.BigEndian.Uint64(v[:8]) endTxNum := step * sd.StepSize() // DB can store not-finished step, it means - then set first txn in step - it anyway will be ahead of files if haveRamUpdates && endTxNum >= sd.txNum { + ci1.cDup.Close() return fmt.Errorf("probably you didn't set SharedDomains.SetTxNum(). 
ram must be ahead of db: %d, %d", sd.txNum, endTxNum) } ci1.endTxNum = endTxNum ci1.val = common.Copy(v[8:]) ci1.step = step heap.Push(cpPtr, ci1) + } else { + ci1.cDup.Close() } } } diff --git a/erigon-lib/state/domain_stream.go b/erigon-lib/state/domain_stream.go index f9a293862f0..67d7605f849 100644 --- a/erigon-lib/state/domain_stream.go +++ b/erigon-lib/state/domain_stream.go @@ -200,6 +200,8 @@ func (hi *DomainLatestIterFile) advanceInFiles() error { if ci1.key != nil && (hi.to == nil || bytes.Compare(ci1.key, hi.to) < 0) { heap.Push(hi.h, ci1) } + } else { + ci1.btCursor.Close() } case DB_CURSOR: if hi.largeVals { @@ -229,6 +231,8 @@ func (hi *DomainLatestIterFile) advanceInFiles() error { ci1.val = common.Copy(v) heap.Push(hi.h, ci1) + } else { + ci1.cNonDup.Close() } } else { // start from current go to next @@ -247,6 +251,8 @@ func (hi *DomainLatestIterFile) advanceInFiles() error { ci1.val = common.Copy(v) heap.Push(hi.h, ci1) + } else { + ci1.cDup.Close() } } diff --git a/erigon-lib/state/domain_test.go b/erigon-lib/state/domain_test.go index 7b1767c1f08..9c5b3a810fe 100644 --- a/erigon-lib/state/domain_test.go +++ b/erigon-lib/state/domain_test.go @@ -1088,7 +1088,9 @@ func TestDomain_CollationBuildInMem(t *testing.T) { // Check index require.Equal(t, 3, int(sf.valuesBt.KeyCount())) for i := 0; i < len(words); i += 2 { - c, _ := sf.valuesBt.Seek(g, []byte(words[i])) + c, err := sf.valuesBt.Seek(g, []byte(words[i])) + require.NoError(t, err) + require.NotNil(t, c) require.Equal(t, words[i], string(c.Key())) require.Equal(t, words[i+1], string(c.Value())) } diff --git a/erigon-lib/state/merge.go b/erigon-lib/state/merge.go index 11eeb3ac266..f285eaadfab 100644 --- a/erigon-lib/state/merge.go +++ b/erigon-lib/state/merge.go @@ -528,7 +528,11 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h if dt.d.indexList&withBTree != 0 { btPath := dt.d.kvBtFilePath(fromStep, toStep) - valuesIn.bindex, err = CreateBtreeIndexWithDecompressor(btPath, DefaultBtreeM, valuesIn.decompressor, dt.d.compression, *dt.d.salt, ps, dt.d.dirs.Tmp, dt.d.logger, dt.d.noFsync) + btM := DefaultBtreeM + if toStep == 0 && dt.d.filenameBase == "commitment" { + btM = 128 + } + valuesIn.bindex, err = CreateBtreeIndexWithDecompressor(btPath, btM, valuesIn.decompressor, dt.d.compression, *dt.d.salt, ps, dt.d.dirs.Tmp, dt.d.logger, dt.d.noFsync) if err != nil { return nil, nil, nil, fmt.Errorf("merge %s btindex [%d-%d]: %w", dt.d.filenameBase, r.values.from, r.values.to, err) } diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index be66401f9b4..2244ccc68a0 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -454,6 +454,7 @@ func doBtSearch(cliCtx *cli.Context) error { if err != nil { return err } + defer cur.Close() if cur != nil { fmt.Printf("seek: %x, -> %x, %x\n", seek, cur.Key(), cur.Value()) } else {
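
Illustrative caller-side sketch (not part of the patch): with the changed Seek semantics described in the commit message, the exact-match check and cursor disposal are now the caller's responsibility. lookupExact is a hypothetical helper; bt, getter and the imports (bytes, common, seg) are assumed to come from the surrounding erigon-lib/state code.

func lookupExact(bt *BtIndex, getter *seg.Reader, seekKey []byte) (val []byte, found bool, err error) {
	cur, err := bt.Seek(getter, seekKey) // returns the exact match OR the first key bigger than seekKey
	if err != nil {
		return nil, false, err
	}
	if cur == nil { // seekKey is greater than every key in the file, or the index is empty
		return nil, false, nil
	}
	defer cur.Close() // returns the cursor to the index's sync.Pool
	if !bytes.Equal(cur.Key(), seekKey) {
		return nil, false, nil // no exact match; cur.Key() is the next bigger key
	}
	// copy the value: for uncompressed files it may point into mmap'ed data
	return common.Copy(cur.Value()), true, nil
}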