Commit 46fb674

Merge branch 'devel' of github.com:ledgerwatch/erigon into astrid-p2p-retry-fetch-headers-timeouts
taratorio committed Mar 1, 2024
2 parents 03f31aa + a4cabed commit 46fb674
Showing 29 changed files with 284 additions and 82 deletions.
5 changes: 4 additions & 1 deletion cmd/hack/hack.go
@@ -1301,7 +1301,10 @@ func iterate(filename string, prefix string) error {
txNum, _ := efIt.Next()
var txKey [8]byte
binary.BigEndian.PutUint64(txKey[:], txNum)
offset := r.Lookup2(txKey[:], key)
offset, ok := r.Lookup2(txKey[:], key)
if !ok {
continue
}
gv.Reset(offset)
v, _ := gv.Next(nil)
fmt.Printf(" %d", txNum)
4 changes: 2 additions & 2 deletions cmd/rpcdaemon/rpcservices/eth_backend.go
@@ -289,8 +289,8 @@ func (back *RemoteBackend) EventLookup(ctx context.Context, tx kv.Getter, txnHas
func (back *RemoteBackend) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.Hash, blockNum uint64) ([]rlp.RawValue, error) {
return back.blockReader.EventsByBlock(ctx, tx, hash, blockNum)
}
func (back *RemoteBackend) BorStartEventID(ctx context.Context, tx kv.Tx, blockNum uint64) (uint64, error) {
return back.blockReader.BorStartEventID(ctx, tx, blockNum)
func (back *RemoteBackend) BorStartEventID(ctx context.Context, tx kv.Tx, hash common.Hash, blockNum uint64) (uint64, error) {
return back.blockReader.BorStartEventID(ctx, tx, hash, blockNum)
}

func (back *RemoteBackend) LastSpanId(ctx context.Context, tx kv.Tx) (uint64, bool, error) {
4 changes: 2 additions & 2 deletions cmd/state/exec3/state.go
@@ -286,8 +286,8 @@ func (cr ChainReader) HasBlock(hash libcommon.Hash, number uint64) bool {
func (cr ChainReader) BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue {
panic("")
}
func (cr ChainReader) BorStartEventID(number uint64) uint64 { panic("") }
func (cr ChainReader) BorSpan(spanId uint64) []byte { panic("") }
func (cr ChainReader) BorStartEventID(hash libcommon.Hash, number uint64) uint64 { panic("") }
func (cr ChainReader) BorSpan(spanId uint64) []byte { panic("") }

func NewWorkersPool(lock sync.Locker, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *exec22.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int) (reconWorkers []*Worker, applyWorker *Worker, rws *exec22.ResultsQueue, clear func(), wait func()) {
reconWorkers = make([]*Worker, workerCount)
2 changes: 1 addition & 1 deletion consensus/consensus.go
@@ -72,7 +72,7 @@ type ChainReader interface {
HasBlock(hash libcommon.Hash, number uint64) bool

BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue
BorStartEventID(number uint64) uint64
BorStartEventID(hash libcommon.Hash, number uint64) uint64
}

type SystemCall func(contract libcommon.Address, data []byte) ([]byte, error)
2 changes: 1 addition & 1 deletion core/chain_makers.go
@@ -653,7 +653,7 @@ func (cr *FakeChainReader) FrozenBlocks() uint64
func (cr *FakeChainReader) BorEventsByBlock(hash libcommon.Hash, number uint64) []rlp.RawValue {
return nil
}
func (cr *FakeChainReader) BorStartEventID(number uint64) uint64 {
func (cr *FakeChainReader) BorStartEventID(hash libcommon.Hash, number uint64) uint64 {
return 0
}
func (cr *FakeChainReader) BorSpan(spanId uint64) []byte { return nil }
54 changes: 48 additions & 6 deletions erigon-lib/recsplit/index.go
@@ -42,13 +42,30 @@ type Features byte

const (
No Features = 0b0
// Enums - Whether to build two level index with perfect hash table pointing to enumeration and enumeration pointing to offsets

// Enums - To build 2-lvl index with perfect hash table pointing to enumeration and enumeration pointing to offsets
Enums Features = 0b1
//LessFalsePositives Features = 0b10 // example of adding new feature
// LessFalsePositives - Reduce false-positives to 1/256=0.4% at the cost of 1 byte per key
// Implementation:
// PerfectHashMap - returns false-positives when an unknown key is requested. But false-positives themselves are not the problem.
// The problem is the nature of the false-positives: they are smeared randomly across .seg files.
// That keeps .seg files "warm" - which is bad because they are big and
// the data-locality of touches is poor (a lookup may need to visit many shards to find a key).
// A built-in "existence filter" (like bloom/cuckoo/ribbon/xor-filter/fuse-filter) would improve
// data-locality - such filters are small enough that existence-checks stay co-located on disk.
// But our data has 2 additional properties:
// "keys are known", "keys are hashed" (.idx works on murmur3), ".idx can calc key-number by key".
// It means: if we rely on these properties, we can do better than a general-purpose existence filter.
// A simple "array of 1-st bytes of key-hashes" is a great alternative:
// general-purpose filter: 9 bits/key, 0.3% false-positives, 3 mem accesses
// first-bytes array: 8 bits/key, 1/256=0.4% false-positives, 1 mem access
//
// See also: https://github.com/ledgerwatch/erigon/issues/9486
LessFalsePositives Features = 0b10
)

// SupportedFeatures - if we see a feature not in this list (likely after a downgrade), return IncompatibleErr and recommend the user manually delete the file
var SupportedFeatures = []Features{Enums}
var SupportedFeatures = []Features{Enums, LessFalsePositives}
var IncompatibleErr = errors.New("incompatible. can re-build such files by command 'erigon snapshots index'")
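
To make the byte-per-key trade-off concrete, here is a minimal, self-contained sketch of the "first bytes of key-hashes" idea (illustrative only: it uses FNV from the Go standard library in place of the murmur3 bucket hash the real index uses, and a plain slice in place of the mmap'd .idx section):

package main

import (
	"fmt"
	"hash/fnv"
)

// hash64 stands in for the murmur3 bucket hash that .idx computes per key.
func hash64(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}

// existenceFilter keeps 1 byte per key, indexed by the key's enumeration
// number (the value the perfect hash table maps a key to).
type existenceFilter struct{ existence []byte }

func (f *existenceFilter) add(enum int, key string) { f.existence[enum] = byte(hash64(key)) }

func (f *existenceFilter) has(enum int, key string) bool {
	return f.existence[enum] == byte(hash64(key))
}

func main() {
	keys := []string{"alpha", "beta", "gamma"}
	f := &existenceFilter{existence: make([]byte, len(keys))}
	for i, k := range keys {
		f.add(i, k)
	}
	// A known key always passes; an unknown key that the perfect hash maps to
	// enum 0 is rejected unless its low hash byte collides (p = 1/256).
	fmt.Println(f.has(0, "alpha"))   // true
	fmt.Println(f.has(0, "unknown")) // almost certainly false
}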

// Index implements index lookup from the file created by the RecSplit
@@ -78,6 +95,9 @@ type Index struct {
primaryAggrBound uint16 // The lower bound for primary key aggregation (computed from leafSize)
enums bool

lessFalsePositives bool
existence []byte

readers *sync.Pool
}

@@ -153,11 +173,22 @@ func OpenIndex(indexFilePath string) (*Index, error) {
}

idx.enums = features&Enums != No
idx.lessFalsePositives = features&LessFalsePositives != No
offset++
if idx.enums && idx.keyCount > 0 {
var size int
idx.offsetEf, size = eliasfano32.ReadEliasFano(idx.data[offset:])
offset += size

if idx.lessFalsePositives {
arrSz := binary.BigEndian.Uint64(idx.data[offset:])
offset += 8
if arrSz != idx.keyCount {
return nil, fmt.Errorf("%w. size of existence filter %d != keys count %d", IncompatibleErr, arrSz, idx.keyCount)
}
idx.existence = idx.data[offset : offset+int(arrSz)]
offset += int(arrSz)
}
}
// Size of golomb rice params
golombParamSize := binary.BigEndian.Uint16(idx.data[offset:])
@@ -248,13 +279,13 @@ func (idx *Index) KeyCount() uint64 {
}

// Lookup is not thread-safe because it uses id.hasher
func (idx *Index) Lookup(bucketHash, fingerprint uint64) uint64 {
func (idx *Index) Lookup(bucketHash, fingerprint uint64) (uint64, bool) {
if idx.keyCount == 0 {
_, fName := filepath.Split(idx.filePath)
panic("no Lookup should be done when keyCount==0, please use Empty function to guard " + fName)
}
if idx.keyCount == 1 {
return 0
return 0, true
}
var gr GolombRiceReader
gr.data = idx.grData
@@ -311,7 +342,11 @@ func (idx *Index) Lookup(bucketHash, fingerprint uint64) uint64 {
rec := int(cumKeys) + int(remap16(remix(fingerprint+idx.startSeed[level]+b), m))
pos := 1 + 8 + idx.bytesPerRec*(rec+1)

return binary.BigEndian.Uint64(idx.data[pos:]) & idx.recMask
found := binary.BigEndian.Uint64(idx.data[pos:]) & idx.recMask
if idx.lessFalsePositives {
return found, idx.existence[found] == byte(bucketHash)
}
return found, true
}

// OrdinalLookup returns the offset of i-th element in the index
@@ -321,6 +356,13 @@ func (idx *Index) OrdinalLookup(i uint64) uint64 {
return idx.offsetEf.Get(i)
}

func (idx *Index) Has(bucketHash, i uint64) bool {
if idx.lessFalsePositives {
return idx.existence[i] == byte(bucketHash)
}
return true
}

func (idx *Index) ExtractOffsets() map[uint64]uint64 {
m := map[uint64]uint64{}
pos := 1 + 8 + idx.bytesPerRec
8 changes: 4 additions & 4 deletions erigon-lib/recsplit/index_reader.go
@@ -55,20 +55,20 @@ func (r *IndexReader) sum2(key1, key2 []byte) (uint64, uint64) {
}

// Lookup wraps index Lookup
func (r *IndexReader) Lookup(key []byte) uint64 {
func (r *IndexReader) Lookup(key []byte) (uint64, bool) {
bucketHash, fingerprint := r.sum(key)
if r.index != nil {
return r.index.Lookup(bucketHash, fingerprint)
}
return 0
return 0, true
}

func (r *IndexReader) Lookup2(key1, key2 []byte) uint64 {
func (r *IndexReader) Lookup2(key1, key2 []byte) (uint64, bool) {
bucketHash, fingerprint := r.sum2(key1, key2)
if r.index != nil {
return r.index.Lookup(bucketHash, fingerprint)
}
return 0
return 0, true
}

func (r *IndexReader) Empty() bool {
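
Downstream callers must now consume the extra boolean; a minimal caller-side sketch of the new contract (the index path and key are hypothetical):

package main

import (
	"fmt"

	"github.com/ledgerwatch/erigon-lib/recsplit"
)

func main() {
	idx, err := recsplit.OpenIndex("/tmp/example.idx") // hypothetical index file
	if err != nil {
		panic(err)
	}
	defer idx.Close()

	reader := recsplit.NewIndexReader(idx)
	offset, ok := reader.Lookup([]byte("some key"))
	if !ok {
		// With LessFalsePositives enabled, ok == false means the key is
		// almost certainly absent; skip it instead of touching .seg data.
		fmt.Println("key filtered out")
		return
	}
	fmt.Println("offset:", offset)
}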
2 changes: 1 addition & 1 deletion erigon-lib/recsplit/index_test.go
@@ -73,7 +73,7 @@ func TestReWriteIndex(t *testing.T) {
defer reidx.Close()
for i := 0; i < 100; i++ {
reader := NewIndexReader(reidx)
offset := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
offset, _ := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
if offset != uint64(i*3965) {
t.Errorf("expected offset: %d, looked up: %d", i*3965, offset)
}
72 changes: 64 additions & 8 deletions erigon-lib/recsplit/recsplit.go
@@ -66,11 +66,15 @@ func remix(z uint64) uint64 {
type RecSplit struct {
hasher murmur3.Hash128 // Salted hash function to use for splitting into initial buckets and mapping to 64-bit fingerprints
offsetCollector *etl.Collector // Collector that sorts by offsets

indexW *bufio.Writer
indexF *os.File
offsetEf *eliasfano32.EliasFano // Elias Fano instance for encoding the offsets
bucketCollector *etl.Collector // Collector that sorts by buckets

existenceF *os.File
existenceW *bufio.Writer

indexFileName string
indexFile, tmpFilePath string

@@ -108,6 +112,7 @@ type RecSplit struct {
numBuf [8]byte
collision bool
enums bool // Whether to build two level index with perfect hash table pointing to enumeration and enumeration pointing to offsets
lessFalsePositives bool
built bool // Flag indicating that the hash function has been built and no more keys can be added
trace bool
logger log.Logger
@@ -119,7 +124,8 @@ type RecSplitArgs struct {
// Whether two level index needs to be built, where perfect hash map points to an enumeration, and enumeration points to offsets
// if Enum=false: can have unsorted and duplicated values
// if Enum=true: must have sorted values (can have duplicates) - monotonically growing sequence
Enums bool
Enums bool
LessFalsePositives bool

IndexFile string // File name where the index and the minimal perfect hash function will be written to
TmpDir string
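
A construction-side sketch of how a caller would enable the new flag (field values and paths are illustrative; the logger import path and the KeyCount/BucketSize/LeafSize fields are assumed from the existing RecSplitArgs contract):

package main

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/recsplit"
	"github.com/ledgerwatch/log/v3"
)

func main() {
	rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
		Enums:              true, // the existence filter is indexed by enumeration number
		LessFalsePositives: true, // +1 byte per key, ~1/256 residual false-positives
		KeyCount:           2,
		BucketSize:         10,
		LeafSize:           8,
		IndexFile:          "/tmp/example.idx", // hypothetical
		TmpDir:             "/tmp",
	}, log.New())
	if err != nil {
		panic(err)
	}
	defer rs.Close()

	// With Enums=true, offsets must be added in monotonically growing order.
	_ = rs.AddKey([]byte("first key"), 0)
	_ = rs.AddKey([]byte("second key"), 17)
	_ = rs.Build(context.Background()) // writes the index, including the existence filter
}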
@@ -174,6 +180,15 @@ func NewRecSplit(args RecSplitArgs, logger log.Logger) (*RecSplit, error) {
rs.offsetCollector = etl.NewCollector(RecSplitLogPrefix+" "+fname, rs.tmpDir, etl.NewSortableBuffer(rs.etlBufLimit), logger)
rs.offsetCollector.LogLvl(log.LvlDebug)
}
rs.lessFalsePositives = args.LessFalsePositives
if rs.enums && args.KeyCount > 0 && rs.lessFalsePositives {
bufferFile, err := os.CreateTemp(rs.tmpDir, "erigon-lfp-buf-")
if err != nil {
return nil, err
}
rs.existenceF = bufferFile
rs.existenceW = bufio.NewWriter(rs.existenceF)
}
rs.currentBucket = make([]uint64, 0, args.BucketSize)
rs.currentBucketOffs = make([]uint64, 0, args.BucketSize)
rs.maxOffset = 0
@@ -198,6 +213,9 @@ func (rs *RecSplit) Close() {
if rs.indexF != nil {
rs.indexF.Close()
}
if rs.existenceF != nil {
rs.existenceF.Close()
}
if rs.bucketCollector != nil {
rs.bucketCollector.Close()
}
@@ -214,8 +232,8 @@ func (rs *RecSplit) SetTrace(trace bool) {

// remap converts the number x which is assumed to be uniformly distributed over the range [0..2^64) to the number that is uniformly
// distributed over the range [0..n)
func remap(x uint64, n uint64) uint64 {
hi, _ := bits.Mul64(x, n)
func remap(x uint64, n uint64) (hi uint64) {
hi, _ = bits.Mul64(x, n)
return hi
}
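
remap is the standard multiply-high range reduction: treating x/2^64 as a fraction in [0,1) and scaling it to [0..n), i.e. remap(x, n) = floor(x*n / 2^64), with the 128-bit product avoiding overflow. A quick illustrative check:

package main

import (
	"fmt"
	"math/bits"
)

func remap(x, n uint64) (hi uint64) {
	hi, _ = bits.Mul64(x, n) // high 64 bits of the 128-bit product = floor(x*n/2^64)
	return hi
}

func main() {
	fmt.Println(remap(uint64(1)<<63, 10)) // 5: the midpoint of [0..2^64) maps to the midpoint of [0..10)
	fmt.Println(remap(^uint64(0), 10))    // 9: the top of the range maps to n-1
}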

@@ -264,6 +282,8 @@ func splitParams(m, leafSize, primaryAggrBound, secondaryAggrBound uint16) (fano
return
}

var golombBaseLog2 = -math.Log((math.Sqrt(5) + 1.0) / 2.0)

func computeGolombRice(m uint16, table []uint32, leafSize, primaryAggrBound, secondaryAggrBound uint16) {
fanout, unit := splitParams(m, leafSize, primaryAggrBound, secondaryAggrBound)
k := make([]uint16, fanout)
@@ -277,7 +297,7 @@ func computeGolombRice(m uint16, table []uint32, leafSize, primaryAggrBound, sec
sqrtProd *= math.Sqrt(float64(k[i]))
}
p := math.Sqrt(float64(m)) / (math.Pow(2*math.Pi, (float64(fanout)-1.)/2.0) * sqrtProd)
golombRiceLength := uint32(math.Ceil(math.Log2(-math.Log((math.Sqrt(5)+1.0)/2.0) / math.Log1p(-p)))) // log2 Golomb modulus
golombRiceLength := uint32(math.Ceil(math.Log2(golombBaseLog2 / math.Log1p(-p)))) // log2 Golomb modulus
if golombRiceLength > 0x1F {
panic("golombRiceLength > 0x1F")
}
@@ -303,8 +323,7 @@ func computeGolombRice(m uint16, table []uint32, leafSize, primaryAggrBound, sec
// salt for the part of the hash function separating m elements. It is based on
// calculations with assumptions that we draw hash functions at random
func (rs *RecSplit) golombParam(m uint16) int {
s := uint16(len(rs.golombRice))
for m >= s {
for s := uint16(len(rs.golombRice)); m >= s; s++ {
rs.golombRice = append(rs.golombRice, 0)
// For the case where bucket is larger than planned
if s == 0 {
Expand All @@ -314,7 +333,6 @@ func (rs *RecSplit) golombParam(m uint16) int {
} else {
computeGolombRice(s, rs.golombRice, rs.leafSize, rs.primaryAggrBound, rs.secondaryAggrBound)
}
s++
}
return int(rs.golombRice[m] >> 27)
}
@@ -350,6 +368,12 @@ func (rs *RecSplit) AddKey(key []byte, offset uint64) error {
if err := rs.bucketCollector.Collect(rs.bucketKeyBuf[:], rs.numBuf[:]); err != nil {
return err
}
if rs.lessFalsePositives {
// 1 byte from each hashed key
if err := rs.existenceW.WriteByte(byte(hi)); err != nil {
return err
}
}
} else {
if err := rs.bucketCollector.Collect(rs.bucketKeyBuf[:], rs.numBuf[:]); err != nil {
return err
@@ -561,7 +585,7 @@ func (rs *RecSplit) Build(ctx context.Context) error {
return fmt.Errorf("create index file %s: %w", rs.indexFile, err)
}

rs.logger.Debug("[index] created", "file", rs.tmpFilePath, "fs", rs.indexF)
rs.logger.Debug("[index] created", "file", rs.tmpFilePath)

defer rs.indexF.Close()
rs.indexW = bufio.NewWriterSize(rs.indexF, etl.BufIOSize)
@@ -652,6 +676,9 @@ func (rs *RecSplit) Build(ctx context.Context) error {
var features Features
if rs.enums {
features |= Enums
if rs.lessFalsePositives {
features |= LessFalsePositives
}
}
if err := rs.indexW.WriteByte(byte(features)); err != nil {
return fmt.Errorf("writing enums = true: %w", err)
@@ -662,6 +689,10 @@ func (rs *RecSplit) Build(ctx context.Context) error {
return fmt.Errorf("writing elias fano for offsets: %w", err)
}
}
if err := rs.flushExistenceFilter(); err != nil {
return err
}

// Write out the size of golomb rice params
binary.BigEndian.PutUint16(rs.numBuf[:], uint16(len(rs.golombRice)))
if _, err := rs.indexW.Write(rs.numBuf[:4]); err != nil {
@@ -694,6 +725,31 @@ func (rs *RecSplit) Build(ctx context.Context) error {
return nil
}

func (rs *RecSplit) flushExistenceFilter() error {
if !rs.enums || rs.keysAdded == 0 || !rs.lessFalsePositives {
return nil
}
defer rs.existenceF.Close()

// Write the length of the existence array
binary.BigEndian.PutUint64(rs.numBuf[:], rs.keysAdded)
if _, err := rs.indexW.Write(rs.numBuf[:]); err != nil {
return err
}

// flush bufio and rewind before io.Copy; no reason to fsync the file - it is temporary
if err := rs.existenceW.Flush(); err != nil {
return err
}
if _, err := rs.existenceF.Seek(0, io.SeekStart); err != nil {
return err
}
if _, err := io.CopyN(rs.indexW, rs.existenceF, int64(rs.keysAdded)); err != nil {
return err
}
return nil
}
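
Taken together with the read path in OpenIndex above, the on-disk layout of this section appears to be (an inference from this diff, not separately documented):

// ... Elias-Fano encoded offsets ...
// [8 bytes]         big-endian uint64: keysAdded, the size of the existence array
// [keysAdded bytes] one byte per key, in enumeration order: the low byte of
//                   that key's murmur3 bucket hash (written by AddKey above)
// ... golomb-rice parameters ...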

func (rs *RecSplit) DisableFsync() { rs.noFsync = true }

// Fsync - other processes/goroutines must see only "fully-complete" (valid) files. No partial-writes.
2 changes: 1 addition & 1 deletion erigon-lib/recsplit/recsplit_fuzz_test.go
@@ -83,7 +83,7 @@ func FuzzRecSplit(f *testing.F) {
bits := make([]uint64, bitCount)
reader := NewIndexReader(idx)
for i = 0; i < len(in)-l; i += l {
off = reader.Lookup(in[i : i+l])
off, _ = reader.Lookup(in[i : i+l])
if int(off) >= count {
t.Errorf("off %d >= count %d", off, count)
}