diff --git a/.github/workflows/scripts/run_rpc_tests.sh b/.github/workflows/scripts/run_rpc_tests.sh index d08883623c3..b3a5bdf7af2 100755 --- a/.github/workflows/scripts/run_rpc_tests.sh +++ b/.github/workflows/scripts/run_rpc_tests.sh @@ -20,12 +20,15 @@ disabled_tests=( eth_getBlockByHash/test_10.json eth_getBlockByNumber/test_12.json # Erigon bugs: https://github.com/erigontech/erigon/pull/12609 - debug_accountRange,debug_storageRangeAt + debug_accountRange + debug_storageRangeAt # need update rpc-test - because Erigon is correct (@AskAlexSharov will do after https://github.com/erigontech/erigon/pull/12634) # remove this line after https://github.com/erigontech/rpc-tests/pull/273 - debug_getModifiedAccountsByHash,debug_getModifiedAccountsByNumber + debug_getModifiedAccountsByHash + debug_getModifiedAccountsByNumber # Erigon bug https://github.com/erigontech/erigon/issues/12603 - erigon_getLatestLogs,erigon_getLogsByHash/test_04.json + erigon_getLatestLogs + erigon_getLogsByHash/test_04.json # Erigon bug https://github.com/erigontech/erigon/issues/12637 debug_traceBlockByNumber/test_05.tar debug_traceBlockByNumber/test_08.tar @@ -36,7 +39,7 @@ disabled_tests=( # remove this line after https://github.com/erigontech/rpc-tests/pull/281 parity_getBlockReceipts parity_listStorageKeys/test_12.json - # to investigate + # created task https://github.com/erigontech/erigon/issues/12668 debug_traceCallMany/test_02.tar debug_traceCallMany/test_04.tar debug_traceCallMany/test_05.tar @@ -44,6 +47,7 @@ disabled_tests=( debug_traceCallMany/test_07.tar debug_traceCallMany/test_09.json debug_traceCallMany/test_10.tar + # to investigate engine_exchangeCapabilities/test_1.json engine_exchangeTransitionConfigurationV1/test_01.json engine_getClientVersionV1/test_1.json diff --git a/ChangeLog.md b/ChangeLog.md index 03a0d079a2d..17b971f8c59 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -3,6 +3,11 @@ ChangeLog ## v3.0.0-alpha5 (in development) +- Mostly we did work on 
rpc-compatibility tests +- Caplin uses 1 GB less RAM, and Erigon3 now runs on a 16 GB machine. +- Hard-time-limit on chain-tip pruning: https://github.com/erigontech/erigon/pull/12535 +- + ### TODO Acknowledgements: diff --git a/core/rawdb/rawtemporaldb/accessors_receipt_test.go b/core/rawdb/rawtemporaldb/accessors_receipt_test.go index d96d99e257e..03f5e95155b 100644 --- a/core/rawdb/rawtemporaldb/accessors_receipt_test.go +++ b/core/rawdb/rawtemporaldb/accessors_receipt_test.go @@ -49,27 +49,27 @@ func TestAppendReceipt(t *testing.T) { require.NoError(err) ttx := tx.(kv.TemporalTx) - v, ok, err := ttx.HistorySeek(kv.ReceiptHistory, FirstLogIndexKey, 0) + v, ok, err := ttx.HistorySeek(kv.ReceiptDomain, FirstLogIndexKey, 0) require.NoError(err) require.True(ok) require.Empty(v) - v, ok, err = ttx.HistorySeek(kv.ReceiptHistory, FirstLogIndexKey, 1) + v, ok, err = ttx.HistorySeek(kv.ReceiptDomain, FirstLogIndexKey, 1) require.NoError(err) require.True(ok) require.Equal(uint64(0), uvarint(v)) - v, ok, err = ttx.HistorySeek(kv.ReceiptHistory, FirstLogIndexKey, 2) + v, ok, err = ttx.HistorySeek(kv.ReceiptDomain, FirstLogIndexKey, 2) require.NoError(err) require.True(ok) require.Equal(uint64(1), uvarint(v)) - v, ok, err = ttx.HistorySeek(kv.ReceiptHistory, FirstLogIndexKey, 3) + v, ok, err = ttx.HistorySeek(kv.ReceiptDomain, FirstLogIndexKey, 3) require.NoError(err) require.True(ok) require.Equal(uint64(1), uvarint(v)) - v, ok, err = ttx.HistorySeek(kv.ReceiptHistory, FirstLogIndexKey, 4) + v, ok, err = ttx.HistorySeek(kv.ReceiptDomain, FirstLogIndexKey, 4) require.NoError(err) require.True(ok) require.Equal(uint64(0), uvarint(v)) diff --git a/core/state/history_reader_v3.go b/core/state/history_reader_v3.go index e971215628f..0e883e5de57 100644 --- a/core/state/history_reader_v3.go +++ b/core/state/history_reader_v3.go @@ -222,7 +222,7 @@ func (s *PlainState) ForEachStorage(addr common.Address, startLocation common.Ha st := btree.New(16) var k [length.Addr + 
length.Incarnation + length.Hash]byte copy(k[:], addr[:]) - accData, err := GetAsOf(s.tx, s.accHistoryC, s.accChangesC, false , addr[:], s.blockNr) + accData, err := DomainGetAsOf(s.tx, s.accHistoryC, s.accChangesC, false , addr[:], s.blockNr) if err != nil { return err } diff --git a/erigon-lib/kv/bitmapdb/fixed_size.go b/erigon-lib/kv/bitmapdb/fixed_size_bitmaps.go similarity index 100% rename from erigon-lib/kv/bitmapdb/fixed_size.go rename to erigon-lib/kv/bitmapdb/fixed_size_bitmaps.go diff --git a/erigon-lib/kv/bitmapdb/fixed_size_test.go b/erigon-lib/kv/bitmapdb/fixed_size_bitmaps_test.go similarity index 100% rename from erigon-lib/kv/bitmapdb/fixed_size_test.go rename to erigon-lib/kv/bitmapdb/fixed_size_bitmaps_test.go diff --git a/erigon-lib/kv/kv_interface.go b/erigon-lib/kv/kv_interface.go index e40e7f6fda6..185ae931a76 100644 --- a/erigon-lib/kv/kv_interface.go +++ b/erigon-lib/kv/kv_interface.go @@ -513,13 +513,10 @@ type TemporalTx interface { // DomainGetAsOf - state as of given `ts` // Example: GetAsOf(Account, key, txNum) - retuns account's value before `txNum` transaction changed it - // Means if you want re-execute `txNum` on historical state - do `GetAsOf(key, txNum)` to read state + // Means if you want re-execute `txNum` on historical state - do `DomainGetAsOf(key, txNum)` to read state // `ok = false` means: key not found. or "future txNum" passed. 
DomainGetAsOf(name Domain, k, k2 []byte, ts uint64) (v []byte, ok bool, err error) - - // HistorySeek - like `DomainGetAsOf` but without latest state - only for `History` - // `ok == true && v != nil && len(v) == 0` means key-creation even - HistorySeek(name History, k []byte, ts uint64) (v []byte, ok bool, err error) + DomainRange(name Domain, fromKey, toKey []byte, ts uint64, asc order.By, limit int) (it stream.KV, err error) // IndexRange - return iterator over range of inverted index for given key `k` // Asc semantic: [from, to) AND from > to @@ -529,11 +526,14 @@ type TemporalTx interface { // Example: IndexRange("IndexName", 10, 5, order.Desc, -1) // Example: IndexRange("IndexName", -1, -1, order.Asc, 10) IndexRange(name InvertedIdx, k []byte, fromTs, toTs int, asc order.By, limit int) (timestamps stream.U64, err error) - DomainRange(name Domain, fromKey, toKey []byte, ts uint64, asc order.By, limit int) (it stream.KV, err error) + + // HistorySeek - like `DomainGetAsOf` but without latest state - only for `History` + // `ok == true && v != nil && len(v) == 0` means key-creation event + HistorySeek(name Domain, k []byte, ts uint64) (v []byte, ok bool, err error) // HistoryRange - producing "state patch" - sorted list of keys updated at [fromTs,toTs) with their most-recent value. 
// no duplicates - HistoryRange(name History, fromTs, toTs int, asc order.By, limit int) (it stream.KV, err error) + HistoryRange(name Domain, fromTs, toTs int, asc order.By, limit int) (it stream.KV, err error) } type TemporalRwTx interface { diff --git a/erigon-lib/kv/membatchwithdb/memory_mutation.go b/erigon-lib/kv/membatchwithdb/memory_mutation.go index 9e2b2d37b42..22d6e03d26d 100644 --- a/erigon-lib/kv/membatchwithdb/memory_mutation.go +++ b/erigon-lib/kv/membatchwithdb/memory_mutation.go @@ -717,7 +717,7 @@ func (m *MemoryMutation) DomainGet(name kv.Domain, k, k2 []byte) (v []byte, step //return m.db.(kv.TemporalTx).DomainGet(name, k, k2) } -func (m *MemoryMutation) DomainGetAsOf(name kv.Domain, k, k2 []byte, ts uint64) (v []byte, ok bool, err error) { +func (m *MemoryMutation) GetAsOf(name kv.Domain, k, k2 []byte, ts uint64) (v []byte, ok bool, err error) { panic("not supported") //return m.db.(kv.TemporalTx).DomainGetAsOf(name, k, k2, ts) } diff --git a/erigon-lib/kv/remotedb/kv_remote.go b/erigon-lib/kv/remotedb/kv_remote.go index b6e7330ba1a..6aee9582a1b 100644 --- a/erigon-lib/kv/remotedb/kv_remote.go +++ b/erigon-lib/kv/remotedb/kv_remote.go @@ -650,16 +650,16 @@ func (tx *tx) DomainRange(name kv.Domain, fromKey, toKey []byte, ts uint64, asc return reply.Keys, reply.Values, reply.NextPageToken, nil }), nil } -func (tx *tx) HistorySeek(name kv.History, k []byte, ts uint64) (v []byte, ok bool, err error) { - reply, err := tx.db.remoteKV.HistorySeek(tx.ctx, &remote.HistorySeekReq{TxId: tx.id, Table: string(name), K: k, Ts: ts}) +func (tx *tx) HistorySeek(name kv.Domain, k []byte, ts uint64) (v []byte, ok bool, err error) { + reply, err := tx.db.remoteKV.HistorySeek(tx.ctx, &remote.HistorySeekReq{TxId: tx.id, Table: name.String(), K: k, Ts: ts}) if err != nil { return nil, false, err } return reply.V, reply.Ok, nil } -func (tx *tx) HistoryRange(name kv.History, fromTs, toTs int, asc order.By, limit int) (it stream.KV, err error) { +func (tx *tx) 
HistoryRange(name kv.Domain, fromTs, toTs int, asc order.By, limit int) (it stream.KV, err error) { return stream.PaginateKV(func(pageToken string) (keys, vals [][]byte, nextPageToken string, err error) { - reply, err := tx.db.remoteKV.HistoryRange(tx.ctx, &remote.HistoryRangeReq{TxId: tx.id, Table: string(name), FromTs: int64(fromTs), ToTs: int64(toTs), OrderAscend: bool(asc), Limit: int64(limit), PageToken: pageToken}) + reply, err := tx.db.remoteKV.HistoryRange(tx.ctx, &remote.HistoryRangeReq{TxId: tx.id, Table: name.String(), FromTs: int64(fromTs), ToTs: int64(toTs), OrderAscend: bool(asc), Limit: int64(limit), PageToken: pageToken}) if err != nil { return nil, nil, "", err } diff --git a/erigon-lib/kv/remotedbserver/remotedbserver.go b/erigon-lib/kv/remotedbserver/remotedbserver.go index a4d4b493945..1c1499bdb6d 100644 --- a/erigon-lib/kv/remotedbserver/remotedbserver.go +++ b/erigon-lib/kv/remotedbserver/remotedbserver.go @@ -561,7 +561,11 @@ func (s *KvServer) HistorySeek(_ context.Context, req *remote.HistorySeekReq) (r if !ok { return errors.New("server DB doesn't implement kv.Temporal interface") } - reply.V, reply.Ok, err = ttx.HistorySeek(kv.History(req.Table), req.K, req.Ts) + domain, err := kv.String2Domain(req.Table) + if err != nil { + return err + } + reply.V, reply.Ok, err = ttx.HistorySeek(domain, req.K, req.Ts) if err != nil { return err } @@ -629,7 +633,11 @@ func (s *KvServer) HistoryRange(_ context.Context, req *remote.HistoryRangeReq) if !ok { return fmt.Errorf("server DB doesn't implement kv.Temporal interface") } - it, err := ttx.HistoryRange(kv.History(req.Table), fromTs, int(req.ToTs), order.By(req.OrderAscend), limit) + domain, err := kv.String2Domain(req.Table) + if err != nil { + return err + } + it, err := ttx.HistoryRange(domain, fromTs, int(req.ToTs), order.By(req.OrderAscend), limit) if err != nil { return err } diff --git a/erigon-lib/kv/tables.go b/erigon-lib/kv/tables.go index 4391e16bb00..ed9d15c3164 100644 --- 
a/erigon-lib/kv/tables.go +++ b/erigon-lib/kv/tables.go @@ -126,7 +126,7 @@ AccountsHistory and StorageHistory - indices designed to serve next 2 type of re 1. what is smallest block number >= X where account A changed 2. get last shard of A - to append there new block numbers -Task 1. is part of "get historical state" operation (see `core/state:GetAsOf`): +Task 1. is part of "get historical state" operation (see `core/state:DomainGetAsOf`): If `db.seekInFiles(A+bigEndian(X))` returns non-last shard - then get block number from shard value Y := RoaringBitmap(shard_value).GetGte(X) @@ -863,14 +863,6 @@ const ( DomainLen Domain = 5 ) -const ( - AccountsHistory History = "AccountsHistory" - StorageHistory History = "StorageHistory" - CodeHistory History = "CodeHistory" - CommitmentHistory History = "CommitmentHistory" - ReceiptHistory History = "ReceiptHistory" -) - const ( AccountsHistoryIdx InvertedIdx = "AccountsHistoryIdx" StorageHistoryIdx InvertedIdx = "StorageHistoryIdx" diff --git a/erigon-lib/kv/temporal/kv_temporal.go b/erigon-lib/kv/temporal/kv_temporal.go index 8aecc7565c5..fb5099397c2 100644 --- a/erigon-lib/kv/temporal/kv_temporal.go +++ b/erigon-lib/kv/temporal/kv_temporal.go @@ -218,10 +218,10 @@ func (tx *Tx) DomainGetAsOf(name kv.Domain, key, key2 []byte, ts uint64) (v []by if key2 != nil { key = append(common.Copy(key), key2...) 
} - return tx.filesTx.DomainGetAsOf(tx.MdbxTx, name, key, ts) + return tx.filesTx.GetAsOf(tx.MdbxTx, name, key, ts) } -func (tx *Tx) HistorySeek(name kv.History, key []byte, ts uint64) (v []byte, ok bool, err error) { +func (tx *Tx) HistorySeek(name kv.Domain, key []byte, ts uint64) (v []byte, ok bool, err error) { return tx.filesTx.HistorySeek(name, key, ts, tx.MdbxTx) } @@ -234,7 +234,7 @@ func (tx *Tx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc or return timestamps, nil } -func (tx *Tx) HistoryRange(name kv.History, fromTs, toTs int, asc order.By, limit int) (stream.KV, error) { +func (tx *Tx) HistoryRange(name kv.Domain, fromTs, toTs int, asc order.By, limit int) (stream.KV, error) { it, err := tx.filesTx.HistoryRange(name, fromTs, toTs, asc, limit, tx.MdbxTx) if err != nil { return nil, err diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go index 6053ded41cc..e29c4ccc843 100644 --- a/erigon-lib/state/aggregator.go +++ b/erigon-lib/state/aggregator.go @@ -1732,10 +1732,10 @@ func (ac *AggregatorRoTx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs // -- range end -func (ac *AggregatorRoTx) HistorySeek(name kv.History, key []byte, ts uint64, tx kv.Tx) (v []byte, ok bool, err error) { - switch name { - case kv.AccountsHistory: - v, ok, err = ac.d[kv.AccountsDomain].ht.HistorySeek(key, ts, tx) +func (ac *AggregatorRoTx) HistorySeek(domain kv.Domain, key []byte, ts uint64, tx kv.Tx) (v []byte, ok bool, err error) { + switch domain { + case kv.AccountsDomain: + v, ok, err = ac.d[domain].ht.HistorySeek(key, ts, tx) if err != nil { return nil, false, err } @@ -1743,37 +1743,13 @@ func (ac *AggregatorRoTx) HistorySeek(name kv.History, key []byte, ts uint64, tx return v, ok, nil } return v, true, nil - case kv.StorageHistory: - return ac.d[kv.StorageDomain].ht.HistorySeek(key, ts, tx) - case kv.CodeHistory: - return ac.d[kv.CodeDomain].ht.HistorySeek(key, ts, tx) - case kv.CommitmentHistory: - return 
ac.d[kv.CommitmentDomain].ht.HistorySeek(key, ts, tx) - case kv.ReceiptHistory: - return ac.d[kv.ReceiptDomain].ht.HistorySeek(key, ts, tx) - //case kv.GasUsedHistory: - // return ac.d[kv.GasUsedDomain].ht.HistorySeek(key, ts, tx) default: - panic(fmt.Sprintf("unexpected: %s", name)) + return ac.d[domain].ht.HistorySeek(key, ts, tx) } } -func (ac *AggregatorRoTx) HistoryRange(name kv.History, fromTs, toTs int, asc order.By, limit int, tx kv.Tx) (it stream.KV, err error) { - //TODO: aggTx to store array of histories - var domainName kv.Domain - - switch name { - case kv.AccountsHistory: - domainName = kv.AccountsDomain - case kv.StorageHistory: - domainName = kv.StorageDomain - case kv.CodeHistory: - domainName = kv.CodeDomain - default: - return nil, fmt.Errorf("unexpected history name: %s", name) - } - - hr, err := ac.d[domainName].ht.HistoryRange(fromTs, toTs, asc, limit, tx) +func (ac *AggregatorRoTx) HistoryRange(domain kv.Domain, fromTs, toTs int, asc order.By, limit int, tx kv.Tx) (it stream.KV, err error) { + hr, err := ac.d[domain].ht.HistoryRange(fromTs, toTs, asc, limit, tx) if err != nil { return nil, err } @@ -1844,11 +1820,11 @@ func (ac *AggregatorRoTx) DomainRange(ctx context.Context, tx kv.Tx, domain kv.D func (ac *AggregatorRoTx) DomainRangeLatest(tx kv.Tx, domain kv.Domain, from, to []byte, limit int) (stream.KV, error) { return ac.d[domain].DomainRangeLatest(tx, from, to, limit) } -func (ac *AggregatorRoTx) DomainGetAsOfFile(name kv.Domain, key []byte, ts uint64) (v []byte, ok bool, err error) { +func (ac *AggregatorRoTx) GetAsOfFile(name kv.Domain, key []byte, ts uint64) (v []byte, ok bool, err error) { return ac.d[name].GetAsOfFile(key, ts) } -func (ac *AggregatorRoTx) DomainGetAsOf(tx kv.Tx, name kv.Domain, key []byte, ts uint64) (v []byte, ok bool, err error) { +func (ac *AggregatorRoTx) GetAsOf(tx kv.Tx, name kv.Domain, key []byte, ts uint64) (v []byte, ok bool, err error) { return ac.d[name].GetAsOf(key, ts, tx) } func (ac *AggregatorRoTx) 
GetLatest(domain kv.Domain, k, k2 []byte, tx kv.Tx) (v []byte, step uint64, ok bool, err error) { diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index 52d3e60c012..8ce0425ca29 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -18,19 +18,14 @@ package state import ( "bytes" - "container/heap" "context" "encoding/binary" "errors" "fmt" "math" "path/filepath" - "regexp" "sort" - "strconv" - "strings" "sync" - "sync/atomic" "time" "github.com/erigontech/erigon-lib/metrics" @@ -104,7 +99,6 @@ type Domain struct { compression seg.FileCompression valsTable string // key -> inverted_step + values (Dupsort) - stats DomainStats indexList idxList } @@ -146,7 +140,6 @@ func NewDomain(cfg domainCfg, aggregationStep uint64, name kv.Domain, valsTable, compression: cfg.compress, dirtyFiles: btree2.NewBTreeGOptions[*filesItem](filesItemLess, btree2.Options{Degree: 128, NoLocks: false}), - stats: DomainStats{FilesQueries: &atomic.Uint64{}, TotalQueries: &atomic.Uint64{}}, indexList: withBTree | withExistence, replaceKeysInValues: cfg.replaceKeysInValues, // for commitment domain only @@ -255,14 +248,6 @@ func (d *Domain) openFolder() error { return nil } -func (d *Domain) GetAndResetStats() DomainStats { - r := d.stats - r.DataSize, r.IndexSize, r.FilesCount = d.collectFilesStats() - - d.stats = DomainStats{FilesQueries: &atomic.Uint64{}, TotalQueries: &atomic.Uint64{}} - return r -} - func (d *Domain) closeFilesAfterStep(lowerBound uint64) { var toClose []*filesItem d.dirtyFiles.Scan(func(item *filesItem) bool { @@ -317,52 +302,18 @@ func (d *Domain) closeFilesAfterStep(lowerBound uint64) { } func (d *Domain) scanDirtyFiles(fileNames []string) (garbageFiles []*filesItem) { - re := regexp.MustCompile("^v([0-9]+)-" + d.filenameBase + ".([0-9]+)-([0-9]+).kv$") - var err error - - for _, name := range fileNames { - subs := re.FindStringSubmatch(name) - if len(subs) != 4 { - if len(subs) != 0 { - d.logger.Warn("File ignored by domain scan, 
more than 4 submatches", "name", name, "submatches", len(subs)) - } - continue - } - var startStep, endStep uint64 - if startStep, err = strconv.ParseUint(subs[2], 10, 64); err != nil { - d.logger.Warn("File ignored by domain scan, parsing startTxNum", "error", err, "name", name) - continue - } - if endStep, err = strconv.ParseUint(subs[3], 10, 64); err != nil { - d.logger.Warn("File ignored by domain scan, parsing endTxNum", "error", err, "name", name) - continue - } - if startStep > endStep { - d.logger.Warn("File ignored by domain scan, startTxNum > endTxNum", "name", name) - continue - } - + for _, dirtyFile := range scanDirtyFiles(fileNames, d.aggregationStep, d.filenameBase, "kv", d.logger) { + startStep, endStep := dirtyFile.startTxNum/d.aggregationStep, dirtyFile.endTxNum/d.aggregationStep domainName, _ := kv.String2Domain(d.filenameBase) if d.integrityCheck != nil && !d.integrityCheck(domainName, startStep, endStep) { - d.logger.Debug("[agg] skip garbage file", "name", name) + d.logger.Debug("[agg] skip garbage file", "name", d.filenameBase, "startStep", startStep, "endStep", endStep) continue } + dirtyFile.frozen = false - // Semantic: [startTxNum, endTxNum) - // Example: - // stepSize = 4 - // 0-1.kv: [0, 8) - // 0-2.kv: [0, 16) - // 1-2.kv: [8, 16) - startTxNum, endTxNum := startStep*d.aggregationStep, endStep*d.aggregationStep - - var newFile = newFilesItem(startTxNum, endTxNum, d.aggregationStep) - newFile.frozen = false - - if _, has := d.dirtyFiles.Get(newFile); has { - continue + if _, has := d.dirtyFiles.Get(dirtyFile); !has { + d.dirtyFiles.Set(dirtyFile) } - d.dirtyFiles.Set(newFile) } return garbageFiles } @@ -681,69 +632,6 @@ func (w *domainBufferedWriter) addValue(key1, key2, value []byte) error { return nil } -type CursorType uint8 - -const ( - FILE_CURSOR CursorType = iota - DB_CURSOR - RAM_CURSOR -) - -// CursorItem is the item in the priority queue used to do merge interation -// over storage of a given account -type CursorItem struct { - 
cDup kv.CursorDupSort - cNonDup kv.Cursor - - iter btree2.MapIter[string, dataWithPrevStep] - dg *seg.Reader - dg2 *seg.Reader - btCursor *Cursor - key []byte - val []byte - step uint64 - startTxNum uint64 - endTxNum uint64 - latestOffset uint64 // offset of the latest value in the file - t CursorType // Whether this item represents state file or DB record, or tree - reverse bool -} - -type CursorHeap []*CursorItem - -func (ch CursorHeap) Len() int { - return len(ch) -} - -func (ch CursorHeap) Less(i, j int) bool { - cmp := bytes.Compare(ch[i].key, ch[j].key) - if cmp == 0 { - // when keys match, the items with later blocks are preferred - if ch[i].reverse { - return ch[i].endTxNum > ch[j].endTxNum - } - return ch[i].endTxNum < ch[j].endTxNum - } - return cmp < 0 -} - -func (ch *CursorHeap) Swap(i, j int) { - (*ch)[i], (*ch)[j] = (*ch)[j], (*ch)[i] -} - -func (ch *CursorHeap) Push(x interface{}) { - *ch = append(*ch, x.(*CursorItem)) -} - -func (ch *CursorHeap) Pop() interface{} { - old := *ch - n := len(old) - x := old[n-1] - old[n-1] = nil - *ch = old[0 : n-1] - return x -} - // DomainRoTx allows accesing the same domain from multiple go-routines type DomainRoTx struct { files visibleFiles @@ -870,40 +758,6 @@ func (dt *DomainRoTx) DebugEFKey(k []byte) error { return nil } -func (d *Domain) collectFilesStats() (datsz, idxsz, files uint64) { - d.History.dirtyFiles.Walk(func(items []*filesItem) bool { - for _, item := range items { - if item.index == nil { - return false - } - datsz += uint64(item.decompressor.Size()) - idxsz += uint64(item.index.Size()) - idxsz += uint64(item.bindex.Size()) - files += 3 - } - return true - }) - - d.dirtyFiles.Walk(func(items []*filesItem) bool { - for _, item := range items { - if item.index == nil { - return false - } - datsz += uint64(item.decompressor.Size()) - idxsz += uint64(item.index.Size()) - idxsz += uint64(item.bindex.Size()) - files += 3 - } - return true - }) - - fcnt, fsz, isz := 
d.History.InvertedIndex.collectFilesStat() - datsz += fsz - files += fcnt - idxsz += isz - return -} - func (d *Domain) BeginFilesRo() *DomainRoTx { for i := 0; i < len(d._visible.files); i++ { if !d._visible.files[i].src.frozen { @@ -970,7 +824,6 @@ func (d *Domain) collateETL(ctx context.Context, stepFrom, stepTo uint64, wal *e if closeCollation { coll.Close() } - d.stats.LastCollationTook = time.Since(started) mxCollateTook.ObserveDuration(started) }() @@ -1071,7 +924,6 @@ func (d *Domain) collate(ctx context.Context, step, txFrom, txTo uint64, roTx kv started := time.Now() defer func() { - d.stats.LastCollationTook = time.Since(started) mxCollateTook.ObserveDuration(started) }() @@ -1209,7 +1061,6 @@ func (d *Domain) buildFileRange(ctx context.Context, stepFrom, stepTo uint64, co start := time.Now() defer func() { - d.stats.LastFileBuildingTook = time.Since(start) mxBuildTook.ObserveDuration(start) }() @@ -1304,7 +1155,6 @@ func (d *Domain) buildFiles(ctx context.Context, step uint64, collation Collatio start := time.Now() defer func() { - d.stats.LastFileBuildingTook = time.Since(start) mxBuildTook.ObserveDuration(start) }() @@ -1772,12 +1622,12 @@ func (dt *DomainRoTx) GetAsOf(key []byte, txNum uint64, roTx kv.Tx) ([]byte, boo if hOk { if len(v) == 0 { // if history successfuly found marker of key creation if traceGetAsOf == dt.d.filenameBase { - fmt.Printf("GetAsOf(%s , %x, %d) -> not found in history\n", dt.d.filenameBase, key, txNum) + fmt.Printf("DomainGetAsOf(%s , %x, %d) -> not found in history\n", dt.d.filenameBase, key, txNum) } return nil, false, nil } if traceGetAsOf == dt.d.filenameBase { - fmt.Printf("GetAsOf(%s, %x, %d) -> found in history\n", dt.d.filenameBase, key, txNum) + fmt.Printf("DomainGetAsOf(%s, %x, %d) -> found in history\n", dt.d.filenameBase, key, txNum) } return v, v != nil, nil } @@ -2218,205 +2068,6 @@ func (sr *SegStreamReader) Next() (k, v []byte, err error) { return k, v, nil } -type DomainLatestIterFile struct { - aggStep 
uint64 - roTx kv.Tx - valsTable string - - limit int - largeVals bool - from, to []byte - orderAscend order.By - - h *CursorHeap - - nextKey, nextVal []byte - k, v, kBackup, vBackup []byte - - logger log.Logger -} - -func (hi *DomainLatestIterFile) Close() { -} -func (hi *DomainLatestIterFile) Trace(prefix string) *stream.TracedDuo[[]byte, []byte] { - return stream.TraceDuo(hi, hi.logger, "[dbg] DomainLatestIterFile.Next "+prefix) -} -func (hi *DomainLatestIterFile) init(dc *DomainRoTx) error { - // Implementation: - // File endTxNum = last txNum of file step - // DB endTxNum = first txNum of step in db - // RAM endTxNum = current txnum - // Example: stepSize=8, file=0-2.kv, db has key of step 2, current txn num is 17 - // File endTxNum = 15, because `0-2.kv` has steps 0 and 1, last txNum of step 1 is 15 - // DB endTxNum = 16, because db has step 2, and first txNum of step 2 is 16. - // RAM endTxNum = 17, because current tcurrent txNum is 17 - hi.largeVals = dc.d.largeVals - heap.Init(hi.h) - var key, value []byte - - if dc.d.largeVals { - valsCursor, err := hi.roTx.Cursor(dc.d.valsTable) - if err != nil { - return err - } - if key, value, err = valsCursor.Seek(hi.from); err != nil { - return err - } - if key != nil && (hi.to == nil || bytes.Compare(key[:len(key)-8], hi.to) < 0) { - k := key[:len(key)-8] - stepBytes := key[len(key)-8:] - step := ^binary.BigEndian.Uint64(stepBytes) - endTxNum := step * dc.d.aggregationStep // DB can store not-finished step, it means - then set first txn in step - it anyway will be ahead of files - - heap.Push(hi.h, &CursorItem{t: DB_CURSOR, key: common.Copy(k), val: common.Copy(value), cNonDup: valsCursor, endTxNum: endTxNum, reverse: true}) - } - } else { - valsCursor, err := hi.roTx.CursorDupSort(dc.d.valsTable) - if err != nil { - return err - } - - if key, value, err = valsCursor.Seek(hi.from); err != nil { - return err - } - if key != nil && (hi.to == nil || bytes.Compare(key, hi.to) < 0) { - stepBytes := value[:8] - value = 
value[8:] - step := ^binary.BigEndian.Uint64(stepBytes) - endTxNum := step * dc.d.aggregationStep // DB can store not-finished step, it means - then set first txn in step - it anyway will be ahead of files - - heap.Push(hi.h, &CursorItem{t: DB_CURSOR, key: common.Copy(key), val: common.Copy(value), cDup: valsCursor, endTxNum: endTxNum, reverse: true}) - } - } - - for i, item := range dc.files { - // todo release btcursor when iter over/make it truly stateless - btCursor, err := dc.statelessBtree(i).Seek(dc.statelessGetter(i), hi.from) - if err != nil { - return err - } - if btCursor == nil { - continue - } - - key := btCursor.Key() - if key != nil && (hi.to == nil || bytes.Compare(key, hi.to) < 0) { - val := btCursor.Value() - txNum := item.endTxNum - 1 // !important: .kv files have semantic [from, t) - heap.Push(hi.h, &CursorItem{t: FILE_CURSOR, key: key, val: val, btCursor: btCursor, endTxNum: txNum, reverse: true}) - } - } - return hi.advanceInFiles() -} - -func (hi *DomainLatestIterFile) advanceInFiles() error { - for hi.h.Len() > 0 { - lastKey := (*hi.h)[0].key - lastVal := (*hi.h)[0].val - - // Advance all the items that have this key (including the top) - for hi.h.Len() > 0 && bytes.Equal((*hi.h)[0].key, lastKey) { - ci1 := heap.Pop(hi.h).(*CursorItem) - switch ci1.t { - case FILE_CURSOR: - if ci1.btCursor.Next() { - ci1.key = ci1.btCursor.Key() - ci1.val = ci1.btCursor.Value() - if ci1.key != nil && (hi.to == nil || bytes.Compare(ci1.key, hi.to) < 0) { - heap.Push(hi.h, ci1) - } - } - case DB_CURSOR: - if hi.largeVals { - // start from current go to next - initial, v, err := ci1.cNonDup.Current() - if err != nil { - return err - } - var k []byte - for initial != nil && (k == nil || bytes.Equal(initial[:len(initial)-8], k[:len(k)-8])) { - k, v, err = ci1.cNonDup.Next() - if err != nil { - return err - } - if k == nil { - break - } - } - - if len(k) > 0 && (hi.to == nil || bytes.Compare(k[:len(k)-8], hi.to) < 0) { - stepBytes := k[len(k)-8:] - k = 
k[:len(k)-8] - ci1.key = common.Copy(k) - step := ^binary.BigEndian.Uint64(stepBytes) - endTxNum := step * hi.aggStep // DB can store not-finished step, it means - then set first txn in step - it anyway will be ahead of files - ci1.endTxNum = endTxNum - - ci1.val = common.Copy(v) - heap.Push(hi.h, ci1) - } - } else { - // start from current go to next - k, stepBytesWithValue, err := ci1.cDup.NextNoDup() - if err != nil { - return err - } - - if len(k) > 0 && (hi.to == nil || bytes.Compare(k, hi.to) < 0) { - stepBytes := stepBytesWithValue[:8] - v := stepBytesWithValue[8:] - ci1.key = common.Copy(k) - step := ^binary.BigEndian.Uint64(stepBytes) - endTxNum := step * hi.aggStep // DB can store not-finished step, it means - then set first txn in step - it anyway will be ahead of files - ci1.endTxNum = endTxNum - - ci1.val = common.Copy(v) - heap.Push(hi.h, ci1) - } - } - - } - } - if len(lastVal) > 0 { - hi.nextKey, hi.nextVal = lastKey, lastVal - return nil // founc - } - } - hi.nextKey = nil - return nil -} - -func (hi *DomainLatestIterFile) HasNext() bool { - if hi.limit == 0 { // limit reached - return false - } - if hi.nextKey == nil { // EndOfTable - return false - } - if hi.to == nil { // s.nextK == nil check is above - return true - } - - //Asc: [from, to) AND from < to - //Desc: [from, to) AND from > to - cmp := bytes.Compare(hi.nextKey, hi.to) - return (bool(hi.orderAscend) && cmp < 0) || (!bool(hi.orderAscend) && cmp > 0) -} - -func (hi *DomainLatestIterFile) Next() ([]byte, []byte, error) { - hi.limit-- - hi.k, hi.v = append(hi.k[:0], hi.nextKey...), append(hi.v[:0], hi.nextVal...) - - // Satisfy iter.Dual Invariant 2 - hi.k, hi.kBackup, hi.v, hi.vBackup = hi.kBackup, hi.k, hi.vBackup, hi.v - if err := hi.advanceInFiles(); err != nil { - return nil, nil, err - } - order.Asc.Assert(hi.kBackup, hi.nextKey) - // TODO: remove `common.Copy`. it protecting from some existing bug. 
https://github.com/erigontech/erigon/issues/12672 - return common.Copy(hi.kBackup), common.Copy(hi.vBackup), nil -} - func (d *Domain) stepsRangeInDBAsStr(tx kv.Tx) string { a1, a2 := d.History.InvertedIndex.stepsRangeInDB(tx) //ad1, ad2 := d.stepsRangeInDB(tx) @@ -2434,99 +2085,3 @@ func (dt *DomainRoTx) Files() (res []string) { } return append(res, dt.ht.Files()...) } - -type SelectedStaticFiles struct { - accounts []*filesItem - accountsIdx []*filesItem - accountsHist []*filesItem - storage []*filesItem - storageIdx []*filesItem - storageHist []*filesItem - code []*filesItem - codeIdx []*filesItem - codeHist []*filesItem - commitment []*filesItem - commitmentIdx []*filesItem - commitmentHist []*filesItem - //codeI int - //storageI int - //accountsI int - //commitmentI int -} - -//func (sf SelectedStaticFiles) FillV3(s *SelectedStaticFilesV3) SelectedStaticFiles { -// sf.accounts, sf.accountsIdx, sf.accountsHist = s.accounts, s.accountsIdx, s.accountsHist -// sf.storage, sf.storageIdx, sf.storageHist = s.storage, s.storageIdx, s.storageHist -// sf.code, sf.codeIdx, sf.codeHist = s.code, s.codeIdx, s.codeHist -// sf.commitment, sf.commitmentIdx, sf.commitmentHist = s.commitment, s.commitmentIdx, s.commitmentHist -// sf.codeI, sf.accountsI, sf.storageI, sf.commitmentI = s.codeI, s.accountsI, s.storageI, s.commitmentI -// return sf -//} - -func (sf SelectedStaticFiles) Close() { - for _, group := range [][]*filesItem{ - sf.accounts, sf.accountsIdx, sf.accountsHist, - sf.storage, sf.storageIdx, sf.storageHist, - sf.code, sf.codeIdx, sf.codeHist, - sf.commitment, sf.commitmentIdx, sf.commitmentHist, - } { - for _, item := range group { - if item != nil { - if item.decompressor != nil { - item.decompressor.Close() - } - if item.index != nil { - item.index.Close() - } - if item.bindex != nil { - item.bindex.Close() - } - } - } - } -} - -type DomainStats struct { - MergesCount uint64 - LastCollationTook time.Duration - LastPruneTook time.Duration - LastPruneHistTook 
time.Duration - LastFileBuildingTook time.Duration - LastCollationSize uint64 - LastPruneSize uint64 - - FilesQueries *atomic.Uint64 - TotalQueries *atomic.Uint64 - EfSearchTime time.Duration - DataSize uint64 - IndexSize uint64 - FilesCount uint64 -} - -func (ds *DomainStats) Accumulate(other DomainStats) { - if other.FilesQueries != nil { - ds.FilesQueries.Add(other.FilesQueries.Load()) - } - if other.TotalQueries != nil { - ds.TotalQueries.Add(other.TotalQueries.Load()) - } - ds.EfSearchTime += other.EfSearchTime - ds.IndexSize += other.IndexSize - ds.DataSize += other.DataSize - ds.FilesCount += other.FilesCount -} - -func ParseStepsFromFileName(fileName string) (from, to uint64, err error) { - rangeString := strings.Split(fileName, ".")[1] - rangeNums := strings.Split(rangeString, "-") - // convert the range to uint64 - from, err = strconv.ParseUint(rangeNums[0], 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("failed to parse to %s: %w", rangeNums[1], err) - } - to, err = strconv.ParseUint(rangeNums[1], 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("failed to parse to %s: %w", rangeNums[1], err) - } - return from, to, nil -} diff --git a/erigon-lib/state/domain_shared.go b/erigon-lib/state/domain_shared.go index 077e2383818..aa7065f1f64 100644 --- a/erigon-lib/state/domain_shared.go +++ b/erigon-lib/state/domain_shared.go @@ -211,7 +211,7 @@ func (sd *SharedDomains) Unwind(ctx context.Context, rwTx kv.RwTx, blockUnwindTo } func (sd *SharedDomains) rebuildCommitment(ctx context.Context, roTx kv.Tx, blockNum uint64) ([]byte, error) { - it, err := sd.aggTx.HistoryRange(kv.AccountsHistory, int(sd.TxNum()), math.MaxInt64, order.Asc, -1, roTx) + it, err := sd.aggTx.HistoryRange(kv.AccountsDomain, int(sd.TxNum()), math.MaxInt64, order.Asc, -1, roTx) if err != nil { return nil, err } @@ -224,7 +224,7 @@ func (sd *SharedDomains) rebuildCommitment(ctx context.Context, roTx kv.Tx, bloc sd.sdCtx.TouchKey(kv.AccountsDomain, string(k), nil) } - it, err = 
sd.aggTx.HistoryRange(kv.StorageHistory, int(sd.TxNum()), math.MaxInt64, order.Asc, -1, roTx) + it, err = sd.aggTx.HistoryRange(kv.StorageDomain, int(sd.TxNum()), math.MaxInt64, order.Asc, -1, roTx) if err != nil { return nil, err } @@ -972,8 +972,8 @@ func (sd *SharedDomains) DomainGet(domain kv.Domain, k, k2 []byte) (v []byte, st return v, step, nil } -// DomainGetAsOfFile returns value from domain with respect to limit ofMaxTxnum -func (sd *SharedDomains) domainGetAsOfFile(domain kv.Domain, k, k2 []byte, ofMaxTxnum uint64) (v []byte, step uint64, err error) { +// GetAsOfFile returns value from domain with respect to limit ofMaxTxnum +func (sd *SharedDomains) getAsOfFile(domain kv.Domain, k, k2 []byte, ofMaxTxnum uint64) (v []byte, step uint64, err error) { if domain == kv.CommitmentDomain { return sd.LatestCommitment(k) } @@ -981,7 +981,7 @@ func (sd *SharedDomains) domainGetAsOfFile(domain kv.Domain, k, k2 []byte, ofMax k = append(k, k2...) } - v, ok, err := sd.aggTx.DomainGetAsOfFile(domain, k, ofMaxTxnum) + v, ok, err := sd.aggTx.GetAsOfFile(domain, k, ofMaxTxnum) if err != nil { return nil, 0, fmt.Errorf("domain '%s' %x txn=%d read error: %w", domain, k, ofMaxTxnum, err) } @@ -1185,7 +1185,7 @@ func (sdc *SharedDomainsCommitmentContext) Account(plainKey []byte) (u *commitme return nil, fmt.Errorf("GetAccount failed: %w", err) } } else { - encAccount, _, err = sdc.sharedDomains.domainGetAsOfFile(kv.AccountsDomain, plainKey, nil, sdc.limitReadAsOfTxNum) + encAccount, _, err = sdc.sharedDomains.getAsOfFile(kv.AccountsDomain, plainKey, nil, sdc.limitReadAsOfTxNum) if err != nil { return nil, fmt.Errorf("GetAccount failed: %w", err) } @@ -1216,7 +1216,7 @@ func (sdc *SharedDomainsCommitmentContext) Account(plainKey []byte) (u *commitme if sdc.limitReadAsOfTxNum == 0 { code, _, err = sdc.sharedDomains.DomainGet(kv.CodeDomain, plainKey, nil) } else { - code, _, err = sdc.sharedDomains.domainGetAsOfFile(kv.CodeDomain, plainKey, nil, sdc.limitReadAsOfTxNum) + code, 
_, err = sdc.sharedDomains.getAsOfFile(kv.CodeDomain, plainKey, nil, sdc.limitReadAsOfTxNum) } if err != nil { return nil, fmt.Errorf("GetAccount/Code: failed to read latest code: %w", err) @@ -1244,7 +1244,7 @@ func (sdc *SharedDomainsCommitmentContext) Storage(plainKey []byte) (u *commitme if sdc.limitReadAsOfTxNum == 0 { enc, _, err = sdc.sharedDomains.DomainGet(kv.StorageDomain, plainKey, nil) } else { - enc, _, err = sdc.sharedDomains.domainGetAsOfFile(kv.StorageDomain, plainKey, nil, sdc.limitReadAsOfTxNum) + enc, _, err = sdc.sharedDomains.getAsOfFile(kv.StorageDomain, plainKey, nil, sdc.limitReadAsOfTxNum) } if err != nil { return nil, err diff --git a/erigon-lib/state/domain_shared_bench_test.go b/erigon-lib/state/domain_shared_bench_test.go index 2c8700e1734..220dc90823d 100644 --- a/erigon-lib/state/domain_shared_bench_test.go +++ b/erigon-lib/state/domain_shared_bench_test.go @@ -103,7 +103,7 @@ func Benchmark_SharedDomains_GetLatest(t *testing.B) { for ik := 0; ik < t.N; ik++ { for i := 0; i < len(keys); i++ { ts := uint64(rnd.IntN(int(maxTx))) - v, ok, err := ac2.HistorySeek(kv.AccountsHistory, keys[i], ts, rwTx) + v, ok, err := ac2.HistorySeek(kv.AccountsDomain, keys[i], ts, rwTx) require.True(t, ok) require.NotNil(t, v) diff --git a/erigon-lib/state/domain_stream.go b/erigon-lib/state/domain_stream.go new file mode 100644 index 00000000000..c28d46642d7 --- /dev/null +++ b/erigon-lib/state/domain_stream.go @@ -0,0 +1,293 @@ +// Copyright 2022 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package state + +import ( + "bytes" + "container/heap" + "encoding/binary" + + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/order" + "github.com/erigontech/erigon-lib/kv/stream" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon-lib/seg" + btree2 "github.com/tidwall/btree" +) + +type CursorType uint8 + +const ( + FILE_CURSOR CursorType = iota + DB_CURSOR + RAM_CURSOR +) + +// CursorItem is the item in the priority queue used to do merge interation +// over storage of a given account +type CursorItem struct { + cDup kv.CursorDupSort + cNonDup kv.Cursor + + iter btree2.MapIter[string, dataWithPrevStep] + dg *seg.Reader + dg2 *seg.Reader + btCursor *Cursor + key []byte + val []byte + step uint64 + startTxNum uint64 + endTxNum uint64 + latestOffset uint64 // offset of the latest value in the file + t CursorType // Whether this item represents state file or DB record, or tree + reverse bool +} + +type CursorHeap []*CursorItem + +func (ch CursorHeap) Len() int { + return len(ch) +} + +func (ch CursorHeap) Less(i, j int) bool { + cmp := bytes.Compare(ch[i].key, ch[j].key) + if cmp == 0 { + // when keys match, the items with later blocks are preferred + if ch[i].reverse { + return ch[i].endTxNum > ch[j].endTxNum + } + return ch[i].endTxNum < ch[j].endTxNum + } + return cmp < 0 +} + +func (ch *CursorHeap) Swap(i, j int) { + (*ch)[i], (*ch)[j] = (*ch)[j], (*ch)[i] +} + +func (ch *CursorHeap) Push(x interface{}) { + *ch = append(*ch, x.(*CursorItem)) +} + +func (ch *CursorHeap) Pop() interface{} { + old := *ch + n := len(old) + x := old[n-1] + old[n-1] = nil + *ch = old[0 : n-1] + return x +} + +type DomainLatestIterFile struct { + aggStep uint64 + roTx kv.Tx + valsTable string + + limit int + 
largeVals bool + from, to []byte + orderAscend order.By + + h *CursorHeap + + nextKey, nextVal []byte + k, v, kBackup, vBackup []byte + + logger log.Logger +} + +func (hi *DomainLatestIterFile) Close() { +} +func (hi *DomainLatestIterFile) Trace(prefix string) *stream.TracedDuo[[]byte, []byte] { + return stream.TraceDuo(hi, hi.logger, "[dbg] DomainLatestIterFile.Next "+prefix) +} +func (hi *DomainLatestIterFile) init(dc *DomainRoTx) error { + // Implementation: + // File endTxNum = last txNum of file step + // DB endTxNum = first txNum of step in db + // RAM endTxNum = current txnum + // Example: stepSize=8, file=0-2.kv, db has key of step 2, current txn num is 17 + // File endTxNum = 15, because `0-2.kv` has steps 0 and 1, last txNum of step 1 is 15 + // DB endTxNum = 16, because db has step 2, and first txNum of step 2 is 16. + // RAM endTxNum = 17, because current tcurrent txNum is 17 + hi.largeVals = dc.d.largeVals + heap.Init(hi.h) + var key, value []byte + + if dc.d.largeVals { + valsCursor, err := hi.roTx.Cursor(dc.d.valsTable) + if err != nil { + return err + } + if key, value, err = valsCursor.Seek(hi.from); err != nil { + return err + } + if key != nil && (hi.to == nil || bytes.Compare(key[:len(key)-8], hi.to) < 0) { + k := key[:len(key)-8] + stepBytes := key[len(key)-8:] + step := ^binary.BigEndian.Uint64(stepBytes) + endTxNum := step * dc.d.aggregationStep // DB can store not-finished step, it means - then set first txn in step - it anyway will be ahead of files + + heap.Push(hi.h, &CursorItem{t: DB_CURSOR, key: common.Copy(k), val: common.Copy(value), cNonDup: valsCursor, endTxNum: endTxNum, reverse: true}) + } + } else { + valsCursor, err := hi.roTx.CursorDupSort(dc.d.valsTable) + if err != nil { + return err + } + + if key, value, err = valsCursor.Seek(hi.from); err != nil { + return err + } + if key != nil && (hi.to == nil || bytes.Compare(key, hi.to) < 0) { + stepBytes := value[:8] + value = value[8:] + step := ^binary.BigEndian.Uint64(stepBytes) + 
endTxNum := step * dc.d.aggregationStep // DB can store not-finished step, it means - then set first txn in step - it anyway will be ahead of files + + heap.Push(hi.h, &CursorItem{t: DB_CURSOR, key: common.Copy(key), val: common.Copy(value), cDup: valsCursor, endTxNum: endTxNum, reverse: true}) + } + } + + for i, item := range dc.files { + // todo release btcursor when iter over/make it truly stateless + btCursor, err := dc.statelessBtree(i).Seek(dc.statelessGetter(i), hi.from) + if err != nil { + return err + } + if btCursor == nil { + continue + } + + key := btCursor.Key() + if key != nil && (hi.to == nil || bytes.Compare(key, hi.to) < 0) { + val := btCursor.Value() + txNum := item.endTxNum - 1 // !important: .kv files have semantic [from, t) + heap.Push(hi.h, &CursorItem{t: FILE_CURSOR, key: key, val: val, btCursor: btCursor, endTxNum: txNum, reverse: true}) + } + } + return hi.advanceInFiles() +} + +func (hi *DomainLatestIterFile) advanceInFiles() error { + for hi.h.Len() > 0 { + lastKey := (*hi.h)[0].key + lastVal := (*hi.h)[0].val + + // Advance all the items that have this key (including the top) + for hi.h.Len() > 0 && bytes.Equal((*hi.h)[0].key, lastKey) { + ci1 := heap.Pop(hi.h).(*CursorItem) + switch ci1.t { + case FILE_CURSOR: + if ci1.btCursor.Next() { + ci1.key = ci1.btCursor.Key() + ci1.val = ci1.btCursor.Value() + if ci1.key != nil && (hi.to == nil || bytes.Compare(ci1.key, hi.to) < 0) { + heap.Push(hi.h, ci1) + } + } + case DB_CURSOR: + if hi.largeVals { + // start from current go to next + initial, v, err := ci1.cNonDup.Current() + if err != nil { + return err + } + var k []byte + for initial != nil && (k == nil || bytes.Equal(initial[:len(initial)-8], k[:len(k)-8])) { + k, v, err = ci1.cNonDup.Next() + if err != nil { + return err + } + if k == nil { + break + } + } + + if len(k) > 0 && (hi.to == nil || bytes.Compare(k[:len(k)-8], hi.to) < 0) { + stepBytes := k[len(k)-8:] + k = k[:len(k)-8] + ci1.key = common.Copy(k) + step := 
^binary.BigEndian.Uint64(stepBytes) + endTxNum := step * hi.aggStep // DB can store not-finished step, it means - then set first txn in step - it anyway will be ahead of files + ci1.endTxNum = endTxNum + + ci1.val = common.Copy(v) + heap.Push(hi.h, ci1) + } + } else { + // start from current go to next + k, stepBytesWithValue, err := ci1.cDup.NextNoDup() + if err != nil { + return err + } + + if len(k) > 0 && (hi.to == nil || bytes.Compare(k, hi.to) < 0) { + stepBytes := stepBytesWithValue[:8] + v := stepBytesWithValue[8:] + ci1.key = common.Copy(k) + step := ^binary.BigEndian.Uint64(stepBytes) + endTxNum := step * hi.aggStep // DB can store not-finished step, it means - then set first txn in step - it anyway will be ahead of files + ci1.endTxNum = endTxNum + + ci1.val = common.Copy(v) + heap.Push(hi.h, ci1) + } + } + + } + } + if len(lastVal) > 0 { + hi.nextKey, hi.nextVal = lastKey, lastVal + return nil // founc + } + } + hi.nextKey = nil + return nil +} + +func (hi *DomainLatestIterFile) HasNext() bool { + if hi.limit == 0 { // limit reached + return false + } + if hi.nextKey == nil { // EndOfTable + return false + } + if hi.to == nil { // s.nextK == nil check is above + return true + } + + //Asc: [from, to) AND from < to + //Desc: [from, to) AND from > to + cmp := bytes.Compare(hi.nextKey, hi.to) + return (bool(hi.orderAscend) && cmp < 0) || (!bool(hi.orderAscend) && cmp > 0) +} + +func (hi *DomainLatestIterFile) Next() ([]byte, []byte, error) { + hi.limit-- + hi.k, hi.v = append(hi.k[:0], hi.nextKey...), append(hi.v[:0], hi.nextVal...) + + // Satisfy iter.Dual Invariant 2 + hi.k, hi.kBackup, hi.v, hi.vBackup = hi.kBackup, hi.k, hi.vBackup, hi.v + if err := hi.advanceInFiles(); err != nil { + return nil, nil, err + } + order.Asc.Assert(hi.kBackup, hi.nextKey) + // TODO: remove `common.Copy`. it protecting from some existing bug. 
https://github.com/erigontech/erigon/issues/12672 + return common.Copy(hi.kBackup), common.Copy(hi.vBackup), nil +} diff --git a/erigon-lib/state/files_item.go b/erigon-lib/state/files_item.go index fd37ee035a4..237f2f0a775 100644 --- a/erigon-lib/state/files_item.go +++ b/erigon-lib/state/files_item.go @@ -17,13 +17,16 @@ package state import ( + "fmt" "os" + "regexp" + "strconv" + "strings" "sync/atomic" btree2 "github.com/tidwall/btree" "github.com/erigontech/erigon-lib/config3" - "github.com/erigontech/erigon-lib/kv/bitmapdb" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/recsplit" "github.com/erigontech/erigon-lib/seg" @@ -44,7 +47,6 @@ type filesItem struct { decompressor *seg.Decompressor index *recsplit.Index bindex *BtIndex - bm *bitmapdb.FixedSizeBitmaps existence *ExistenceFilter startTxNum, endTxNum uint64 //[startTxNum, endTxNum) @@ -92,10 +94,6 @@ func (i *filesItem) closeFiles() { i.bindex.Close() i.bindex = nil } - if i.bm != nil { - i.bm.Close() - i.bm = nil - } if i.existence != nil { i.existence.Close() i.existence = nil @@ -139,16 +137,6 @@ func (i *filesItem) closeFilesAndRemove() { } i.bindex = nil } - if i.bm != nil { - i.bm.Close() - if err := os.Remove(i.bm.FilePath()); err != nil { - log.Trace("remove after close", "err", err, "file", i.bm.FileName()) - } - if err := os.Remove(i.bm.FilePath() + ".torrent"); err != nil { - log.Trace("remove after close", "err", err, "file", i.bm.FileName()) - } - i.bm = nil - } if i.existence != nil { i.existence.Close() if err := os.Remove(i.existence.FilePath); err != nil { @@ -161,6 +149,61 @@ func (i *filesItem) closeFilesAndRemove() { } } +func scanDirtyFiles(fileNames []string, stepSize uint64, filenameBase, ext string, logger log.Logger) (res []*filesItem) { + re := regexp.MustCompile("^v([0-9]+)-" + filenameBase + ".([0-9]+)-([0-9]+)." 
+ ext + "$") + var err error + + for _, name := range fileNames { + subs := re.FindStringSubmatch(name) + if len(subs) != 4 { + if len(subs) != 0 { + logger.Warn("File ignored by domain scan, more than 4 submatches", "name", name, "submatches", len(subs)) + } + continue + } + var startStep, endStep uint64 + if startStep, err = strconv.ParseUint(subs[2], 10, 64); err != nil { + logger.Warn("File ignored by domain scan, parsing startTxNum", "error", err, "name", name) + continue + } + if endStep, err = strconv.ParseUint(subs[3], 10, 64); err != nil { + logger.Warn("File ignored by domain scan, parsing endTxNum", "error", err, "name", name) + continue + } + if startStep > endStep { + logger.Warn("File ignored by domain scan, startTxNum > endTxNum", "name", name) + continue + } + + // Semantic: [startTxNum, endTxNum) + // Example: + // stepSize = 4 + // 0-1.kv: [0, 8) + // 0-2.kv: [0, 16) + // 1-2.kv: [8, 16) + startTxNum, endTxNum := startStep*stepSize, endStep*stepSize + + var newFile = newFilesItem(startTxNum, endTxNum, stepSize) + res = append(res, newFile) + } + return res +} + +func ParseStepsFromFileName(fileName string) (from, to uint64, err error) { + rangeString := strings.Split(fileName, ".")[1] + rangeNums := strings.Split(rangeString, "-") + // convert the range to uint64 + from, err = strconv.ParseUint(rangeNums[0], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse to %s: %w", rangeNums[1], err) + } + to, err = strconv.ParseUint(rangeNums[1], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse to %s: %w", rangeNums[1], err) + } + return from, to, nil +} + func deleteMergeFile(dirtyFiles *btree2.BTreeG[*filesItem], outs []*filesItem, filenameBase string, logger log.Logger) { for _, out := range outs { if out == nil { diff --git a/erigon-lib/state/history.go b/erigon-lib/state/history.go index 800768b61c7..ed9ff4b08e5 100644 --- a/erigon-lib/state/history.go +++ b/erigon-lib/state/history.go @@ -25,8 +25,6 @@ import ( 
"fmt" "math" "path/filepath" - "regexp" - "strconv" "sync" "time" @@ -170,46 +168,17 @@ func (h *History) openFolder() error { return h.openList(idxFiles, histFiles) } -// scanDirtyFiles -// returns `uselessFiles` where file "is useless" means: it's subset of frozen file. such files can be safely deleted. subset of non-frozen file may be useful -func (h *History) scanDirtyFiles(fNames []string) (garbageFiles []*filesItem) { - re := regexp.MustCompile("^v([0-9]+)-" + h.filenameBase + ".([0-9]+)-([0-9]+).v$") - var err error - for _, name := range fNames { - subs := re.FindStringSubmatch(name) - if len(subs) != 4 { - if len(subs) != 0 { - h.logger.Warn("[snapshots] file ignored by inverted index scan, more than 3 submatches", "name", name, "submatches", len(subs)) - } - continue - } - var startStep, endStep uint64 - if startStep, err = strconv.ParseUint(subs[2], 10, 64); err != nil { - h.logger.Warn("[snapshots] file ignored by inverted index scan, parsing startTxNum", "error", err, "name", name) - continue - } - if endStep, err = strconv.ParseUint(subs[3], 10, 64); err != nil { - h.logger.Warn("[snapshots] file ignored by inverted index scan, parsing endTxNum", "error", err, "name", name) - continue - } - if startStep > endStep { - h.logger.Warn("[snapshots] file ignored by inverted index scan, startTxNum > endTxNum", "name", name) - continue - } - - startTxNum, endTxNum := startStep*h.aggregationStep, endStep*h.aggregationStep - var newFile = newFilesItem(startTxNum, endTxNum, h.aggregationStep) - +func (h *History) scanDirtyFiles(fileNames []string) { + for _, dirtyFile := range scanDirtyFiles(fileNames, h.aggregationStep, h.filenameBase, "v", h.logger) { + startStep, endStep := dirtyFile.startTxNum/h.aggregationStep, dirtyFile.endTxNum/h.aggregationStep if h.integrityCheck != nil && !h.integrityCheck(startStep, endStep) { + h.logger.Debug("[agg] skip garbage file", "name", h.filenameBase, "startStep", startStep, "endStep", endStep) continue } - - if _, has := 
h.dirtyFiles.Get(newFile); has { - continue + if _, has := h.dirtyFiles.Get(dirtyFile); !has { + h.dirtyFiles.Set(dirtyFile) } - h.dirtyFiles.Set(newFile) } - return garbageFiles } func (h *History) openDirtyFiles() error { @@ -1203,69 +1172,11 @@ func (ht *HistoryRoTx) historySeekInFiles(key []byte, txNum uint64) ([]byte, boo v, _ := g.Next(nil) if traceGetAsOf == ht.h.filenameBase { - fmt.Printf("GetAsOf(%s, %x, %d) -> %s, histTxNum=%d, isNil(v)=%t\n", ht.h.filenameBase, key, txNum, g.FileName(), histTxNum, v == nil) + fmt.Printf("DomainGetAsOf(%s, %x, %d) -> %s, histTxNum=%d, isNil(v)=%t\n", ht.h.filenameBase, key, txNum, g.FileName(), histTxNum, v == nil) } return v, true, nil } -func (hs *HistoryStep) GetNoState(key []byte, txNum uint64) ([]byte, bool, uint64) { - //fmt.Printf("historySeekInFiles [%x] %d\n", key, txNum) - if hs.indexFile.reader.Empty() { - return nil, false, txNum - } - offset, ok := hs.indexFile.reader.TwoLayerLookup(key) - if !ok { - return nil, false, txNum - } - g := hs.indexFile.getter - g.Reset(offset) - k, _ := g.NextUncompressed() - if !bytes.Equal(k, key) { - return nil, false, txNum - } - //fmt.Printf("Found key=%x\n", k) - eliasVal, _ := g.NextUncompressed() - ef, _ := eliasfano32.ReadEliasFano(eliasVal) - n, ok := ef.Search(txNum) - if !ok { - return nil, false, ef.Max() - } - var txKey [8]byte - binary.BigEndian.PutUint64(txKey[:], n) - offset, ok = hs.historyFile.reader.Lookup2(txKey[:], key) - if !ok { - return nil, false, txNum - } - //fmt.Printf("offset = %d, txKey=[%x], key=[%x]\n", offset, txKey[:], key) - g = hs.historyFile.getter - g.Reset(offset) - if hs.compressVals { - v, _ := g.Next(nil) - return v, true, txNum - } - v, _ := g.NextUncompressed() - return v, true, txNum -} - -func (hs *HistoryStep) MaxTxNum(key []byte) (bool, uint64) { - if hs.indexFile.reader.Empty() { - return false, 0 - } - offset, ok := hs.indexFile.reader.TwoLayerLookup(key) - if !ok { - return false, 0 - } - g := hs.indexFile.getter - 
g.Reset(offset) - k, _ := g.NextUncompressed() - if !bytes.Equal(k, key) { - return false, 0 - } - //fmt.Printf("Found key=%x\n", k) - eliasVal, _ := g.NextUncompressed() - return true, eliasfano32.Max(eliasVal) -} - func (ht *HistoryRoTx) encodeTs(txNum uint64, key []byte) []byte { if ht._bufTs == nil { ht._bufTs = make([]byte, 8+len(key)) @@ -1380,322 +1291,6 @@ func (ht *HistoryRoTx) WalkAsOf(ctx context.Context, startTxNum uint64, from, to return stream.UnionKV(hi, dbit, limit), nil } -// StateAsOfIter - returns state range at given time in history -type StateAsOfIterF struct { - hc *HistoryRoTx - limit int - - from, toPrefix []byte - nextVal []byte - nextKey []byte - - h ReconHeap - startTxNum uint64 - startTxKey [8]byte - txnKey [8]byte - - k, v, kBackup, vBackup []byte - orderAscend order.By - - logger log.Logger - ctx context.Context -} - -func (hi *StateAsOfIterF) Close() { -} - -func (hi *StateAsOfIterF) init(files visibleFiles) error { - for i, item := range files { - if item.endTxNum <= hi.startTxNum { - continue - } - // TODO: seek(from) - g := seg.NewReader(item.src.decompressor.MakeGetter(), hi.hc.h.compression) - - idx := hi.hc.iit.statelessIdxReader(i) - var offset uint64 - if len(hi.from) > 0 { - n := item.src.decompressor.Count() / 2 - var ok bool - offset, ok = g.BinarySearch(hi.from, n, idx.OrdinalLookup) - if !ok { - offset = 0 - } - } - g.Reset(offset) - if g.HasNext() { - key, offset := g.Next(nil) - heap.Push(&hi.h, &ReconItem{g: g, key: key, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum, startOffset: offset, lastOffset: offset}) - } - } - binary.BigEndian.PutUint64(hi.startTxKey[:], hi.startTxNum) - return hi.advanceInFiles() -} - -func (hi *StateAsOfIterF) Trace(prefix string) *stream.TracedDuo[[]byte, []byte] { - return stream.TraceDuo(hi, hi.logger, "[dbg] StateAsOfIterF.Next "+prefix) -} - -func (hi *StateAsOfIterF) advanceInFiles() error { - for hi.h.Len() > 0 { - top := heap.Pop(&hi.h).(*ReconItem) - key 
:= top.key - var idxVal []byte - //if hi.compressVals { - idxVal, _ = top.g.Next(nil) - //} else { - // idxVal, _ = top.g.NextUncompressed() - //} - if top.g.HasNext() { - //if hi.compressVals { - top.key, _ = top.g.Next(nil) - //} else { - // top.key, _ = top.g.NextUncompressed() - //} - if hi.toPrefix == nil || bytes.Compare(top.key, hi.toPrefix) < 0 { - heap.Push(&hi.h, top) - } - } - - if hi.from != nil && bytes.Compare(key, hi.from) < 0 { //TODO: replace by seekInFiles() - continue - } - - if bytes.Equal(key, hi.nextKey) { - continue - } - n, ok := eliasfano32.Seek(idxVal, hi.startTxNum) - if !ok { - continue - } - - hi.nextKey = key - binary.BigEndian.PutUint64(hi.txnKey[:], n) - historyItem, ok := hi.hc.getFileDeprecated(top.startTxNum, top.endTxNum) - if !ok { - return fmt.Errorf("no %s file found for [%x]", hi.hc.h.filenameBase, hi.nextKey) - } - reader := hi.hc.statelessIdxReader(historyItem.i) - offset, ok := reader.Lookup2(hi.txnKey[:], hi.nextKey) - if !ok { - continue - } - g := hi.hc.statelessGetter(historyItem.i) - g.Reset(offset) - hi.nextVal, _ = g.Next(nil) - return nil - } - hi.nextKey = nil - return nil -} - -func (hi *StateAsOfIterF) HasNext() bool { - if hi.limit == 0 { // limit reached - return false - } - if hi.nextKey == nil { // EndOfTable - return false - } - if hi.toPrefix == nil { // s.nextK == nil check is above - return true - } - - //Asc: [from, to) AND from < to - //Desc: [from, to) AND from > to - cmp := bytes.Compare(hi.nextKey, hi.toPrefix) - return (bool(hi.orderAscend) && cmp < 0) || (!bool(hi.orderAscend) && cmp > 0) -} - -func (hi *StateAsOfIterF) Next() ([]byte, []byte, error) { - select { - case <-hi.ctx.Done(): - return nil, nil, hi.ctx.Err() - default: - } - - hi.limit-- - hi.k, hi.v = append(hi.k[:0], hi.nextKey...), append(hi.v[:0], hi.nextVal...) 
- - // Satisfy stream.Duo Invariant 2 - hi.k, hi.kBackup, hi.v, hi.vBackup = hi.kBackup, hi.k, hi.vBackup, hi.v - if err := hi.advanceInFiles(); err != nil { - return nil, nil, err - } - hi.orderAscend.Assert(hi.kBackup, hi.nextKey) - // TODO: remove `common.Copy`. it protecting from some existing bug. https://github.com/erigontech/erigon/issues/12672 - return common.Copy(hi.kBackup), common.Copy(hi.vBackup), nil -} - -// StateAsOfIterDB - returns state range at given time in history -type StateAsOfIterDB struct { - largeValues bool - roTx kv.Tx - valsC kv.Cursor - valsCDup kv.CursorDupSort - valsTable string - - from, toPrefix []byte - orderAscend order.By - limit int - - nextKey, nextVal []byte - - startTxNum uint64 - startTxKey [8]byte - - k, v, kBackup, vBackup []byte - err error - - logger log.Logger - ctx context.Context -} - -func (hi *StateAsOfIterDB) Close() { - if hi.valsC != nil { - hi.valsC.Close() - } -} - -func (hi *StateAsOfIterDB) Trace(prefix string) *stream.TracedDuo[[]byte, []byte] { - return stream.TraceDuo(hi, hi.logger, "[dbg] StateAsOfIterDB.Next "+prefix) -} - -func (hi *StateAsOfIterDB) advance() (err error) { - // not large: - // keys: txNum -> key1+key2 - // vals: key1+key2 -> txNum + value (DupSort) - // large: - // keys: txNum -> key1+key2 - // vals: key1+key2+txNum -> value (not DupSort) - if hi.largeValues { - return hi.advanceLargeVals() - } - return hi.advanceSmallVals() -} -func (hi *StateAsOfIterDB) advanceLargeVals() error { - var seek []byte - var err error - if hi.valsC == nil { - if hi.valsC, err = hi.roTx.Cursor(hi.valsTable); err != nil { - return err - } - firstKey, _, err := hi.valsC.Seek(hi.from) - if err != nil { - return err - } - if firstKey == nil { - hi.nextKey = nil - return nil - } - seek = append(common.Copy(firstKey[:len(firstKey)-8]), hi.startTxKey[:]...) - } else { - next, ok := kv.NextSubtree(hi.nextKey) - if !ok { - hi.nextKey = nil - return nil - } - - seek = append(next, hi.startTxKey[:]...) 
- } - for k, v, err := hi.valsC.Seek(seek); k != nil; k, v, err = hi.valsC.Seek(seek) { - if err != nil { - return err - } - if hi.toPrefix != nil && bytes.Compare(k[:len(k)-8], hi.toPrefix) >= 0 { - break - } - if !bytes.Equal(seek[:len(k)-8], k[:len(k)-8]) { - copy(seek[:len(k)-8], k[:len(k)-8]) - continue - } - hi.nextKey = k[:len(k)-8] - hi.nextVal = v - return nil - } - hi.nextKey = nil - return nil -} -func (hi *StateAsOfIterDB) advanceSmallVals() error { - var seek []byte - var err error - if hi.valsCDup == nil { - if hi.valsCDup, err = hi.roTx.CursorDupSort(hi.valsTable); err != nil { - return err - } - seek = hi.from - } else { - next, ok := kv.NextSubtree(hi.nextKey) - if !ok { - hi.nextKey = nil - return nil - } - seek = next - } - for k, _, err := hi.valsCDup.Seek(seek); k != nil; k, _, err = hi.valsCDup.NextNoDup() { - if err != nil { - return err - } - if hi.toPrefix != nil && bytes.Compare(k, hi.toPrefix) >= 0 { - break - } - v, err := hi.valsCDup.SeekBothRange(k, hi.startTxKey[:]) - if err != nil { - return err - } - if v == nil { - continue - } - hi.nextKey = k - hi.nextVal = v[8:] - return nil - } - hi.nextKey = nil - return nil -} - -func (hi *StateAsOfIterDB) HasNext() bool { - if hi.err != nil { - return true - } - if hi.limit == 0 { // limit reached - return false - } - if hi.nextKey == nil { // EndOfTable - return false - } - if hi.toPrefix == nil { // s.nextK == nil check is above - return true - } - - //Asc: [from, to) AND from < to - //Desc: [from, to) AND from > to - cmp := bytes.Compare(hi.nextKey, hi.toPrefix) - return (bool(hi.orderAscend) && cmp < 0) || (!bool(hi.orderAscend) && cmp > 0) -} - -func (hi *StateAsOfIterDB) Next() ([]byte, []byte, error) { - select { - case <-hi.ctx.Done(): - return nil, nil, hi.ctx.Err() - default: - } - - if hi.err != nil { - return nil, nil, hi.err - } - hi.limit-- - hi.k, hi.v = hi.nextKey, hi.nextVal - - // Satisfy stream.Duo Invariant 2 - hi.k, hi.kBackup, hi.v, hi.vBackup = hi.kBackup, hi.k, 
hi.vBackup, hi.v - if err := hi.advance(); err != nil { - return nil, nil, err - } - hi.orderAscend.Assert(hi.kBackup, hi.nextKey) - // TODO: remove `common.Copy`. it protecting from some existing bug. https://github.com/erigontech/erigon/issues/12672 - return common.Copy(hi.kBackup), common.Copy(hi.vBackup), nil -} - func (ht *HistoryRoTx) iterateChangedFrozen(fromTxNum, toTxNum int, asc order.By, limit int) (stream.KV, error) { if asc == false { panic("not supported yet") @@ -1778,8 +1373,7 @@ func (ht *HistoryRoTx) HistoryRange(fromTxNum, toTxNum int, asc order.By, limit return stream.MergeKVS(itOnDB, itOnFiles, limit), nil } -func (ht *HistoryRoTx) idxRangeRecent(key []byte, startTxNum, endTxNum int, asc order.By, limit int, roTx kv.Tx) (stream.U64, error) { - var dbIt stream.U64 +func (ht *HistoryRoTx) idxRangeOnDB(key []byte, startTxNum, endTxNum int, asc order.By, limit int, roTx kv.Tx) (stream.U64, error) { if ht.h.historyLargeValues { from := make([]byte, len(key)+8) copy(from, key) @@ -1798,384 +1392,43 @@ func (ht *HistoryRoTx) idxRangeRecent(key []byte, startTxNum, endTxNum int, asc if err != nil { return nil, err } - dbIt = stream.TransformKV2U64(it, func(k, v []byte) (uint64, error) { + return stream.TransformKV2U64(it, func(k, v []byte) (uint64, error) { if len(k) < 8 { return 0, fmt.Errorf("unexpected large key length %d", len(k)) } return binary.BigEndian.Uint64(k[len(k)-8:]), nil - }) - } else { - var from, to []byte - if startTxNum >= 0 { - from = make([]byte, 8) - binary.BigEndian.PutUint64(from, uint64(startTxNum)) - } - if endTxNum >= 0 { - to = make([]byte, 8) - binary.BigEndian.PutUint64(to, uint64(endTxNum)) - } - it, err := roTx.RangeDupSort(ht.h.historyValsTable, key, from, to, asc, limit) - if err != nil { - return nil, err - } - dbIt = stream.TransformKV2U64(it, func(k, v []byte) (uint64, error) { - if len(v) < 8 { - return 0, fmt.Errorf("unexpected small value length %d", len(v)) - } - return binary.BigEndian.Uint64(v), nil - }) + }), 
nil } - return dbIt, nil -} -func (ht *HistoryRoTx) IdxRange(key []byte, startTxNum, endTxNum int, asc order.By, limit int, roTx kv.Tx) (stream.U64, error) { - frozenIt, err := ht.iit.iterateRangeFrozen(key, startTxNum, endTxNum, asc, limit) - if err != nil { - return nil, err + var from, to []byte + if startTxNum >= 0 { + from = make([]byte, 8) + binary.BigEndian.PutUint64(from, uint64(startTxNum)) + } + if endTxNum >= 0 { + to = make([]byte, 8) + binary.BigEndian.PutUint64(to, uint64(endTxNum)) } - recentIt, err := ht.idxRangeRecent(key, startTxNum, endTxNum, asc, limit, roTx) + it, err := roTx.RangeDupSort(ht.h.historyValsTable, key, from, to, asc, limit) if err != nil { return nil, err } - return stream.Union[uint64](frozenIt, recentIt, asc, limit), nil -} - -type HistoryChangesIterFiles struct { - hc *HistoryRoTx - nextVal []byte - nextKey []byte - h ReconHeap - startTxNum uint64 - endTxNum int - startTxKey [8]byte - txnKey [8]byte - - k, v, kBackup, vBackup []byte - err error - limit int -} - -func (hi *HistoryChangesIterFiles) Close() { -} - -func (hi *HistoryChangesIterFiles) advance() error { - for hi.h.Len() > 0 { - top := heap.Pop(&hi.h).(*ReconItem) - key := top.key - var idxVal []byte - //if hi.compressVals { - idxVal, _ = top.g.Next(nil) - //} else { - // idxVal, _ = top.g.NextUncompressed() - //} - if top.g.HasNext() { - //if hi.compressVals { - top.key, _ = top.g.Next(nil) - //} else { - // top.key, _ = top.g.NextUncompressed() - //} - heap.Push(&hi.h, top) - } - - if bytes.Equal(key, hi.nextKey) { - continue - } - n, ok := eliasfano32.Seek(idxVal, hi.startTxNum) - if !ok { - continue - } - if int(n) >= hi.endTxNum { - continue - } - - hi.nextKey = key - binary.BigEndian.PutUint64(hi.txnKey[:], n) - historyItem, ok := hi.hc.getFileDeprecated(top.startTxNum, top.endTxNum) - if !ok { - return fmt.Errorf("HistoryChangesIterFiles: no %s file found for [%x]", hi.hc.h.filenameBase, hi.nextKey) - } - reader := hi.hc.statelessIdxReader(historyItem.i) - 
offset, ok := reader.Lookup2(hi.txnKey[:], hi.nextKey) - if !ok { - continue - } - g := hi.hc.statelessGetter(historyItem.i) - g.Reset(offset) - hi.nextVal, _ = g.Next(nil) - return nil - } - hi.nextKey = nil - return nil -} - -func (hi *HistoryChangesIterFiles) HasNext() bool { - if hi.err != nil { // always true, then .Next() call will return this error - return true - } - if hi.limit == 0 { // limit reached - return false - } - if hi.nextKey == nil { // EndOfTable - return false - } - return true -} - -func (hi *HistoryChangesIterFiles) Next() ([]byte, []byte, error) { - if hi.err != nil { - return nil, nil, hi.err - } - hi.limit-- - hi.k, hi.v = append(hi.k[:0], hi.nextKey...), append(hi.v[:0], hi.nextVal...) - - // Satisfy iter.Duo Invariant 2 - hi.k, hi.kBackup, hi.v, hi.vBackup = hi.kBackup, hi.k, hi.vBackup, hi.v - if err := hi.advance(); err != nil { - return nil, nil, err - } - fmt.Printf("[dbg] hist.Next: %x\n", hi.kBackup) - return hi.kBackup, hi.vBackup, nil -} - -type HistoryChangesIterDB struct { - largeValues bool - roTx kv.Tx - valsC kv.Cursor - valsCDup kv.CursorDupSort - valsTable string - limit, endTxNum int - startTxKey [8]byte - - nextKey, nextVal []byte - nextStep uint64 - k, v []byte - step uint64 - err error -} - -func (hi *HistoryChangesIterDB) Close() { - if hi.valsC != nil { - hi.valsC.Close() - } - if hi.valsCDup != nil { - hi.valsCDup.Close() - } -} -func (hi *HistoryChangesIterDB) advance() (err error) { - // not large: - // keys: txNum -> key1+key2 - // vals: key1+key2 -> txNum + value (DupSort) - // large: - // keys: txNum -> key1+key2 - // vals: key1+key2+txNum -> value (not DupSort) - if hi.largeValues { - return hi.advanceLargeVals() - } - return hi.advanceSmallVals() -} - -func (hi *HistoryChangesIterDB) advanceLargeVals() error { - var seek []byte - var err error - if hi.valsC == nil { - if hi.valsC, err = hi.roTx.Cursor(hi.valsTable); err != nil { - return err - } - firstKey, _, err := hi.valsC.First() - if err != nil { - 
return err - } - if firstKey == nil { - hi.nextKey = nil - return nil - } - seek = append(common.Copy(firstKey[:len(firstKey)-8]), hi.startTxKey[:]...) - } else { - next, ok := kv.NextSubtree(hi.nextKey) - if !ok { - hi.nextKey = nil - return nil - } - - seek = append(next, hi.startTxKey[:]...) - } - for k, v, err := hi.valsC.Seek(seek); k != nil; k, v, err = hi.valsC.Seek(seek) { - if err != nil { - return err - } - if hi.endTxNum >= 0 && int(binary.BigEndian.Uint64(k[len(k)-8:])) >= hi.endTxNum { - next, ok := kv.NextSubtree(k[:len(k)-8]) - if !ok { - hi.nextKey = nil - return nil - } - seek = append(next, hi.startTxKey[:]...) - continue - } - if hi.nextKey != nil && bytes.Equal(k[:len(k)-8], hi.nextKey) && bytes.Equal(v, hi.nextVal) { - // stuck on the same key, move to first key larger than seek - for { - k, v, err = hi.valsC.Next() - if err != nil { - return err - } - if k == nil { - hi.nextKey = nil - return nil - } - if bytes.Compare(seek[:len(seek)-8], k[:len(k)-8]) < 0 { - break - } - } - } - //fmt.Printf("[seek=%x][RET=%t] '%x' '%x'\n", seek, bytes.Equal(seek[:len(seek)-8], k[:len(k)-8]), k, v) - if !bytes.Equal(seek[:len(seek)-8], k[:len(k)-8]) /*|| int(binary.BigEndian.Uint64(k[len(k)-8:])) > hi.endTxNum */ { - if len(seek) != len(k) { - seek = append(append(seek[:0], k[:len(k)-8]...), hi.startTxKey[:]...) 
- continue - } - copy(seek[:len(k)-8], k[:len(k)-8]) - continue - } - hi.nextKey = k[:len(k)-8] - hi.nextVal = v - return nil - } - hi.nextKey = nil - return nil -} -func (hi *HistoryChangesIterDB) advanceSmallVals() (err error) { - var k []byte - if hi.valsCDup == nil { - if hi.valsCDup, err = hi.roTx.CursorDupSort(hi.valsTable); err != nil { - return err - } - - if k, _, err = hi.valsCDup.First(); err != nil { - return err - } - } else { - if k, _, err = hi.valsCDup.NextNoDup(); err != nil { - return err - } - } - for ; k != nil; k, _, err = hi.valsCDup.NextNoDup() { - if err != nil { - return err - } - v, err := hi.valsCDup.SeekBothRange(k, hi.startTxKey[:]) - if err != nil { - return err + return stream.TransformKV2U64(it, func(k, v []byte) (uint64, error) { + if len(v) < 8 { + return 0, fmt.Errorf("unexpected small value length %d", len(v)) } - if v == nil { - continue - } - foundTxNumVal := v[:8] - if hi.endTxNum >= 0 && int(binary.BigEndian.Uint64(foundTxNumVal)) >= hi.endTxNum { - continue - } - hi.nextKey = k - hi.nextVal = v[8:] - return nil - } - hi.nextKey = nil - return nil -} - -func (hi *HistoryChangesIterDB) HasNext() bool { - if hi.err != nil { // always true, then .Next() call will return this error - return true - } - if hi.limit == 0 { // limit reached - return false - } - if hi.nextKey == nil { // EndOfTable - return false - } - return true + return binary.BigEndian.Uint64(v), nil + }), nil } -func (hi *HistoryChangesIterDB) Next() ([]byte, []byte, uint64, error) { - if hi.err != nil { - return nil, nil, 0, hi.err - } - hi.limit-- - hi.k, hi.v, hi.step = hi.nextKey, hi.nextVal, hi.nextStep - if err := hi.advance(); err != nil { - return nil, nil, 0, err +func (ht *HistoryRoTx) IdxRange(key []byte, startTxNum, endTxNum int, asc order.By, limit int, roTx kv.Tx) (stream.U64, error) { + frozenIt, err := ht.iit.iterateRangeOnFiles(key, startTxNum, endTxNum, asc, limit) + if err != nil { + return nil, err } - order.Asc.Assert(hi.k, hi.nextKey) - 
return hi.k, hi.v, hi.step, nil -} - -// HistoryStep used for incremental state reconsitution, it isolates only one snapshot interval -type HistoryStep struct { - compressVals bool - indexItem *filesItem - indexFile visibleFile - historyItem *filesItem - historyFile visibleFile -} - -// MakeSteps [0, toTxNum) -func (h *History) MakeSteps(toTxNum uint64) []*HistoryStep { - var steps []*HistoryStep - h.InvertedIndex.dirtyFiles.Walk(func(items []*filesItem) bool { - for _, item := range items { - if item.index == nil || !item.frozen || item.startTxNum >= toTxNum { - continue - } - - step := &HistoryStep{ - compressVals: h.compression&seg.CompressVals != 0, - indexItem: item, - indexFile: visibleFile{ - startTxNum: item.startTxNum, - endTxNum: item.endTxNum, - getter: item.decompressor.MakeGetter(), - reader: recsplit.NewIndexReader(item.index), - }, - } - steps = append(steps, step) - } - return true - }) - i := 0 - h.dirtyFiles.Walk(func(items []*filesItem) bool { - for _, item := range items { - if item.index == nil || !item.frozen || item.startTxNum >= toTxNum { - continue - } - steps[i].historyItem = item - steps[i].historyFile = visibleFile{ - startTxNum: item.startTxNum, - endTxNum: item.endTxNum, - getter: item.decompressor.MakeGetter(), - reader: recsplit.NewIndexReader(item.index), - } - i++ - } - return true - }) - return steps -} - -func (hs *HistoryStep) Clone() *HistoryStep { - return &HistoryStep{ - compressVals: hs.compressVals, - indexItem: hs.indexItem, - indexFile: visibleFile{ - startTxNum: hs.indexFile.startTxNum, - endTxNum: hs.indexFile.endTxNum, - getter: hs.indexItem.decompressor.MakeGetter(), - reader: recsplit.NewIndexReader(hs.indexItem.index), - }, - historyItem: hs.historyItem, - historyFile: visibleFile{ - startTxNum: hs.historyFile.startTxNum, - endTxNum: hs.historyFile.endTxNum, - getter: hs.historyItem.decompressor.MakeGetter(), - reader: recsplit.NewIndexReader(hs.historyItem.index), - }, + recentIt, err := ht.idxRangeOnDB(key, 
startTxNum, endTxNum, asc, limit, roTx) + if err != nil { + return nil, err } + return stream.Union[uint64](frozenIt, recentIt, asc, limit), nil } diff --git a/erigon-lib/state/history_stream.go b/erigon-lib/state/history_stream.go new file mode 100644 index 00000000000..a0f01459c9e --- /dev/null +++ b/erigon-lib/state/history_stream.go @@ -0,0 +1,617 @@ +// Copyright 2022 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . 
+ +package state + +import ( + "bytes" + "container/heap" + "context" + "encoding/binary" + "fmt" + + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/order" + "github.com/erigontech/erigon-lib/kv/stream" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/seg" +) + +// StateAsOfIter - returns state range at given time in history +type StateAsOfIterF struct { + hc *HistoryRoTx + limit int + + from, toPrefix []byte + nextVal []byte + nextKey []byte + + h ReconHeap + startTxNum uint64 + startTxKey [8]byte + txnKey [8]byte + + k, v, kBackup, vBackup []byte + orderAscend order.By + + logger log.Logger + ctx context.Context +} + +func (hi *StateAsOfIterF) Close() { +} + +func (hi *StateAsOfIterF) init(files visibleFiles) error { + for i, item := range files { + if item.endTxNum <= hi.startTxNum { + continue + } + // TODO: seek(from) + g := seg.NewReader(item.src.decompressor.MakeGetter(), hi.hc.h.compression) + + idx := hi.hc.iit.statelessIdxReader(i) + var offset uint64 + if len(hi.from) > 0 { + n := item.src.decompressor.Count() / 2 + var ok bool + offset, ok = g.BinarySearch(hi.from, n, idx.OrdinalLookup) + if !ok { + offset = 0 + } + } + g.Reset(offset) + if g.HasNext() { + key, offset := g.Next(nil) + heap.Push(&hi.h, &ReconItem{g: g, key: key, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum, startOffset: offset, lastOffset: offset}) + } + } + binary.BigEndian.PutUint64(hi.startTxKey[:], hi.startTxNum) + return hi.advanceInFiles() +} + +func (hi *StateAsOfIterF) Trace(prefix string) *stream.TracedDuo[[]byte, []byte] { + return stream.TraceDuo(hi, hi.logger, "[dbg] StateAsOfIterF.Next "+prefix) +} + +func (hi *StateAsOfIterF) advanceInFiles() error { + for hi.h.Len() > 0 { + top := heap.Pop(&hi.h).(*ReconItem) + key := top.key + var idxVal []byte + //if hi.compressVals { + idxVal, _ = 
top.g.Next(nil) + //} else { + // idxVal, _ = top.g.NextUncompressed() + //} + if top.g.HasNext() { + //if hi.compressVals { + top.key, _ = top.g.Next(nil) + //} else { + // top.key, _ = top.g.NextUncompressed() + //} + if hi.toPrefix == nil || bytes.Compare(top.key, hi.toPrefix) < 0 { + heap.Push(&hi.h, top) + } + } + + if hi.from != nil && bytes.Compare(key, hi.from) < 0 { //TODO: replace by seekInFiles() + continue + } + + if bytes.Equal(key, hi.nextKey) { + continue + } + n, ok := eliasfano32.Seek(idxVal, hi.startTxNum) + if !ok { + continue + } + + hi.nextKey = key + binary.BigEndian.PutUint64(hi.txnKey[:], n) + historyItem, ok := hi.hc.getFileDeprecated(top.startTxNum, top.endTxNum) + if !ok { + return fmt.Errorf("no %s file found for [%x]", hi.hc.h.filenameBase, hi.nextKey) + } + reader := hi.hc.statelessIdxReader(historyItem.i) + offset, ok := reader.Lookup2(hi.txnKey[:], hi.nextKey) + if !ok { + continue + } + g := hi.hc.statelessGetter(historyItem.i) + g.Reset(offset) + hi.nextVal, _ = g.Next(nil) + return nil + } + hi.nextKey = nil + return nil +} + +func (hi *StateAsOfIterF) HasNext() bool { + if hi.limit == 0 { // limit reached + return false + } + if hi.nextKey == nil { // EndOfTable + return false + } + if hi.toPrefix == nil { // s.nextK == nil check is above + return true + } + + //Asc: [from, to) AND from < to + //Desc: [from, to) AND from > to + cmp := bytes.Compare(hi.nextKey, hi.toPrefix) + return (bool(hi.orderAscend) && cmp < 0) || (!bool(hi.orderAscend) && cmp > 0) +} + +func (hi *StateAsOfIterF) Next() ([]byte, []byte, error) { + select { + case <-hi.ctx.Done(): + return nil, nil, hi.ctx.Err() + default: + } + + hi.limit-- + hi.k, hi.v = append(hi.k[:0], hi.nextKey...), append(hi.v[:0], hi.nextVal...) 
+ + // Satisfy stream.Duo Invariant 2 + hi.k, hi.kBackup, hi.v, hi.vBackup = hi.kBackup, hi.k, hi.vBackup, hi.v + if err := hi.advanceInFiles(); err != nil { + return nil, nil, err + } + hi.orderAscend.Assert(hi.kBackup, hi.nextKey) + // TODO: remove `common.Copy`. it protecting from some existing bug. https://github.com/erigontech/erigon/issues/12672 + return common.Copy(hi.kBackup), common.Copy(hi.vBackup), nil +} + +// StateAsOfIterDB - returns state range at given time in history +type StateAsOfIterDB struct { + largeValues bool + roTx kv.Tx + valsC kv.Cursor + valsCDup kv.CursorDupSort + valsTable string + + from, toPrefix []byte + orderAscend order.By + limit int + + nextKey, nextVal []byte + + startTxNum uint64 + startTxKey [8]byte + + k, v, kBackup, vBackup []byte + err error + + logger log.Logger + ctx context.Context +} + +func (hi *StateAsOfIterDB) Close() { + if hi.valsC != nil { + hi.valsC.Close() + } +} + +func (hi *StateAsOfIterDB) Trace(prefix string) *stream.TracedDuo[[]byte, []byte] { + return stream.TraceDuo(hi, hi.logger, "[dbg] StateAsOfIterDB.Next "+prefix) +} + +func (hi *StateAsOfIterDB) advance() (err error) { + // not large: + // keys: txNum -> key1+key2 + // vals: key1+key2 -> txNum + value (DupSort) + // large: + // keys: txNum -> key1+key2 + // vals: key1+key2+txNum -> value (not DupSort) + if hi.largeValues { + return hi.advanceLargeVals() + } + return hi.advanceSmallVals() +} +func (hi *StateAsOfIterDB) advanceLargeVals() error { + var seek []byte + var err error + if hi.valsC == nil { + if hi.valsC, err = hi.roTx.Cursor(hi.valsTable); err != nil { + return err + } + firstKey, _, err := hi.valsC.Seek(hi.from) + if err != nil { + return err + } + if firstKey == nil { + hi.nextKey = nil + return nil + } + seek = append(common.Copy(firstKey[:len(firstKey)-8]), hi.startTxKey[:]...) + } else { + next, ok := kv.NextSubtree(hi.nextKey) + if !ok { + hi.nextKey = nil + return nil + } + + seek = append(next, hi.startTxKey[:]...) 
+ } + for k, v, err := hi.valsC.Seek(seek); k != nil; k, v, err = hi.valsC.Seek(seek) { + if err != nil { + return err + } + if hi.toPrefix != nil && bytes.Compare(k[:len(k)-8], hi.toPrefix) >= 0 { + break + } + if !bytes.Equal(seek[:len(k)-8], k[:len(k)-8]) { + copy(seek[:len(k)-8], k[:len(k)-8]) + continue + } + hi.nextKey = k[:len(k)-8] + hi.nextVal = v + return nil + } + hi.nextKey = nil + return nil +} +func (hi *StateAsOfIterDB) advanceSmallVals() error { + var seek []byte + var err error + if hi.valsCDup == nil { + if hi.valsCDup, err = hi.roTx.CursorDupSort(hi.valsTable); err != nil { + return err + } + seek = hi.from + } else { + next, ok := kv.NextSubtree(hi.nextKey) + if !ok { + hi.nextKey = nil + return nil + } + seek = next + } + for k, _, err := hi.valsCDup.Seek(seek); k != nil; k, _, err = hi.valsCDup.NextNoDup() { + if err != nil { + return err + } + if hi.toPrefix != nil && bytes.Compare(k, hi.toPrefix) >= 0 { + break + } + v, err := hi.valsCDup.SeekBothRange(k, hi.startTxKey[:]) + if err != nil { + return err + } + if v == nil { + continue + } + hi.nextKey = k + hi.nextVal = v[8:] + return nil + } + hi.nextKey = nil + return nil +} + +func (hi *StateAsOfIterDB) HasNext() bool { + if hi.err != nil { + return true + } + if hi.limit == 0 { // limit reached + return false + } + if hi.nextKey == nil { // EndOfTable + return false + } + if hi.toPrefix == nil { // s.nextK == nil check is above + return true + } + + //Asc: [from, to) AND from < to + //Desc: [from, to) AND from > to + cmp := bytes.Compare(hi.nextKey, hi.toPrefix) + return (bool(hi.orderAscend) && cmp < 0) || (!bool(hi.orderAscend) && cmp > 0) +} + +func (hi *StateAsOfIterDB) Next() ([]byte, []byte, error) { + select { + case <-hi.ctx.Done(): + return nil, nil, hi.ctx.Err() + default: + } + + if hi.err != nil { + return nil, nil, hi.err + } + hi.limit-- + hi.k, hi.v = hi.nextKey, hi.nextVal + + // Satisfy stream.Duo Invariant 2 + hi.k, hi.kBackup, hi.v, hi.vBackup = hi.kBackup, hi.k, 
hi.vBackup, hi.v + if err := hi.advance(); err != nil { + return nil, nil, err + } + hi.orderAscend.Assert(hi.kBackup, hi.nextKey) + // TODO: remove `common.Copy`. it protecting from some existing bug. https://github.com/erigontech/erigon/issues/12672 + return common.Copy(hi.kBackup), common.Copy(hi.vBackup), nil +} + +type HistoryChangesIterFiles struct { + hc *HistoryRoTx + nextVal []byte + nextKey []byte + h ReconHeap + startTxNum uint64 + endTxNum int + startTxKey [8]byte + txnKey [8]byte + + k, v, kBackup, vBackup []byte + err error + limit int +} + +func (hi *HistoryChangesIterFiles) Close() { +} + +func (hi *HistoryChangesIterFiles) advance() error { + for hi.h.Len() > 0 { + top := heap.Pop(&hi.h).(*ReconItem) + key := top.key + var idxVal []byte + //if hi.compressVals { + idxVal, _ = top.g.Next(nil) + //} else { + // idxVal, _ = top.g.NextUncompressed() + //} + if top.g.HasNext() { + //if hi.compressVals { + top.key, _ = top.g.Next(nil) + //} else { + // top.key, _ = top.g.NextUncompressed() + //} + heap.Push(&hi.h, top) + } + + if bytes.Equal(key, hi.nextKey) { + continue + } + n, ok := eliasfano32.Seek(idxVal, hi.startTxNum) + if !ok { + continue + } + if int(n) >= hi.endTxNum { + continue + } + + hi.nextKey = key + binary.BigEndian.PutUint64(hi.txnKey[:], n) + historyItem, ok := hi.hc.getFileDeprecated(top.startTxNum, top.endTxNum) + if !ok { + return fmt.Errorf("HistoryChangesIterFiles: no %s file found for [%x]", hi.hc.h.filenameBase, hi.nextKey) + } + reader := hi.hc.statelessIdxReader(historyItem.i) + offset, ok := reader.Lookup2(hi.txnKey[:], hi.nextKey) + if !ok { + continue + } + g := hi.hc.statelessGetter(historyItem.i) + g.Reset(offset) + hi.nextVal, _ = g.Next(nil) + return nil + } + hi.nextKey = nil + return nil +} + +func (hi *HistoryChangesIterFiles) HasNext() bool { + if hi.err != nil { // always true, then .Next() call will return this error + return true + } + if hi.limit == 0 { // limit reached + return false + } + if hi.nextKey == nil { 
// EndOfTable + return false + } + return true +} + +func (hi *HistoryChangesIterFiles) Next() ([]byte, []byte, error) { + if hi.err != nil { + return nil, nil, hi.err + } + hi.limit-- + hi.k, hi.v = append(hi.k[:0], hi.nextKey...), append(hi.v[:0], hi.nextVal...) + + // Satisfy iter.Duo Invariant 2 + hi.k, hi.kBackup, hi.v, hi.vBackup = hi.kBackup, hi.k, hi.vBackup, hi.v + if err := hi.advance(); err != nil { + return nil, nil, err + } + fmt.Printf("[dbg] hist.Next: %x\n", hi.kBackup) + return hi.kBackup, hi.vBackup, nil +} + +type HistoryChangesIterDB struct { + largeValues bool + roTx kv.Tx + valsC kv.Cursor + valsCDup kv.CursorDupSort + valsTable string + limit, endTxNum int + startTxKey [8]byte + + nextKey, nextVal []byte + nextStep uint64 + k, v []byte + step uint64 + err error +} + +func (hi *HistoryChangesIterDB) Close() { + if hi.valsC != nil { + hi.valsC.Close() + } + if hi.valsCDup != nil { + hi.valsCDup.Close() + } +} +func (hi *HistoryChangesIterDB) advance() (err error) { + // not large: + // keys: txNum -> key1+key2 + // vals: key1+key2 -> txNum + value (DupSort) + // large: + // keys: txNum -> key1+key2 + // vals: key1+key2+txNum -> value (not DupSort) + if hi.largeValues { + return hi.advanceLargeVals() + } + return hi.advanceSmallVals() +} + +func (hi *HistoryChangesIterDB) advanceLargeVals() error { + var seek []byte + var err error + if hi.valsC == nil { + if hi.valsC, err = hi.roTx.Cursor(hi.valsTable); err != nil { + return err + } + firstKey, _, err := hi.valsC.First() + if err != nil { + return err + } + if firstKey == nil { + hi.nextKey = nil + return nil + } + seek = append(common.Copy(firstKey[:len(firstKey)-8]), hi.startTxKey[:]...) + } else { + next, ok := kv.NextSubtree(hi.nextKey) + if !ok { + hi.nextKey = nil + return nil + } + + seek = append(next, hi.startTxKey[:]...) 
+ } + for k, v, err := hi.valsC.Seek(seek); k != nil; k, v, err = hi.valsC.Seek(seek) { + if err != nil { + return err + } + if hi.endTxNum >= 0 && int(binary.BigEndian.Uint64(k[len(k)-8:])) >= hi.endTxNum { + next, ok := kv.NextSubtree(k[:len(k)-8]) + if !ok { + hi.nextKey = nil + return nil + } + seek = append(next, hi.startTxKey[:]...) + continue + } + if hi.nextKey != nil && bytes.Equal(k[:len(k)-8], hi.nextKey) && bytes.Equal(v, hi.nextVal) { + // stuck on the same key, move to first key larger than seek + for { + k, v, err = hi.valsC.Next() + if err != nil { + return err + } + if k == nil { + hi.nextKey = nil + return nil + } + if bytes.Compare(seek[:len(seek)-8], k[:len(k)-8]) < 0 { + break + } + } + } + //fmt.Printf("[seek=%x][RET=%t] '%x' '%x'\n", seek, bytes.Equal(seek[:len(seek)-8], k[:len(k)-8]), k, v) + if !bytes.Equal(seek[:len(seek)-8], k[:len(k)-8]) /*|| int(binary.BigEndian.Uint64(k[len(k)-8:])) > hi.endTxNum */ { + if len(seek) != len(k) { + seek = append(append(seek[:0], k[:len(k)-8]...), hi.startTxKey[:]...) 
+ continue + } + copy(seek[:len(k)-8], k[:len(k)-8]) + continue + } + hi.nextKey = k[:len(k)-8] + hi.nextVal = v + return nil + } + hi.nextKey = nil + return nil +} +func (hi *HistoryChangesIterDB) advanceSmallVals() (err error) { + var k []byte + if hi.valsCDup == nil { + if hi.valsCDup, err = hi.roTx.CursorDupSort(hi.valsTable); err != nil { + return err + } + + if k, _, err = hi.valsCDup.First(); err != nil { + return err + } + } else { + if k, _, err = hi.valsCDup.NextNoDup(); err != nil { + return err + } + } + for ; k != nil; k, _, err = hi.valsCDup.NextNoDup() { + if err != nil { + return err + } + v, err := hi.valsCDup.SeekBothRange(k, hi.startTxKey[:]) + if err != nil { + return err + } + if v == nil { + continue + } + foundTxNumVal := v[:8] + if hi.endTxNum >= 0 && int(binary.BigEndian.Uint64(foundTxNumVal)) >= hi.endTxNum { + continue + } + hi.nextKey = k + hi.nextVal = v[8:] + return nil + } + hi.nextKey = nil + return nil +} + +func (hi *HistoryChangesIterDB) HasNext() bool { + if hi.err != nil { // always true, then .Next() call will return this error + return true + } + if hi.limit == 0 { // limit reached + return false + } + if hi.nextKey == nil { // EndOfTable + return false + } + return true +} + +func (hi *HistoryChangesIterDB) Next() ([]byte, []byte, uint64, error) { + if hi.err != nil { + return nil, nil, 0, hi.err + } + hi.limit-- + hi.k, hi.v, hi.step = hi.nextKey, hi.nextVal, hi.nextStep + if err := hi.advance(); err != nil { + return nil, nil, 0, err + } + order.Asc.Assert(hi.k, hi.nextKey) + return hi.k, hi.v, hi.step, nil +} diff --git a/erigon-lib/state/inverted_index.go b/erigon-lib/state/inverted_index.go index 9146e971b20..76aa845f80c 100644 --- a/erigon-lib/state/inverted_index.go +++ b/erigon-lib/state/inverted_index.go @@ -28,13 +28,10 @@ import ( "path" "path/filepath" "reflect" - "regexp" - "strconv" "strings" "sync" "time" - "github.com/RoaringBitmap/roaring/roaring64" "github.com/spaolacci/murmur3" btree2 
"github.com/tidwall/btree" "golang.org/x/sync/errgroup" @@ -186,46 +183,17 @@ func (ii *InvertedIndex) openFolder() error { return ii.openList(idxFiles) } -func (ii *InvertedIndex) scanDirtyFiles(fileNames []string) (garbageFiles []*filesItem) { - re := regexp.MustCompile("^v([0-9]+)-" + ii.filenameBase + ".([0-9]+)-([0-9]+).ef$") - var err error - for _, name := range fileNames { - subs := re.FindStringSubmatch(name) - if len(subs) != 4 { - if len(subs) != 0 { - ii.logger.Warn("File ignored by inverted index scan, more than 3 submatches", "name", name, "submatches", len(subs)) - } - continue - } - var startStep, endStep uint64 - if startStep, err = strconv.ParseUint(subs[2], 10, 64); err != nil { - ii.logger.Warn("File ignored by inverted index scan, parsing startTxNum", "error", err, "name", name) - continue - } - if endStep, err = strconv.ParseUint(subs[3], 10, 64); err != nil { - ii.logger.Warn("File ignored by inverted index scan, parsing endTxNum", "error", err, "name", name) - continue - } - if startStep > endStep { - ii.logger.Warn("File ignored by inverted index scan, startTxNum > endTxNum", "name", name) - continue - } - - startTxNum, endTxNum := startStep*ii.aggregationStep, endStep*ii.aggregationStep - var newFile = newFilesItem(startTxNum, endTxNum, ii.aggregationStep) - +func (ii *InvertedIndex) scanDirtyFiles(fileNames []string) { + for _, dirtyFile := range scanDirtyFiles(fileNames, ii.aggregationStep, ii.filenameBase, "ef", ii.logger) { + startStep, endStep := dirtyFile.startTxNum/ii.aggregationStep, dirtyFile.endTxNum/ii.aggregationStep if ii.integrityCheck != nil && !ii.integrityCheck(startStep, endStep) { - ii.logger.Debug("[agg] skip garbage file", "name", name) + ii.logger.Debug("[agg] skip garbage file", "name", ii.filenameBase, "startStep", startStep, "endStep", endStep) continue } - - if _, has := ii.dirtyFiles.Get(newFile); has { - continue + if _, has := ii.dirtyFiles.Get(dirtyFile); !has { + ii.dirtyFiles.Set(dirtyFile) } - - 
ii.dirtyFiles.Set(newFile) } - return garbageFiles } type idxList int @@ -655,7 +623,7 @@ func (iit *InvertedIndexRoTx) seekInFiles(key []byte, txNum uint64) (found bool, // todo IdxRange operates over ii.indexTable . Passing `nil` as a key will not return all keys func (iit *InvertedIndexRoTx) IdxRange(key []byte, startTxNum, endTxNum int, asc order.By, limit int, roTx kv.Tx) (stream.U64, error) { - frozenIt, err := iit.iterateRangeFrozen(key, startTxNum, endTxNum, asc, limit) + frozenIt, err := iit.iterateRangeOnFiles(key, startTxNum, endTxNum, asc, limit) if err != nil { return nil, err } @@ -703,7 +671,7 @@ func (iit *InvertedIndexRoTx) recentIterateRange(key []byte, startTxNum, endTxNu // IdxRange is to be used in public API, therefore it relies on read-only transaction // so that iteration can be done even when the inverted index is being updated. // [startTxNum; endNumTx) -func (iit *InvertedIndexRoTx) iterateRangeFrozen(key []byte, startTxNum, endTxNum int, asc order.By, limit int) (*FrozenInvertedIdxIter, error) { +func (iit *InvertedIndexRoTx) iterateRangeOnFiles(key []byte, startTxNum, endTxNum int, asc order.By, limit int) (*InvertedIdxStreamFiles, error) { if asc && (startTxNum >= 0 && endTxNum >= 0) && startTxNum > endTxNum { return nil, fmt.Errorf("startTxNum=%d epected to be lower than endTxNum=%d", startTxNum, endTxNum) } @@ -711,7 +679,7 @@ func (iit *InvertedIndexRoTx) iterateRangeFrozen(key []byte, startTxNum, endTxNu return nil, fmt.Errorf("startTxNum=%d epected to be bigger than endTxNum=%d", startTxNum, endTxNum) } - it := &FrozenInvertedIdxIter{ + it := &InvertedIdxStreamFiles{ key: key, startTxNum: startTxNum, endTxNum: endTxNum, @@ -1013,402 +981,6 @@ func (iit *InvertedIndexRoTx) DebugEFAllValuesAreInRange(ctx context.Context, fa return nil } -// FrozenInvertedIdxIter allows iteration over range of txn numbers -// Iteration is not implmented via callback function, because there is often -// a requirement for interators to be composable 
(for example, to implement AND and OR for indices) -// FrozenInvertedIdxIter must be closed after use to prevent leaking of resources like cursor -type FrozenInvertedIdxIter struct { - key []byte - startTxNum, endTxNum int - limit int - orderAscend order.By - - efIt stream.Uno[uint64] - indexTable string - stack []visibleFile - - nextN uint64 - hasNext bool - err error - - ef *eliasfano32.EliasFano -} - -func (it *FrozenInvertedIdxIter) Close() { - for _, item := range it.stack { - item.reader.Close() - } -} - -func (it *FrozenInvertedIdxIter) advance() { - if it.hasNext { - it.advanceInFiles() - } -} - -func (it *FrozenInvertedIdxIter) HasNext() bool { - if it.err != nil { // always true, then .Next() call will return this error - return true - } - if it.limit == 0 { // limit reached - return false - } - return it.hasNext -} - -func (it *FrozenInvertedIdxIter) Next() (uint64, error) { return it.next(), nil } - -func (it *FrozenInvertedIdxIter) next() uint64 { - it.limit-- - n := it.nextN - it.advance() - return n -} - -func (it *FrozenInvertedIdxIter) advanceInFiles() { - for { - for it.efIt == nil { - if len(it.stack) == 0 { - it.hasNext = false - return - } - item := it.stack[len(it.stack)-1] - it.stack = it.stack[:len(it.stack)-1] - offset, ok := item.reader.TwoLayerLookup(it.key) - if !ok { - continue - } - g := item.getter - g.Reset(offset) - k, _ := g.NextUncompressed() - if bytes.Equal(k, it.key) { - eliasVal, _ := g.NextUncompressed() - it.ef.Reset(eliasVal) - var efiter *eliasfano32.EliasFanoIter - if it.orderAscend { - efiter = it.ef.Iterator() - } else { - efiter = it.ef.ReverseIterator() - } - if it.startTxNum > 0 { - efiter.Seek(uint64(it.startTxNum)) - } - it.efIt = efiter - } - } - - //Asc: [from, to) AND from < to - //Desc: [from, to) AND from > to - if it.orderAscend { - for it.efIt.HasNext() { - n, err := it.efIt.Next() - if err != nil { - it.err = err - return - } - isBeforeRange := int(n) < it.startTxNum - if isBeforeRange { //skip - continue - 
} - isAfterRange := it.endTxNum >= 0 && int(n) >= it.endTxNum - if isAfterRange { // terminate - it.hasNext = false - return - } - it.hasNext = true - it.nextN = n - return - } - } else { - for it.efIt.HasNext() { - n, err := it.efIt.Next() - if err != nil { - it.err = err - return - } - isAfterRange := it.startTxNum >= 0 && int(n) > it.startTxNum - if isAfterRange { //skip - continue - } - isBeforeRange := it.endTxNum >= 0 && int(n) <= it.endTxNum - if isBeforeRange { // terminate - it.hasNext = false - return - } - it.hasNext = true - it.nextN = n - return - } - } - it.efIt = nil // Exhausted this iterator - } -} - -// RecentInvertedIdxIter allows iteration over range of txn numbers -// Iteration is not implmented via callback function, because there is often -// a requirement for interators to be composable (for example, to implement AND and OR for indices) -type RecentInvertedIdxIter struct { - key []byte - startTxNum, endTxNum int - limit int - orderAscend order.By - - roTx kv.Tx - cursor kv.CursorDupSort - indexTable string - - nextN uint64 - hasNext bool - err error - - bm *roaring64.Bitmap -} - -func (it *RecentInvertedIdxIter) Close() { - if it.cursor != nil { - it.cursor.Close() - } - bitmapdb.ReturnToPool64(it.bm) -} - -func (it *RecentInvertedIdxIter) advanceInDB() { - var v []byte - var err error - if it.cursor == nil { - if it.cursor, err = it.roTx.CursorDupSort(it.indexTable); err != nil { - // TODO pass error properly around - panic(err) - } - var k []byte - if k, _, err = it.cursor.SeekExact(it.key); err != nil { - panic(err) - } - if k == nil { - it.hasNext = false - return - } - //Asc: [from, to) AND from < to - //Desc: [from, to) AND from > to - var keyBytes [8]byte - if it.startTxNum > 0 { - binary.BigEndian.PutUint64(keyBytes[:], uint64(it.startTxNum)) - } - if v, err = it.cursor.SeekBothRange(it.key, keyBytes[:]); err != nil { - panic(err) - } - if v == nil { - if !it.orderAscend { - _, v, err = it.cursor.PrevDup() - if err != nil { - 
panic(err) - } - } - if v == nil { - it.hasNext = false - return - } - } - } else { - if it.orderAscend { - _, v, err = it.cursor.NextDup() - if err != nil { - // TODO pass error properly around - panic(err) - } - } else { - _, v, err = it.cursor.PrevDup() - if err != nil { - panic(err) - } - } - } - - //Asc: [from, to) AND from < to - //Desc: [from, to) AND from > to - if it.orderAscend { - for ; v != nil; _, v, err = it.cursor.NextDup() { - if err != nil { - // TODO pass error properly around - panic(err) - } - n := binary.BigEndian.Uint64(v) - if it.endTxNum >= 0 && int(n) >= it.endTxNum { - it.hasNext = false - return - } - if int(n) >= it.startTxNum { - it.hasNext = true - it.nextN = n - return - } - } - } else { - for ; v != nil; _, v, err = it.cursor.PrevDup() { - if err != nil { - // TODO pass error properly around - panic(err) - } - n := binary.BigEndian.Uint64(v) - if int(n) <= it.endTxNum { - it.hasNext = false - return - } - if it.startTxNum >= 0 && int(n) <= it.startTxNum { - it.hasNext = true - it.nextN = n - return - } - } - } - - it.hasNext = false -} - -func (it *RecentInvertedIdxIter) advance() { - if it.hasNext { - it.advanceInDB() - } -} - -func (it *RecentInvertedIdxIter) HasNext() bool { - if it.err != nil { // always true, then .Next() call will return this error - return true - } - if it.limit == 0 { // limit reached - return false - } - return it.hasNext -} - -func (it *RecentInvertedIdxIter) Next() (uint64, error) { - if it.err != nil { - return 0, it.err - } - it.limit-- - n := it.nextN - it.advance() - return n, nil -} - -type InvertedIterator1 struct { - roTx kv.Tx - cursor kv.CursorDupSort - indexTable string - key []byte - h ReconHeap - nextKey []byte - nextFileKey []byte - nextDbKey []byte - endTxNum uint64 - startTxNum uint64 - startTxKey [8]byte - hasNextInDb bool - hasNextInFiles bool -} - -func (it *InvertedIterator1) Close() { - if it.cursor != nil { - it.cursor.Close() - } -} - -func (it *InvertedIterator1) advanceInFiles() { - 
for it.h.Len() > 0 { - top := heap.Pop(&it.h).(*ReconItem) - key := top.key - val, _ := top.g.Next(nil) - if top.g.HasNext() { - top.key, _ = top.g.Next(nil) - heap.Push(&it.h, top) - } - if !bytes.Equal(key, it.key) { - ef, _ := eliasfano32.ReadEliasFano(val) - _min := ef.Get(0) - _max := ef.Max() - if _min < it.endTxNum && _max >= it.startTxNum { // Intersection of [min; max) and [it.startTxNum; it.endTxNum) - it.key = key - it.nextFileKey = key - return - } - } - } - it.hasNextInFiles = false -} - -func (it *InvertedIterator1) advanceInDb() { - var k, v []byte - var err error - if it.cursor == nil { - if it.cursor, err = it.roTx.CursorDupSort(it.indexTable); err != nil { - // TODO pass error properly around - panic(err) - } - if k, _, err = it.cursor.First(); err != nil { - // TODO pass error properly around - panic(err) - } - } else { - if k, _, err = it.cursor.NextNoDup(); err != nil { - panic(err) - } - } - for k != nil { - if v, err = it.cursor.SeekBothRange(k, it.startTxKey[:]); err != nil { - panic(err) - } - if v != nil { - txNum := binary.BigEndian.Uint64(v) - if txNum < it.endTxNum { - it.nextDbKey = append(it.nextDbKey[:0], k...) - return - } - } - if k, _, err = it.cursor.NextNoDup(); err != nil { - panic(err) - } - } - it.cursor.Close() - it.cursor = nil - it.hasNextInDb = false -} - -func (it *InvertedIterator1) advance() { - if it.hasNextInFiles { - if it.hasNextInDb { - c := bytes.Compare(it.nextFileKey, it.nextDbKey) - if c < 0 { - it.nextKey = append(it.nextKey[:0], it.nextFileKey...) - it.advanceInFiles() - } else if c > 0 { - it.nextKey = append(it.nextKey[:0], it.nextDbKey...) - it.advanceInDb() - } else { - it.nextKey = append(it.nextKey[:0], it.nextFileKey...) - it.advanceInDb() - it.advanceInFiles() - } - } else { - it.nextKey = append(it.nextKey[:0], it.nextFileKey...) - it.advanceInFiles() - } - } else if it.hasNextInDb { - it.nextKey = append(it.nextKey[:0], it.nextDbKey...) 
- it.advanceInDb() - } else { - it.nextKey = nil - } -} - -func (it *InvertedIterator1) HasNext() bool { - return it.hasNextInFiles || it.hasNextInDb || it.nextKey != nil -} - -func (it *InvertedIterator1) Next(keyBuf []byte) []byte { - result := append(keyBuf, it.nextKey...) - it.advance() - return result -} - func (iit *InvertedIndexRoTx) IterateChangedKeys(startTxNum, endTxNum uint64, roTx kv.Tx) InvertedIterator1 { var ii1 InvertedIterator1 ii1.hasNextInDb = true @@ -1662,25 +1234,6 @@ func (ii *InvertedIndex) integrateDirtyFiles(sf InvertedFiles, txNumFrom, txNumT ii.dirtyFiles.Set(fi) } -func (ii *InvertedIndex) collectFilesStat() (filesCount, filesSize, idxSize uint64) { - if ii.dirtyFiles == nil { - return 0, 0, 0 - } - ii.dirtyFiles.Walk(func(items []*filesItem) bool { - for _, item := range items { - if item.index == nil { - return false - } - filesSize += uint64(item.decompressor.Size()) - idxSize += uint64(item.index.Size()) - idxSize += uint64(item.bindex.Size()) - filesCount += 3 - } - return true - }) - return filesCount, filesSize, idxSize -} - func (ii *InvertedIndex) stepsRangeInDBAsStr(tx kv.Tx) string { a1, a2 := ii.stepsRangeInDB(tx) return fmt.Sprintf("%s: %.1f", ii.filenameBase, a2-a1) diff --git a/erigon-lib/state/inverted_index_stream.go b/erigon-lib/state/inverted_index_stream.go new file mode 100644 index 00000000000..6484bb84aa5 --- /dev/null +++ b/erigon-lib/state/inverted_index_stream.go @@ -0,0 +1,427 @@ +// Copyright 2022 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package state + +import ( + "bytes" + "container/heap" + "encoding/binary" + + "github.com/RoaringBitmap/roaring/roaring64" + + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/bitmapdb" + "github.com/erigontech/erigon-lib/kv/order" + "github.com/erigontech/erigon-lib/kv/stream" + "github.com/erigontech/erigon-lib/recsplit/eliasfano32" +) + +// InvertedIdxStreamFiles allows iteration over range of txn numbers +// Iteration is not implmented via callback function, because there is often +// a requirement for interators to be composable (for example, to implement AND and OR for indices) +// InvertedIdxStreamFiles must be closed after use to prevent leaking of resources like cursor +type InvertedIdxStreamFiles struct { + key []byte + startTxNum, endTxNum int + limit int + orderAscend order.By + + efIt stream.Uno[uint64] + indexTable string + stack []visibleFile + + nextN uint64 + hasNext bool + err error + + ef *eliasfano32.EliasFano +} + +func (it *InvertedIdxStreamFiles) Close() { + for _, item := range it.stack { + item.reader.Close() + } +} + +func (it *InvertedIdxStreamFiles) advance() { + if it.hasNext { + it.advanceInFiles() + } +} + +func (it *InvertedIdxStreamFiles) HasNext() bool { + if it.err != nil { // always true, then .Next() call will return this error + return true + } + if it.limit == 0 { // limit reached + return false + } + return it.hasNext +} + +func (it *InvertedIdxStreamFiles) Next() (uint64, error) { return it.next(), nil } + +func (it *InvertedIdxStreamFiles) next() uint64 { + it.limit-- + n := it.nextN + it.advance() + return n +} + +func (it *InvertedIdxStreamFiles) advanceInFiles() { + for { + for it.efIt == nil { + if len(it.stack) == 0 { + it.hasNext = false + return + } + item := it.stack[len(it.stack)-1] + it.stack = 
it.stack[:len(it.stack)-1] + offset, ok := item.reader.TwoLayerLookup(it.key) + if !ok { + continue + } + g := item.getter + g.Reset(offset) + k, _ := g.NextUncompressed() + if bytes.Equal(k, it.key) { + eliasVal, _ := g.NextUncompressed() + it.ef.Reset(eliasVal) + var efiter *eliasfano32.EliasFanoIter + if it.orderAscend { + efiter = it.ef.Iterator() + } else { + efiter = it.ef.ReverseIterator() + } + if it.startTxNum > 0 { + efiter.Seek(uint64(it.startTxNum)) + } + it.efIt = efiter + } + } + + //Asc: [from, to) AND from < to + //Desc: [from, to) AND from > to + if it.orderAscend { + for it.efIt.HasNext() { + n, err := it.efIt.Next() + if err != nil { + it.err = err + return + } + isBeforeRange := int(n) < it.startTxNum + if isBeforeRange { //skip + continue + } + isAfterRange := it.endTxNum >= 0 && int(n) >= it.endTxNum + if isAfterRange { // terminate + it.hasNext = false + return + } + it.hasNext = true + it.nextN = n + return + } + } else { + for it.efIt.HasNext() { + n, err := it.efIt.Next() + if err != nil { + it.err = err + return + } + isAfterRange := it.startTxNum >= 0 && int(n) > it.startTxNum + if isAfterRange { //skip + continue + } + isBeforeRange := it.endTxNum >= 0 && int(n) <= it.endTxNum + if isBeforeRange { // terminate + it.hasNext = false + return + } + it.hasNext = true + it.nextN = n + return + } + } + it.efIt = nil // Exhausted this iterator + } +} + +// RecentInvertedIdxIter allows iteration over range of txn numbers +// Iteration is not implmented via callback function, because there is often +// a requirement for interators to be composable (for example, to implement AND and OR for indices) +type RecentInvertedIdxIter struct { + key []byte + startTxNum, endTxNum int + limit int + orderAscend order.By + + roTx kv.Tx + cursor kv.CursorDupSort + indexTable string + + nextN uint64 + hasNext bool + err error + + bm *roaring64.Bitmap +} + +func (it *RecentInvertedIdxIter) Close() { + if it.cursor != nil { + it.cursor.Close() + } + 
bitmapdb.ReturnToPool64(it.bm) +} + +func (it *RecentInvertedIdxIter) advanceInDB() { + var v []byte + var err error + if it.cursor == nil { + if it.cursor, err = it.roTx.CursorDupSort(it.indexTable); err != nil { + // TODO pass error properly around + panic(err) + } + var k []byte + if k, _, err = it.cursor.SeekExact(it.key); err != nil { + panic(err) + } + if k == nil { + it.hasNext = false + return + } + //Asc: [from, to) AND from < to + //Desc: [from, to) AND from > to + var keyBytes [8]byte + if it.startTxNum > 0 { + binary.BigEndian.PutUint64(keyBytes[:], uint64(it.startTxNum)) + } + if v, err = it.cursor.SeekBothRange(it.key, keyBytes[:]); err != nil { + panic(err) + } + if v == nil { + if !it.orderAscend { + _, v, err = it.cursor.PrevDup() + if err != nil { + panic(err) + } + } + if v == nil { + it.hasNext = false + return + } + } + } else { + if it.orderAscend { + _, v, err = it.cursor.NextDup() + if err != nil { + // TODO pass error properly around + panic(err) + } + } else { + _, v, err = it.cursor.PrevDup() + if err != nil { + panic(err) + } + } + } + + //Asc: [from, to) AND from < to + //Desc: [from, to) AND from > to + if it.orderAscend { + for ; v != nil; _, v, err = it.cursor.NextDup() { + if err != nil { + // TODO pass error properly around + panic(err) + } + n := binary.BigEndian.Uint64(v) + if it.endTxNum >= 0 && int(n) >= it.endTxNum { + it.hasNext = false + return + } + if int(n) >= it.startTxNum { + it.hasNext = true + it.nextN = n + return + } + } + } else { + for ; v != nil; _, v, err = it.cursor.PrevDup() { + if err != nil { + // TODO pass error properly around + panic(err) + } + n := binary.BigEndian.Uint64(v) + if int(n) <= it.endTxNum { + it.hasNext = false + return + } + if it.startTxNum >= 0 && int(n) <= it.startTxNum { + it.hasNext = true + it.nextN = n + return + } + } + } + + it.hasNext = false +} + +func (it *RecentInvertedIdxIter) advance() { + if it.hasNext { + it.advanceInDB() + } +} + +func (it *RecentInvertedIdxIter) HasNext() 
bool { + if it.err != nil { // always true, then .Next() call will return this error + return true + } + if it.limit == 0 { // limit reached + return false + } + return it.hasNext +} + +func (it *RecentInvertedIdxIter) Next() (uint64, error) { + if it.err != nil { + return 0, it.err + } + it.limit-- + n := it.nextN + it.advance() + return n, nil +} + +type InvertedIterator1 struct { + roTx kv.Tx + cursor kv.CursorDupSort + indexTable string + key []byte + h ReconHeap + nextKey []byte + nextFileKey []byte + nextDbKey []byte + endTxNum uint64 + startTxNum uint64 + startTxKey [8]byte + hasNextInDb bool + hasNextInFiles bool +} + +func (it *InvertedIterator1) Close() { + if it.cursor != nil { + it.cursor.Close() + } +} + +func (it *InvertedIterator1) advanceInFiles() { + for it.h.Len() > 0 { + top := heap.Pop(&it.h).(*ReconItem) + key := top.key + val, _ := top.g.Next(nil) + if top.g.HasNext() { + top.key, _ = top.g.Next(nil) + heap.Push(&it.h, top) + } + if !bytes.Equal(key, it.key) { + ef, _ := eliasfano32.ReadEliasFano(val) + _min := ef.Get(0) + _max := ef.Max() + if _min < it.endTxNum && _max >= it.startTxNum { // Intersection of [min; max) and [it.startTxNum; it.endTxNum) + it.key = key + it.nextFileKey = key + return + } + } + } + it.hasNextInFiles = false +} + +func (it *InvertedIterator1) advanceInDb() { + var k, v []byte + var err error + if it.cursor == nil { + if it.cursor, err = it.roTx.CursorDupSort(it.indexTable); err != nil { + // TODO pass error properly around + panic(err) + } + if k, _, err = it.cursor.First(); err != nil { + // TODO pass error properly around + panic(err) + } + } else { + if k, _, err = it.cursor.NextNoDup(); err != nil { + panic(err) + } + } + for k != nil { + if v, err = it.cursor.SeekBothRange(k, it.startTxKey[:]); err != nil { + panic(err) + } + if v != nil { + txNum := binary.BigEndian.Uint64(v) + if txNum < it.endTxNum { + it.nextDbKey = append(it.nextDbKey[:0], k...) 
+ return + } + } + if k, _, err = it.cursor.NextNoDup(); err != nil { + panic(err) + } + } + it.cursor.Close() + it.cursor = nil + it.hasNextInDb = false +} + +func (it *InvertedIterator1) advance() { + if it.hasNextInFiles { + if it.hasNextInDb { + c := bytes.Compare(it.nextFileKey, it.nextDbKey) + if c < 0 { + it.nextKey = append(it.nextKey[:0], it.nextFileKey...) + it.advanceInFiles() + } else if c > 0 { + it.nextKey = append(it.nextKey[:0], it.nextDbKey...) + it.advanceInDb() + } else { + it.nextKey = append(it.nextKey[:0], it.nextFileKey...) + it.advanceInDb() + it.advanceInFiles() + } + } else { + it.nextKey = append(it.nextKey[:0], it.nextFileKey...) + it.advanceInFiles() + } + } else if it.hasNextInDb { + it.nextKey = append(it.nextKey[:0], it.nextDbKey...) + it.advanceInDb() + } else { + it.nextKey = nil + } +} + +func (it *InvertedIterator1) HasNext() bool { + return it.hasNextInFiles || it.hasNextInDb || it.nextKey != nil +} + +func (it *InvertedIterator1) Next(keyBuf []byte) []byte { + result := append(keyBuf, it.nextKey...) 
+ it.advance() + return result +} diff --git a/erigon-lib/state/merge.go b/erigon-lib/state/merge.go index 4ade27d1407..5874d86a68b 100644 --- a/erigon-lib/state/merge.go +++ b/erigon-lib/state/merge.go @@ -556,7 +556,6 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h } closeFiles = false - dt.d.stats.MergesCount++ return } diff --git a/turbo/jsonrpc/debug_api.go b/turbo/jsonrpc/debug_api.go index 1100bbc42dd..afbd2f9ab32 100644 --- a/turbo/jsonrpc/debug_api.go +++ b/turbo/jsonrpc/debug_api.go @@ -220,7 +220,7 @@ func (api *PrivateDebugAPIImpl) GetModifiedAccountsByNumber(ctx context.Context, // getModifiedAccounts returns a list of addresses that were modified in the block range // [startNum:endNum) func getModifiedAccounts(tx kv.TemporalTx, startTxNum, endTxNum uint64) ([]common.Address, error) { - it, err := tx.HistoryRange(kv.AccountsHistory, int(startTxNum), int(endTxNum), order.Asc, kv.Unlim) + it, err := tx.HistoryRange(kv.AccountsDomain, int(startTxNum), int(endTxNum), order.Asc, kv.Unlim) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/erigon_block.go b/turbo/jsonrpc/erigon_block.go index cabbf66566d..61a17d3f5f8 100644 --- a/turbo/jsonrpc/erigon_block.go +++ b/turbo/jsonrpc/erigon_block.go @@ -232,7 +232,7 @@ func (api *ErigonImpl) GetBalanceChangesInBlock(ctx context.Context, blockNrOrHa } minTxNum, _ := txNumsReader.Min(tx, blockNumber) - it, err := tx.(kv.TemporalTx).HistoryRange(kv.AccountsHistory, int(minTxNum), -1, order.Asc, -1) + it, err := tx.(kv.TemporalTx).HistoryRange(kv.AccountsDomain, int(minTxNum), -1, order.Asc, -1) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/otterscan_contract_creator.go b/turbo/jsonrpc/otterscan_contract_creator.go index 201bb075daf..3d1d13ec484 100644 --- a/turbo/jsonrpc/otterscan_contract_creator.go +++ b/turbo/jsonrpc/otterscan_contract_creator.go @@ -95,7 +95,7 @@ func (api *OtterscanAPIImpl) GetContractCreator(ctx context.Context, addr common continue 
} - v, ok, err := ttx.HistorySeek(kv.AccountsHistory, addr[:], txnID) + v, ok, err := ttx.HistorySeek(kv.AccountsDomain, addr[:], txnID) if err != nil { log.Error("Unexpected error, couldn't find changeset", "txNum", txnID, "addr", addr) return nil, err @@ -136,7 +136,7 @@ func (api *OtterscanAPIImpl) GetContractCreator(ctx context.Context, addr common // can be replaced by full-scan over ttx.HistoryRange([prevTxnID, nextTxnID])? idx := sort.Search(int(nextTxnID-prevTxnID), func(i int) bool { txnID := uint64(i) + prevTxnID - v, ok, err := ttx.HistorySeek(kv.AccountsHistory, addr[:], txnID) + v, ok, err := ttx.HistorySeek(kv.AccountsDomain, addr[:], txnID) if err != nil { log.Error("[rpc] Unexpected error, couldn't find changeset", "txNum", i, "addr", addr) panic(err) diff --git a/turbo/jsonrpc/otterscan_transaction_by_sender_and_nonce.go b/turbo/jsonrpc/otterscan_transaction_by_sender_and_nonce.go index daa72c34af9..2411bb10c5c 100644 --- a/turbo/jsonrpc/otterscan_transaction_by_sender_and_nonce.go +++ b/turbo/jsonrpc/otterscan_transaction_by_sender_and_nonce.go @@ -58,7 +58,7 @@ func (api *OtterscanAPIImpl) GetTransactionBySenderAndNonce(ctx context.Context, continue } - v, ok, err := ttx.HistorySeek(kv.AccountsHistory, addr[:], txnID) + v, ok, err := ttx.HistorySeek(kv.AccountsDomain, addr[:], txnID) if err != nil { log.Error("Unexpected error, couldn't find changeset", "txNum", i, "addr", addr) return nil, err @@ -98,7 +98,7 @@ func (api *OtterscanAPIImpl) GetTransactionBySenderAndNonce(ctx context.Context, // can be replaced by full-scan over ttx.HistoryRange([prevTxnID, nextTxnID])? idx := sort.Search(int(nextTxnID-prevTxnID), func(i int) bool { txnID := uint64(i) + prevTxnID - v, ok, err := ttx.HistorySeek(kv.AccountsHistory, addr[:], txnID) + v, ok, err := ttx.HistorySeek(kv.AccountsDomain, addr[:], txnID) if err != nil { log.Error("[rpc] Unexpected error, couldn't find changeset", "txNum", i, "addr", addr) panic(err)