diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go
index 618479a836a..092b7dffd27 100644
--- a/cmd/hack/hack.go
+++ b/cmd/hack/hack.go
@@ -1301,7 +1301,10 @@ func iterate(filename string, prefix string) error {
 			txNum, _ := efIt.Next()
 			var txKey [8]byte
 			binary.BigEndian.PutUint64(txKey[:], txNum)
-			offset := r.Lookup2(txKey[:], key)
+			offset, ok := r.Lookup2(txKey[:], key)
+			if !ok {
+				continue
+			}
 			gv.Reset(offset)
 			v, _ := gv.Next(nil)
 			fmt.Printf(" %d", txNum)
diff --git a/cmd/rpcdaemon/rpcservices/eth_backend.go b/cmd/rpcdaemon/rpcservices/eth_backend.go
index 575823056bb..59bcc39079f 100644
--- a/cmd/rpcdaemon/rpcservices/eth_backend.go
+++ b/cmd/rpcdaemon/rpcservices/eth_backend.go
@@ -289,8 +289,8 @@ func (back *RemoteBackend) EventLookup(ctx context.Context, tx kv.Getter, txnHas
 func (back *RemoteBackend) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockNum uint64) ([]rlp.RawValue, error) {
 	return back.blockReader.EventsByBlock(ctx, tx, hash, blockNum)
 }
-func (back *RemoteBackend) BorStartEventID(ctx context.Context, tx kv.Tx, blockNum uint64) (uint64, error) {
-	return back.blockReader.BorStartEventID(ctx, tx, blockNum)
+func (back *RemoteBackend) BorStartEventID(ctx context.Context, tx kv.Tx, hash common.Hash, blockNum uint64) (uint64, error) {
+	return back.blockReader.BorStartEventID(ctx, tx, hash, blockNum)
 }
 
 func (back *RemoteBackend) LastSpanId(ctx context.Context, tx kv.Tx) (uint64, bool, error) {
diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go
index b953e42cb9a..6fd36322885 100644
--- a/cmd/state/exec3/state.go
+++ b/cmd/state/exec3/state.go
@@ -286,8 +286,8 @@ func (cr ChainReader) HasBlock(hash libcommon.Hash, number uint64) bool {
 func (cr ChainReader) BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue {
 	panic("")
 }
-func (cr ChainReader) BorStartEventID(number uint64) uint64 { panic("") }
-func (cr ChainReader) BorSpan(spanId uint64) []byte         { panic("") }
+func (cr ChainReader) BorStartEventID(hash libcommon.Hash, number uint64) uint64 { panic("") }
+func (cr ChainReader) BorSpan(spanId uint64) []byte                              { panic("") }
 
 func NewWorkersPool(lock sync.Locker, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *exec22.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int) (reconWorkers []*Worker, applyWorker *Worker, rws *exec22.ResultsQueue, clear func(), wait func()) {
 	reconWorkers = make([]*Worker, workerCount)
diff --git a/consensus/consensus.go b/consensus/consensus.go
index ce44ee190b7..4e8d5f75873 100644
--- a/consensus/consensus.go
+++ b/consensus/consensus.go
@@ -72,7 +72,7 @@ type ChainReader interface {
 	HasBlock(hash libcommon.Hash, number uint64) bool
 
 	BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue
-	BorStartEventID(number uint64) uint64
+	BorStartEventID(hash libcommon.Hash, number uint64) uint64
 }
 
 type SystemCall func(contract libcommon.Address, data []byte) ([]byte, error)
diff --git a/core/chain_makers.go b/core/chain_makers.go
index 4e118fe8e91..78961f4beae 100644
--- a/core/chain_makers.go
+++ b/core/chain_makers.go
@@ -653,7 +653,7 @@ func (cr *FakeChainReader) FrozenBlocks() uint64
 func (cr *FakeChainReader) BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue {
 	return nil
 }
-func (cr *FakeChainReader) BorStartEventID(number uint64) uint64 {
+func (cr *FakeChainReader) BorStartEventID(hash libcommon.Hash, number uint64) uint64 {
 	return 0
 }
 func (cr *FakeChainReader) BorSpan(spanId uint64) []byte { return nil }
diff --git a/erigon-lib/recsplit/index.go b/erigon-lib/recsplit/index.go
index 1f2da24ccab..eb36a3f6b3e 100644
--- a/erigon-lib/recsplit/index.go
+++ b/erigon-lib/recsplit/index.go
@@ -42,13 +42,30 @@ type Features byte
 const (
 	No Features = 0b0
-	// Enums - Whether to build two level index with perfect hash table pointing to enumeration and enumeration pointing to offsets
+
+	// Enums - whether to build a two-level index, with the perfect hash table pointing to an enumeration and the enumeration pointing to offsets
 	Enums Features = 0b1
-	//LessFalsePositives Features = 0b10 // example of adding new feature
+	// LessFalsePositives - reduces false positives to 1/256=0.4% at the cost of 1 byte per key.
+	// Implementation:
+	//   A perfect hash map returns false positives when an unknown key is requested. False positives
+	//   by themselves are not the problem - their nature is: they land randomly smeared across .seg files.
+	//   That keeps .seg files "warm", which is bad because the files are big, the data locality of
+	//   the touches is poor (and a lookup may need to visit many shards to find a key).
+	//   We could add a built-in "existence filter" (bloom/cuckoo/ribbon/xor-filter/fuse-filter); it would
+	//   improve data locality - such filters are small enough that existence checks stay co-located on disk.
+	//   But our data has 2 additional properties:
+	//     "keys are known", "keys are hashed" (.idx works on murmur3), ".idx can compute a key's number from the key".
+	//   If we rely on these properties, we can do better than a general-purpose existence filter.
+	//   A plain "array of the 1st byte of each key's hash" is a good alternative:
+	//     general-purpose filter: 9 bits/key, 0.3% false positives, 3 memory accesses
+	//     first-bytes array:      8 bits/key, 1/256=0.4% false positives, 1 memory access
+	//
+	// See also: https://github.com/ledgerwatch/erigon/issues/9486
+	LessFalsePositives Features = 0b10 //
 )
 
 // SupportedFeaturs - if see feature not from this list (likely after downgrade) - return IncompatibleErr and recommend for user manually delete file
-var SupportedFeatures = []Features{Enums}
+var SupportedFeatures = []Features{Enums, LessFalsePositives}
 var IncompatibleErr = errors.New("incompatible. can re-build such files by command 'erigon snapshots index'")
 
 // Index implements index lookup from the file created by the RecSplit
@@ -78,6 +95,9 @@ type Index struct {
 	primaryAggrBound uint16 // The lower bound for primary key aggregation (computed from leafSize)
 	enums            bool
+	lessFalsePositives bool
+	existence          []byte
+
 	readers *sync.Pool
 }
@@ -153,11 +173,22 @@ func OpenIndex(indexFilePath string) (*Index, error) {
 	}
 	idx.enums = features&Enums != No
+	idx.lessFalsePositives = features&LessFalsePositives != No
 	offset++
 
 	if idx.enums && idx.keyCount > 0 {
 		var size int
 		idx.offsetEf, size = eliasfano32.ReadEliasFano(idx.data[offset:])
 		offset += size
+
+		if idx.lessFalsePositives {
+			arrSz := binary.BigEndian.Uint64(idx.data[offset:])
+			offset += 8
+			if arrSz != idx.keyCount {
+				return nil, fmt.Errorf("%w. size of existence filter %d != keys count %d", IncompatibleErr, arrSz, idx.keyCount)
+			}
+			idx.existence = idx.data[offset : offset+int(arrSz)]
+			offset += int(arrSz)
+		}
 	}
 	// Size of golomb rice params
 	golombParamSize := binary.BigEndian.Uint16(idx.data[offset:])
@@ -248,13 +279,13 @@ func (idx *Index) KeyCount() uint64 {
 }
 
 // Lookup is not thread-safe because it used id.hasher
-func (idx *Index) Lookup(bucketHash, fingerprint uint64) uint64 {
+func (idx *Index) Lookup(bucketHash, fingerprint uint64) (uint64, bool) {
 	if idx.keyCount == 0 {
 		_, fName := filepath.Split(idx.filePath)
 		panic("no Lookup should be done when keyCount==0, please use Empty function to guard " + fName)
 	}
 	if idx.keyCount == 1 {
-		return 0
+		return 0, true
 	}
 	var gr GolombRiceReader
 	gr.data = idx.grData
@@ -311,7 +342,11 @@ func (idx *Index) Lookup(bucketHash, fingerprint uint64) uint64 {
 	rec := int(cumKeys) + int(remap16(remix(fingerprint+idx.startSeed[level]+b), m))
 	pos := 1 + 8 + idx.bytesPerRec*(rec+1)
 
-	return binary.BigEndian.Uint64(idx.data[pos:]) & idx.recMask
+	found := binary.BigEndian.Uint64(idx.data[pos:]) & idx.recMask
+	if idx.lessFalsePositives {
+		return found, idx.existence[found] == byte(bucketHash)
+	}
+	return found, true
 }
 
 // OrdinalLookup returns the offset of i-th element in the index
@@ -321,6 +356,13 @@ func (idx *Index) OrdinalLookup(i uint64) uint64 {
 	return idx.offsetEf.Get(i)
 }
 
+func (idx *Index) Has(bucketHash, i uint64) bool {
+	if idx.lessFalsePositives {
+		return idx.existence[i] == byte(bucketHash)
+	}
+	return true
+}
+
 func (idx *Index) ExtractOffsets() map[uint64]uint64 {
 	m := map[uint64]uint64{}
 	pos := 1 + 8 + idx.bytesPerRec
diff --git a/erigon-lib/recsplit/index_reader.go b/erigon-lib/recsplit/index_reader.go
index 0ad10ea0960..412e4485afe 100644
--- a/erigon-lib/recsplit/index_reader.go
+++ b/erigon-lib/recsplit/index_reader.go
@@ -55,20 +55,20 @@ func (r *IndexReader) sum2(key1, key2 []byte) (uint64, uint64) {
 }
 
 // Lookup wraps index Lookup
-func (r *IndexReader) Lookup(key []byte) uint64 {
+func (r *IndexReader) Lookup(key []byte) (uint64, bool) {
 	bucketHash, fingerprint := r.sum(key)
 	if r.index != nil {
 		return r.index.Lookup(bucketHash, fingerprint)
 	}
-	return 0
+	return 0, true
 }
 
-func (r *IndexReader) Lookup2(key1, key2 []byte) uint64 {
+func (r *IndexReader) Lookup2(key1, key2 []byte) (uint64, bool) {
 	bucketHash, fingerprint := r.sum2(key1, key2)
 	if r.index != nil {
 		return r.index.Lookup(bucketHash, fingerprint)
 	}
-	return 0
+	return 0, true
 }
 
 func (r *IndexReader) Empty() bool {
diff --git a/erigon-lib/recsplit/index_test.go b/erigon-lib/recsplit/index_test.go
index cf557016262..3e23dc4233b 100644
--- a/erigon-lib/recsplit/index_test.go
+++ b/erigon-lib/recsplit/index_test.go
@@ -73,7 +73,7 @@ func TestReWriteIndex(t *testing.T) {
 	defer reidx.Close()
 	for i := 0; i < 100; i++ {
 		reader := NewIndexReader(reidx)
-		offset := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
+		offset, _ := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
 		if offset != uint64(i*3965) {
 			t.Errorf("expected offset: %d, looked up: %d", i*3965, offset)
 		}
diff --git a/erigon-lib/recsplit/recsplit.go b/erigon-lib/recsplit/recsplit.go
index af15f973492..cc2b379d1c0 100644
--- a/erigon-lib/recsplit/recsplit.go
+++ b/erigon-lib/recsplit/recsplit.go
@@ -66,11 +66,15 @@ func remix(z uint64) uint64 {
 type RecSplit struct {
 	hasher          murmur3.Hash128 // Salted hash function to use for splitting into initial buckets and mapping to 64-bit fingerprints
 	offsetCollector *etl.Collector  // Collector that sorts by offsets
+
 	indexW          *bufio.Writer
 	indexF          *os.File
 	offsetEf        *eliasfano32.EliasFano // Elias Fano instance for encoding the offsets
 	bucketCollector *etl.Collector         // Collector that sorts by buckets
+
+	existenceF *os.File
+	existenceW *bufio.Writer
+
 	indexFileName          string
 	indexFile, tmpFilePath string
@@ -108,6 +112,7 @@ type RecSplit struct {
 	numBuf             [8]byte
 	collision          bool
 	enums              bool // Whether to build two level index with perfect hash table pointing to enumeration and enumeration pointing to offsets
+	lessFalsePositives bool
 	built              bool // Flag indicating that the hash function has been built and no more keys can be added
 	trace              bool
 	logger             log.Logger
@@ -119,7 +124,8 @@ type RecSplitArgs struct {
 	// Whether two level index needs to be built, where perfect hash map points to an enumeration, and enumeration points to offsets
 	// if Enum=false: can have unsorted and duplicated values
 	// if Enum=true: must have sorted values (can have duplicates) - monotonically growing sequence
-	Enums bool
+	Enums              bool
+	LessFalsePositives bool
 
 	IndexFile string // File name where the index and the minimal perfect hash function will be written to
 	TmpDir    string
@@ -174,6 +180,15 @@ func NewRecSplit(args RecSplitArgs, logger log.Logger) (*RecSplit, error) {
 		rs.offsetCollector = etl.NewCollector(RecSplitLogPrefix+" "+fname, rs.tmpDir, etl.NewSortableBuffer(rs.etlBufLimit), logger)
 		rs.offsetCollector.LogLvl(log.LvlDebug)
 	}
+	rs.lessFalsePositives = args.LessFalsePositives
+	if rs.enums && args.KeyCount > 0 && rs.lessFalsePositives {
+		bufferFile, err := os.CreateTemp(rs.tmpDir, "erigon-lfp-buf-")
+		if err != nil {
+			return nil, err
+		}
+		rs.existenceF = bufferFile
+		rs.existenceW = bufio.NewWriter(rs.existenceF)
+	}
 	rs.currentBucket = make([]uint64, 0, args.BucketSize)
 	rs.currentBucketOffs = make([]uint64, 0, args.BucketSize)
 	rs.maxOffset = 0
@@ -198,6 +213,9 @@ func (rs *RecSplit) Close() {
 	if rs.indexF != nil {
 		rs.indexF.Close()
 	}
+	if rs.existenceF != nil {
+		rs.existenceF.Close()
+	}
 	if rs.bucketCollector != nil {
 		rs.bucketCollector.Close()
 	}
@@ -214,8 +232,8 @@ func (rs *RecSplit) SetTrace(trace bool) {
 
 // remap converts the number x which is assumed to be uniformly distributed over the range [0..2^64) to the number that is uniformly
 // distributed over the range [0..n)
-func remap(x uint64, n uint64) uint64 {
-	hi, _ := bits.Mul64(x, n)
+func remap(x uint64, n uint64) (hi uint64) {
+	hi, _ = bits.Mul64(x, n)
 	return hi
 }
 
@@ -264,6 +282,8 @@ func splitParams(m, leafSize, primaryAggrBound, secondaryAggrBound uint16) (fano
 	return
 }
 
+var golombBaseLog2 = -math.Log((math.Sqrt(5) + 1.0) / 2.0)
+
 func computeGolombRice(m uint16, table []uint32, leafSize, primaryAggrBound, secondaryAggrBound uint16) {
 	fanout, unit := splitParams(m, leafSize, primaryAggrBound, secondaryAggrBound)
 	k := make([]uint16, fanout)
@@ -277,7 +297,7 @@ func computeGolombRice(m uint16, table []uint32, leafSize, primaryAggrBound, sec
 		sqrtProd *= math.Sqrt(float64(k[i]))
 	}
 	p := math.Sqrt(float64(m)) / (math.Pow(2*math.Pi, (float64(fanout)-1.)/2.0) * sqrtProd)
-	golombRiceLength := uint32(math.Ceil(math.Log2(-math.Log((math.Sqrt(5)+1.0)/2.0) / math.Log1p(-p)))) // log2 Golomb modulus
+	golombRiceLength := uint32(math.Ceil(math.Log2(golombBaseLog2 / math.Log1p(-p)))) // log2 Golomb modulus
 	if golombRiceLength > 0x1F {
 		panic("golombRiceLength > 0x1F")
 	}
@@ -303,8 +323,7 @@ func computeGolombRice(m uint16, table []uint32, leafSize, primaryAggrBound, sec
 // salt for the part of the hash function separating m elements. It is based on
 // calculations with assumptions that we draw hash functions at random
 func (rs *RecSplit) golombParam(m uint16) int {
-	s := uint16(len(rs.golombRice))
-	for m >= s {
+	for s := uint16(len(rs.golombRice)); m >= s; s++ {
 		rs.golombRice = append(rs.golombRice, 0)
 		// For the case where bucket is larger than planned
 		if s == 0 {
@@ -314,7 +333,6 @@
 		} else {
 			computeGolombRice(s, rs.golombRice, rs.leafSize, rs.primaryAggrBound, rs.secondaryAggrBound)
 		}
-		s++
 	}
 	return int(rs.golombRice[m] >> 27)
 }
@@ -350,6 +368,12 @@ func (rs *RecSplit) AddKey(key []byte, offset uint64) error {
 		if err := rs.bucketCollector.Collect(rs.bucketKeyBuf[:], rs.numBuf[:]); err != nil {
 			return err
 		}
+		if rs.lessFalsePositives {
+			// 1 byte from each hashed key
+			if err := rs.existenceW.WriteByte(byte(hi)); err != nil {
+				return err
+			}
+		}
 	} else {
 		if err := rs.bucketCollector.Collect(rs.bucketKeyBuf[:], rs.numBuf[:]); err != nil {
 			return err
@@ -561,7 +585,7 @@ func (rs *RecSplit) Build(ctx context.Context) error {
 		return fmt.Errorf("create index file %s: %w", rs.indexFile, err)
 	}
 
-	rs.logger.Debug("[index] created", "file", rs.tmpFilePath, "fs", rs.indexF)
+	rs.logger.Debug("[index] created", "file", rs.tmpFilePath)
 	defer rs.indexF.Close()
 	rs.indexW = bufio.NewWriterSize(rs.indexF, etl.BufIOSize)
@@ -652,6 +676,9 @@ func (rs *RecSplit) Build(ctx context.Context) error {
 	var features Features
 	if rs.enums {
 		features |= Enums
+		if rs.lessFalsePositives {
+			features |= LessFalsePositives
+		}
 	}
 	if err := rs.indexW.WriteByte(byte(features)); err != nil {
 		return fmt.Errorf("writing enums = true: %w", err)
@@ -662,6 +689,10 @@ func (rs *RecSplit) Build(ctx context.Context) error {
 			return fmt.Errorf("writing elias fano for offsets: %w", err)
 		}
 	}
+	if err := rs.flushExistenceFilter(); err != nil {
+		return err
+	}
+
 	// Write out the size of golomb rice params
 	binary.BigEndian.PutUint16(rs.numBuf[:], uint16(len(rs.golombRice)))
 	if _, err := rs.indexW.Write(rs.numBuf[:4]); err != nil {
@@ -694,6 +725,31 @@ func (rs *RecSplit) Build(ctx context.Context) error {
 	return nil
 }
 
+func (rs *RecSplit) flushExistenceFilter() error {
+	if !rs.enums || rs.keysAdded == 0 || !rs.lessFalsePositives {
+		return nil
+	}
+	defer rs.existenceF.Close()
+
+	// Write out the length of the existence array
+	binary.BigEndian.PutUint64(rs.numBuf[:], rs.keysAdded)
+	if _, err := rs.indexW.Write(rs.numBuf[:]); err != nil {
+		return err
+	}
+
+	// flush bufio and rewind before io.Copy; no reason to fsync the file - it is temporary
+	if err := rs.existenceW.Flush(); err != nil {
+		return err
+	}
+	if _, err := rs.existenceF.Seek(0, io.SeekStart); err != nil {
+		return err
+	}
+	if _, err := io.CopyN(rs.indexW, rs.existenceF, int64(rs.keysAdded)); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (rs *RecSplit) DisableFsync() { rs.noFsync = true }
 
 // Fsync - other processes/goroutines must see only "fully-complete" (valid) files. No partial-writes.
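Reviewer note: the two-value Lookup/Lookup2 API above is the mechanical core of this change — the extra boolean is false when the 1-byte existence filter proves the probed key absent, so callers can skip the .seg read entirely. A minimal sketch of the intended call pattern, assuming an .idx file built with Enums and LessFalsePositives enabled (the probe function, index path, and key are illustrative, not part of this PR):

	package main

	import (
		"fmt"

		"github.com/ledgerwatch/erigon-lib/recsplit"
	)

	// probe opens an index and looks up a single key.
	func probe(indexPath string, key []byte) error {
		idx, err := recsplit.OpenIndex(indexPath)
		if err != nil {
			return err
		}
		defer idx.Close()

		reader := recsplit.NewIndexReader(idx)
		if reader.Empty() {
			return fmt.Errorf("index %s is empty", indexPath)
		}
		offset, ok := reader.Lookup(key)
		if !ok {
			// With LessFalsePositives, ok==false means the key is definitely
			// absent: no need to decompress a record and compare keys.
			return fmt.Errorf("key %x not in index", key)
		}
		fmt.Printf("key %x -> offset/enum %d\n", key, offset)
		return nil
	}

Note that ok==true is still only probabilistic (a ~1/256 false-positive rate remains), so callers that need certainty must still verify the key against the fetched record, as txnByHash below does with its final txnHash check.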
diff --git a/erigon-lib/recsplit/recsplit_fuzz_test.go b/erigon-lib/recsplit/recsplit_fuzz_test.go
index ef2f58b9dc0..c31dbee4bc8 100644
--- a/erigon-lib/recsplit/recsplit_fuzz_test.go
+++ b/erigon-lib/recsplit/recsplit_fuzz_test.go
@@ -83,7 +83,7 @@ func FuzzRecSplit(f *testing.F) {
 		bits := make([]uint64, bitCount)
 		reader := NewIndexReader(idx)
 		for i = 0; i < len(in)-l; i += l {
-			off = reader.Lookup(in[i : i+l])
+			off, _ = reader.Lookup(in[i : i+l])
 			if int(off) >= count {
 				t.Errorf("off %d >= count %d", off, count)
 			}
diff --git a/erigon-lib/recsplit/recsplit_test.go b/erigon-lib/recsplit/recsplit_test.go
index ab4f818ebb1..9d4c4c4cc2f 100644
--- a/erigon-lib/recsplit/recsplit_test.go
+++ b/erigon-lib/recsplit/recsplit_test.go
@@ -127,7 +127,7 @@ func TestIndexLookup(t *testing.T) {
 	defer idx.Close()
 	for i := 0; i < 100; i++ {
 		reader := NewIndexReader(idx)
-		offset := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
+		offset, _ := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
 		if offset != uint64(i*17) {
 			t.Errorf("expected offset: %d, looked up: %d", i*17, offset)
 		}
@@ -138,14 +138,16 @@ func TestTwoLayerIndex(t *testing.T) {
 	logger := log.New()
 	tmpDir := t.TempDir()
 	indexFile := filepath.Join(tmpDir, "index")
+	salt := uint32(1)
 	rs, err := NewRecSplit(RecSplitArgs{
-		KeyCount:   100,
-		BucketSize: 10,
-		Salt:       0,
-		TmpDir:     tmpDir,
-		IndexFile:  indexFile,
-		LeafSize:   8,
-		Enums:      true,
+		KeyCount:           100,
+		BucketSize:         10,
+		Salt:               salt,
+		TmpDir:             tmpDir,
+		IndexFile:          indexFile,
+		LeafSize:           8,
+		Enums:              true,
+		LessFalsePositives: true,
 	}, logger)
 	if err != nil {
 		t.Fatal(err)
@@ -163,7 +165,7 @@ func TestTwoLayerIndex(t *testing.T) {
 	defer idx.Close()
 	for i := 0; i < 100; i++ {
 		reader := NewIndexReader(idx)
-		e := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
+		e, _ := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
 		if e != uint64(i) {
 			t.Errorf("expected enumeration: %d, lookup up: %d", i, e)
 		}
diff --git a/erigon-lib/state/aggregator_bench_test.go b/erigon-lib/state/aggregator_bench_test.go
index c664b64e04b..77c5678194e 100644
--- a/erigon-lib/state/aggregator_bench_test.go
+++ b/erigon-lib/state/aggregator_bench_test.go
@@ -235,7 +235,7 @@ func Benchmark_Recsplit_Find_ExternalFile(b *testing.B) {
 
 	for i := 0; i < b.N; i++ {
 		p := rnd.Intn(len(keys))
-		offset := idxr.Lookup(keys[p])
+		offset, _ := idxr.Lookup(keys[p])
 		getter.Reset(offset)
 
 		require.True(b, getter.HasNext())
diff --git a/erigon-lib/state/domain_test.go b/erigon-lib/state/domain_test.go
index 4dd41f8b6f6..e0c44adef06 100644
--- a/erigon-lib/state/domain_test.go
+++ b/erigon-lib/state/domain_test.go
@@ -124,7 +124,7 @@ func TestCollationBuild(t *testing.T) {
 	r := recsplit.NewIndexReader(sf.valuesIdx)
 	defer r.Close()
 	for i := 0; i < len(words); i += 2 {
-		offset := r.Lookup([]byte(words[i]))
+		offset, _ := r.Lookup([]byte(words[i]))
 		g.Reset(offset)
 		w, _ := g.Next(nil)
 		require.Equal(t, words[i], string(w))
diff --git a/erigon-lib/state/history.go b/erigon-lib/state/history.go
index 0ffb9f43a27..a71aeaba160 100644
--- a/erigon-lib/state/history.go
+++ b/erigon-lib/state/history.go
@@ -1211,7 +1211,10 @@ func (hc *HistoryContext) GetNoState(key []byte, txNum uint64) ([]byte, bool, er
 		if reader.Empty() {
 			return true
 		}
-		offset := reader.Lookup(key)
+		offset, ok := reader.Lookup(key)
+		if !ok {
+			return false
+		}
 		g := hc.ic.statelessGetter(item.i)
 		g.Reset(offset)
 		k, _ := g.NextUncompressed()
@@ -1294,7 +1297,10 @@ func (hc *HistoryContext) GetNoState(key []byte, txNum uint64) ([]byte, bool, er
 	var txKey [8]byte
 	binary.BigEndian.PutUint64(txKey[:], foundTxNum)
 	reader := hc.statelessIdxReader(historyItem.i)
-	offset := reader.Lookup2(txKey[:], key)
+	offset, ok := reader.Lookup2(txKey[:], key)
+	if !ok {
+		return nil, false, nil
+	}
 	//fmt.Printf("offset = %d, txKey=[%x], key=[%x]\n", offset, txKey[:], key)
 	g := hc.statelessGetter(historyItem.i)
 	g.Reset(offset)
@@ -1313,7 +1319,10 @@ func (hs *HistoryStep) GetNoState(key []byte, txNum uint64) ([]byte, bool, uint6
 	if hs.indexFile.reader.Empty() {
 		return nil, false, txNum
 	}
-	offset := hs.indexFile.reader.Lookup(key)
+	offset, ok := hs.indexFile.reader.Lookup(key)
+	if !ok {
+		return nil, false, txNum
+	}
 	g := hs.indexFile.getter
 	g.Reset(offset)
 	k, _ := g.NextUncompressed()
@@ -1329,7 +1338,10 @@ func (hs *HistoryStep) GetNoState(key []byte, txNum uint64) ([]byte, bool, uint6
 	}
 	var txKey [8]byte
 	binary.BigEndian.PutUint64(txKey[:], n)
-	offset = hs.historyFile.reader.Lookup2(txKey[:], key)
+	offset, ok = hs.historyFile.reader.Lookup2(txKey[:], key)
+	if !ok {
+		return nil, false, txNum
+	}
 	//fmt.Printf("offset = %d, txKey=[%x], key=[%x]\n", offset, txKey[:], key)
 	g = hs.historyFile.getter
 	g.Reset(offset)
@@ -1345,7 +1357,10 @@ func (hs *HistoryStep) MaxTxNum(key []byte) (bool, uint64) {
 	if hs.indexFile.reader.Empty() {
 		return false, 0
 	}
-	offset := hs.indexFile.reader.Lookup(key)
+	offset, ok := hs.indexFile.reader.Lookup(key)
+	if !ok {
+		return false, 0
+	}
 	g := hs.indexFile.getter
 	g.Reset(offset)
 	k, _ := g.NextUncompressed()
@@ -1516,7 +1531,10 @@ func (hi *StateAsOfIterF) advanceInFiles() error {
 			return fmt.Errorf("no %s file found for [%x]", hi.hc.h.filenameBase, hi.nextKey)
 		}
 		reader := hi.hc.statelessIdxReader(historyItem.i)
-		offset := reader.Lookup2(hi.txnKey[:], hi.nextKey)
+		offset, ok := reader.Lookup2(hi.txnKey[:], hi.nextKey)
+		if !ok {
+			continue
+		}
 		g := hi.hc.statelessGetter(historyItem.i)
 		g.Reset(offset)
 		if hi.compressVals {
@@ -1826,7 +1844,10 @@ func (hi *HistoryChangesIterFiles) advance() error {
 			return fmt.Errorf("HistoryChangesIterFiles: no %s file found for [%x]", hi.hc.h.filenameBase, hi.nextKey)
 		}
 		reader := hi.hc.statelessIdxReader(historyItem.i)
-		offset := reader.Lookup2(hi.txnKey[:], hi.nextKey)
+		offset, ok := reader.Lookup2(hi.txnKey[:], hi.nextKey)
+		if !ok {
+			continue
+		}
 		g := hi.hc.statelessGetter(historyItem.i)
 		g.Reset(offset)
 		if hi.compressVals {
diff --git a/erigon-lib/state/history_test.go b/erigon-lib/state/history_test.go
index f1ca17d1f74..1c3edede2ad 100644
--- a/erigon-lib/state/history_test.go
+++ b/erigon-lib/state/history_test.go
@@ -143,7 +143,7 @@ func TestHistoryCollationBuild(t *testing.T) {
 	require.Equal([][]uint64{{2, 6}, {3, 6, 7}, {7}}, intArrs)
 	r := recsplit.NewIndexReader(sf.efHistoryIdx)
 	for i := 0; i < len(keyWords); i++ {
-		offset := r.Lookup([]byte(keyWords[i]))
+		offset, _ := r.Lookup([]byte(keyWords[i]))
 		g.Reset(offset)
 		w, _ := g.Next(nil)
 		require.Equal(keyWords[i], string(w))
@@ -156,7 +156,10 @@ func TestHistoryCollationBuild(t *testing.T) {
 		for j := 0; j < len(ints); j++ {
 			var txKey [8]byte
 			binary.BigEndian.PutUint64(txKey[:], ints[j])
-			offset := r.Lookup2(txKey[:], []byte(keyWords[i]))
+			offset, ok := r.Lookup2(txKey[:], []byte(keyWords[i]))
+			if !ok {
+				continue
+			}
 			g.Reset(offset)
 			w, _ := g.Next(nil)
 			require.Equal(valWords[vi], string(w))
diff --git a/erigon-lib/state/inverted_index.go b/erigon-lib/state/inverted_index.go
index f02bc55ae90..501ffd3191a 100644
--- a/erigon-lib/state/inverted_index.go
+++ b/erigon-lib/state/inverted_index.go
@@ -765,7 +765,10 @@ func (it *FrozenInvertedIdxIter) advanceInFiles() {
 		}
 		item := it.stack[len(it.stack)-1]
 		it.stack = it.stack[:len(it.stack)-1]
-		offset := item.reader.Lookup(it.key)
+		offset, ok := item.reader.Lookup(it.key)
+		if !ok {
+			continue
+		}
 		g := item.getter
 		g.Reset(offset)
 		k, _ := g.NextUncompressed()
diff --git a/erigon-lib/state/inverted_index_test.go b/erigon-lib/state/inverted_index_test.go
index c23dcb5d0a0..f60759f5368 100644
--- a/erigon-lib/state/inverted_index_test.go
+++ b/erigon-lib/state/inverted_index_test.go
@@ -126,7 +126,7 @@ func TestInvIndexCollationBuild(t *testing.T) {
 	require.Equal(t, [][]uint64{{2, 6}, {3}, {6}}, intArrs)
 	r := recsplit.NewIndexReader(sf.index)
 	for i := 0; i < len(words); i++ {
-		offset := r.Lookup([]byte(words[i]))
+		offset, _ := r.Lookup([]byte(words[i]))
 		g.Reset(offset)
 		w, _ := g.Next(nil)
 		require.Equal(t, words[i], string(w))
diff --git a/erigon-lib/state/locality_index.go b/erigon-lib/state/locality_index.go
index 8d126a08751..8f5d9141096 100644
--- a/erigon-lib/state/locality_index.go
+++ b/erigon-lib/state/locality_index.go
@@ -263,7 +263,11 @@ func (li *LocalityIndex) lookupIdxFiles(loc *ctxLocalityIdx, key []byte, fromTxN
 	}
 
 	fromFileNum := fromTxNum / li.aggregationStep / StepsInBiggestFile
-	fn1, fn2, ok1, ok2, err := loc.bm.First2At(loc.reader.Lookup(key), fromFileNum)
+	i, ok := loc.reader.Lookup(key)
+	if !ok {
+		return 0, 0, fromTxNum, false, false
+	}
+	fn1, fn2, ok1, ok2, err := loc.bm.First2At(i, fromFileNum)
 	if err != nil {
 		panic(err)
 	}
diff --git a/erigon-lib/state/state_recon.go b/erigon-lib/state/state_recon.go
index 32519fb77f0..2b8d0629ac6 100644
--- a/erigon-lib/state/state_recon.go
+++ b/erigon-lib/state/state_recon.go
@@ -185,13 +185,15 @@ func (hii *HistoryIteratorInc) advance() {
 			if n, ok := ef.Search(hii.uptoTxNum); ok {
 				var txKey [8]byte
 				binary.BigEndian.PutUint64(txKey[:], n)
-				offset := hii.r.Lookup2(txKey[:], hii.key)
-				hii.historyG.Reset(offset)
-				hii.nextKey = hii.key
-				if hii.compressVals {
-					hii.nextVal, _ = hii.historyG.Next(nil)
-				} else {
-					hii.nextVal, _ = hii.historyG.NextUncompressed()
+				offset, ok := hii.r.Lookup2(txKey[:], hii.key)
+				if ok {
+					hii.historyG.Reset(offset)
+					hii.nextKey = hii.key
+					if hii.compressVals {
+						hii.nextVal, _ = hii.historyG.Next(nil)
+					} else {
+						hii.nextVal, _ = hii.historyG.NextUncompressed()
+					}
 				}
 			}
 			if hii.indexG.HasNext() {
diff --git a/eth/ethutils/utils.go b/eth/ethutils/utils.go
index a81d6ec5691..8768eb7d502 100644
--- a/eth/ethutils/utils.go
+++ b/eth/ethutils/utils.go
@@ -5,6 +5,7 @@ import (
 	"reflect"
 
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
+	"github.com/ledgerwatch/erigon-lib/crypto/kzg"
 	"github.com/ledgerwatch/log/v3"
 
 	"github.com/ledgerwatch/erigon/consensus"
@@ -12,9 +13,10 @@ import (
 )
 
 var (
-	ErrNilBlobHashes      = errors.New("nil blob hashes array")
-	ErrMaxBlobGasUsed     = errors.New("max blob gas used")
-	ErrMismatchBlobHashes = errors.New("mismatch blob hashes")
+	ErrNilBlobHashes       = errors.New("nil blob hashes array")
+	ErrMaxBlobGasUsed      = errors.New("max blob gas used")
+	ErrMismatchBlobHashes  = errors.New("mismatch blob hashes")
+	ErrInvalidVersiondHash = errors.New("invalid blob versioned hash, must start with VERSIONED_HASH_VERSION_KZG")
 )
 
 // IsLocalBlock checks whether the specified block is mined
@@ -48,7 +50,14 @@ func ValidateBlobs(blobGasUsed, maxBlobsGas, maxBlobsPerBlock uint64, expectedBl
 	}
 	actualBlobHashes := []libcommon.Hash{}
 	for _, txn := range *transactions {
-		actualBlobHashes = append(actualBlobHashes, txn.GetBlobHashes()...)
+		if txn.Type() == types.BlobTxType {
+			for _, h := range txn.GetBlobHashes() {
+				if h[0] != kzg.BlobCommitmentVersionKZG {
+					return ErrInvalidVersiondHash
+				}
+				actualBlobHashes = append(actualBlobHashes, h)
+			}
+		}
 	}
 	if len(actualBlobHashes) > int(maxBlobsPerBlock) || blobGasUsed > maxBlobsGas {
 		return ErrMaxBlobGasUsed
diff --git a/eth/stagedsync/chain_reader.go b/eth/stagedsync/chain_reader.go
index bae8a668f49..072818304ed 100644
--- a/eth/stagedsync/chain_reader.go
+++ b/eth/stagedsync/chain_reader.go
@@ -81,7 +81,7 @@ func (cr ChainReader) FrozenBlocks() uint64 {
 	return cr.BlockReader.FrozenBlocks()
 }
 
-func (cr ChainReader) BorStartEventID(_ uint64) uint64 {
+func (cr ChainReader) BorStartEventID(_ libcommon.Hash, _ uint64) uint64 {
 	panic("bor events by block not implemented")
 }
 func (cr ChainReader) BorEventsByBlock(_ libcommon.Hash, _ uint64) []rlp.RawValue {
diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go
index a4055ba07a1..3e4fce82762 100644
--- a/eth/stagedsync/stage_headers.go
+++ b/eth/stagedsync/stage_headers.go
@@ -604,8 +604,8 @@ func (cr ChainReaderImpl) BorEventsByBlock(hash libcommon.Hash, number uint64) [
 	}
 	return events
 }
-func (cr ChainReaderImpl) BorStartEventID(blockNum uint64) uint64 {
-	id, err := cr.blockReader.BorStartEventID(context.Background(), cr.tx, blockNum)
+func (cr ChainReaderImpl) BorStartEventID(hash libcommon.Hash, blockNum uint64) uint64 {
+	id, err := cr.blockReader.BorStartEventID(context.Background(), cr.tx, hash, blockNum)
 	if err != nil {
 		cr.logger.Error("BorEventsByBlock failed", "err", err)
 		return 0
diff --git a/eth/stagedsync/stage_log_index.go b/eth/stagedsync/stage_log_index.go
index 2740e969529..29bbadcacd5 100644
--- a/eth/stagedsync/stage_log_index.go
+++ b/eth/stagedsync/stage_log_index.go
@@ -5,10 +5,11 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
-	"github.com/ledgerwatch/erigon-lib/kv/dbutils"
 	"runtime"
 	"time"
 
+	"github.com/ledgerwatch/erigon-lib/kv/dbutils"
+
 	"github.com/RoaringBitmap/roaring"
 	"github.com/c2h5oh/datasize"
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
diff --git a/turbo/engineapi/engine_server.go b/turbo/engineapi/engine_server.go
index 33858570d1a..3af16f80869 100644
--- a/turbo/engineapi/engine_server.go
+++ b/turbo/engineapi/engine_server.go
@@ -224,10 +224,10 @@ func (s *EngineServer) newPayload(ctx context.Context, req *engine_types.Executi
 				LatestValidHash: &latestValidHash,
 			}, nil
 		}
-		if errors.Is(err, ethutils.ErrMismatchBlobHashes) {
+		if errors.Is(err, ethutils.ErrMismatchBlobHashes) || errors.Is(err, ethutils.ErrInvalidVersiondHash) {
 			return &engine_types.PayloadStatus{
 				Status:          engine_types.InvalidStatus,
-				ValidationError: engine_types.NewStringifiedErrorFromString("mismatch in blob hashes"),
+				ValidationError: engine_types.NewStringifiedErrorFromString(err.Error()),
 			}, nil
 		}
 	}
diff --git a/turbo/jsonrpc/trace_adhoc.go b/turbo/jsonrpc/trace_adhoc.go
index 845f6f72216..105df7b883d 100644
--- a/turbo/jsonrpc/trace_adhoc.go
+++ b/turbo/jsonrpc/trace_adhoc.go
@@ -1132,6 +1132,9 @@ func (api *TraceAPIImpl) doCallMany(ctx context.Context, dbtx kv.Tx, msgs []type
 	if err != nil {
 		return nil, nil, err
 	}
+	if parentBlock == nil {
+		return nil, nil, fmt.Errorf("parent block %d(%x) not found", blockNumber, hash)
+	}
 	parentHeader := parentBlock.Header()
 	if parentHeader == nil {
 		return nil, nil, fmt.Errorf("parent header %d(%x) not found", blockNumber, hash)
diff --git a/turbo/jsonrpc/tracing.go b/turbo/jsonrpc/tracing.go
index c618f278af9..8c039d29f8f 100644
--- a/turbo/jsonrpc/tracing.go
+++ b/turbo/jsonrpc/tracing.go
@@ -436,6 +436,10 @@ func (api *PrivateDebugAPIImpl) TraceCallMany(ctx context.Context, bundles []Bun
 		stream.WriteNil()
 		return err
 	}
+	if block == nil {
+		stream.WriteNil()
+		return fmt.Errorf("block %d not found", blockNum)
+	}
 
 	// -1 is a default value for transaction index.
 	// If it's -1, we will try to replay every single transaction in that block
diff --git a/turbo/services/interfaces.go b/turbo/services/interfaces.go
index e329255de7e..80ae44e5e57 100644
--- a/turbo/services/interfaces.go
+++ b/turbo/services/interfaces.go
@@ -40,7 +40,7 @@ type BorEventReader interface {
 	LastEventId(ctx context.Context, tx kv.Tx) (uint64, bool, error)
 	EventLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error)
 	EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockNum uint64) ([]rlp.RawValue, error)
-	BorStartEventID(ctx context.Context, tx kv.Tx, blockNum uint64) (uint64, error)
+	BorStartEventID(ctx context.Context, tx kv.Tx, hash common.Hash, blockNum uint64) (uint64, error)
 	LastFrozenEventId() uint64
 }
diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go
index 147012b3b7f..0ad1d4cdbb4 100644
--- a/turbo/snapshotsync/freezeblocks/block_reader.go
+++ b/turbo/snapshotsync/freezeblocks/block_reader.go
@@ -260,7 +260,7 @@ func (r *RemoteBlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash co
 	}
 	return result, nil
 }
-func (r *RemoteBlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, blockHeight uint64) (uint64, error) {
+func (r *RemoteBlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) (uint64, error) {
 	panic("not implemented")
 }
 
@@ -662,7 +662,10 @@ func (r *BlockReader) headerFromSnapshotByHash(hash common.Hash, sn *Segment, bu
 	}
 
 	reader := recsplit.NewIndexReader(index)
-	localID := reader.Lookup(hash[:])
+	localID, ok := reader.Lookup(hash[:])
+	if !ok {
+		return nil, nil
+	}
 	headerOffset := index.OrdinalLookup(localID)
 	gg := sn.MakeGetter()
 	gg.Reset(headerOffset)
@@ -812,7 +815,10 @@ func (r *BlockReader) txnByHash(txnHash common.Hash, segments []*Segment, buf []
 		}
 
 		reader := recsplit.NewIndexReader(idxTxnHash)
-		txnId := reader.Lookup(txnHash[:])
+		txnId, ok := reader.Lookup(txnHash[:])
+		if !ok {
+			continue
+		}
 		offset := idxTxnHash.OrdinalLookup(txnId)
 		gg := sn.MakeGetter()
 		gg.Reset(offset)
@@ -832,7 +838,10 @@ func (r *BlockReader) txnByHash(txnHash common.Hash, segments []*Segment, buf []
 		txn.SetSender(sender) // see: https://tip.golang.org/ref/spec#Conversions_from_slice_to_array_pointer
 
 		reader2 := recsplit.NewIndexReader(idxTxnHash2BlockNum)
-		blockNum := reader2.Lookup(txnHash[:])
+		blockNum, ok := reader2.Lookup(txnHash[:])
+		if !ok {
+			continue
+		}
 
 		// final txnHash check - completely avoid false-positives
 		if txn.Hash() == txnHash {
@@ -1096,7 +1105,10 @@ func (r *BlockReader) borBlockByEventHash(txnHash common.Hash, segments []*Segme
 			continue
 		}
 		reader := recsplit.NewIndexReader(idxBorTxnHash)
-		blockEventId := reader.Lookup(txnHash[:])
+		blockEventId, exists := reader.Lookup(txnHash[:])
+		if !exists {
+			continue
+		}
 		offset := idxBorTxnHash.OrdinalLookup(blockEventId)
 		gg := sn.MakeGetter()
 		gg.Reset(offset)
@@ -1111,13 +1123,44 @@ func (r *BlockReader) borBlockByEventHash(txnHash common.Hash, segments []*Segme
 	return
 }
 
-func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, blockHeight uint64) (uint64, error) {
-	v, err := tx.GetOne(kv.BorEventNums, hexutility.EncodeTs(blockHeight))
-	if err != nil {
-		return 0, err
+func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) (uint64, error) {
+	maxBlockNumInFiles := r.FrozenBorBlocks()
+	if maxBlockNumInFiles == 0 || blockHeight > maxBlockNumInFiles {
+		v, err := tx.GetOne(kv.BorEventNums, hexutility.EncodeTs(blockHeight))
+		if err != nil {
+			return 0, err
+		}
+		startEventId := binary.BigEndian.Uint64(v)
+		return startEventId, nil
+	}
+
+	borTxHash := types.ComputeBorTxHash(blockHeight, hash)
+	view := r.borSn.View()
+	defer view.Close()
+
+	segments := view.Events()
+	for i := len(segments) - 1; i >= 0; i-- {
+		sn := segments[i]
+		if sn.from > blockHeight {
+			continue
+		}
+		if sn.to <= blockHeight {
+			break
+		}
+
+		idxBorTxnHash := sn.Index()
+
+		if idxBorTxnHash == nil {
+			continue
+		}
+		if idxBorTxnHash.KeyCount() == 0 {
+			continue
+		}
+		reader := recsplit.NewIndexReader(idxBorTxnHash)
+		blockEventId, _ := reader.Lookup(borTxHash[:])
+		return idxBorTxnHash.BaseDataID() + blockEventId, nil
 	}
-	startEventId := binary.BigEndian.Uint64(v)
-	return startEventId, nil
+	return 0, nil
 }
 
 func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) ([]rlp.RawValue, error) {
@@ -1190,7 +1233,10 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H
 			continue
 		}
 		reader := recsplit.NewIndexReader(idxBorTxnHash)
-		blockEventId := reader.Lookup(borTxHash[:])
+		blockEventId, ok := reader.Lookup(borTxHash[:])
+		if !ok {
+			continue
+		}
 		offset := idxBorTxnHash.OrdinalLookup(blockEventId)
 		gg := sn.MakeGetter()
 		gg.Reset(offset)
diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go
index 7c5be25d3a9..6e9b525af1e 100644
--- a/turbo/snapshotsync/freezeblocks/block_snapshots.go
+++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go
@@ -1850,8 +1850,11 @@ func TransactionsIdx(ctx context.Context, chainConfig *chain.Config, sn snaptype
 	}
 
 	txnHashIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
-		KeyCount:   d.Count(),
-		Enums:      true,
+		KeyCount: d.Count(),
+
+		Enums:              true,
+		LessFalsePositives: true,
+
 		BucketSize: 2000,
 		LeafSize:   8,
 		TmpDir:     tmpDir,