From 788d5480d9ab717f0ab49e8cb98690361b598ad7 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 3 Aug 2022 16:17:28 -0700 Subject: [PATCH 1/4] DROP tx_index column from log_broadcasts table --- .../migrate/migrations/0137_remove_tx_index.sql | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 core/store/migrate/migrations/0137_remove_tx_index.sql diff --git a/core/store/migrate/migrations/0137_remove_tx_index.sql b/core/store/migrate/migrations/0137_remove_tx_index.sql new file mode 100644 index 00000000000..e2ec7e87929 --- /dev/null +++ b/core/store/migrate/migrations/0137_remove_tx_index.sql @@ -0,0 +1,13 @@ +-- +goose Up +-- +goose StatementBegin +DROP INDEX log_broadcasts_unique_idx; +ALTER TABLE log_broadcasts DROP COLUMN tx_index; +CREATE UNIQUE INDEX log_broadcasts_unique_idx ON log_broadcasts USING BTREE (job_id, block_hash, log_index, evm_chain_id); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP INDEX log_broadcasts_unique_idx; +ALTER TABLE log_broadcasts ADD COLUMN tx_index BIGINT NOT NULL DEFAULT -1; +CREATE UNIQUE INDEX log_broadcasts_unique_idx ON log_broadcasts USING BTREE (job_id, block_hash, tx_index, log_index, evm_chain_id); +-- +goose StatementEnd From e76be65ee349bb8023cc77c3b8846da699eede7f Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 3 Aug 2022 16:29:18 -0700 Subject: [PATCH 2/4] Revert "Update log/pool.go & log/pool_test.go for besu (#7106)" This reverts commit 4944dedef198d13db1438b0e75936f836bfdd04e. --- core/chains/evm/log/pool.go | 27 +++++++++------------------ core/chains/evm/log/pool_test.go | 26 ++++++-------------------- 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/core/chains/evm/log/pool.go b/core/chains/evm/log/pool.go index 5229e83936f..7d0993460ba 100644 --- a/core/chains/evm/log/pool.go +++ b/core/chains/evm/log/pool.go @@ -44,8 +44,8 @@ type logPool struct { // the logs in the pool. hashesByBlockNumbers map[uint64]map[common.Hash]struct{} - // A mapping of block hashes, to tx index within block, to log index, to logs - logsByBlockHash map[common.Hash]map[uint]map[uint]types.Log + // A mapping of block hashes, to log index within block, to logs + logsByBlockHash map[common.Hash]map[uint]types.Log // This min-heap maintains block numbers of logs in the pool. // it helps us easily determine the minimum log block number @@ -56,7 +56,7 @@ type logPool struct { func newLogPool() *logPool { return &logPool{ hashesByBlockNumbers: make(map[uint64]map[common.Hash]struct{}), - logsByBlockHash: make(map[common.Hash]map[uint]map[uint]types.Log), + logsByBlockHash: make(map[common.Hash]map[uint]types.Log), heap: pairingHeap.New(), } } @@ -68,12 +68,9 @@ func (pool *logPool) addLog(log types.Log) bool { } pool.hashesByBlockNumbers[log.BlockNumber][log.BlockHash] = struct{}{} if _, exists := pool.logsByBlockHash[log.BlockHash]; !exists { - pool.logsByBlockHash[log.BlockHash] = make(map[uint]map[uint]types.Log) + pool.logsByBlockHash[log.BlockHash] = make(map[uint]types.Log) } - if _, exists := pool.logsByBlockHash[log.BlockHash][log.TxIndex]; !exists { - pool.logsByBlockHash[log.BlockHash][log.TxIndex] = make(map[uint]types.Log) - } - pool.logsByBlockHash[log.BlockHash][log.TxIndex][log.Index] = log + pool.logsByBlockHash[log.BlockHash][log.Index] = log min := pool.heap.FindMin() pool.heap.Insert(Uint64(log.BlockNumber)) // first or new min @@ -157,11 +154,7 @@ func (pool *logPool) removeBlock(hash common.Hash, number uint64) { } func (pool *logPool) testOnly_getNumLogsForBlock(bh common.Hash) int { - var numLogs int - for _, txLogs := range pool.logsByBlockHash[bh] { - numLogs += len(txLogs) - } - return numLogs + return len(pool.logsByBlockHash[bh]) } type Uint64 uint64 @@ -184,12 +177,10 @@ type logsOnBlock struct { Logs []types.Log } -func newLogsOnBlock(num uint64, logsMap map[uint]map[uint]types.Log) logsOnBlock { +func newLogsOnBlock(num uint64, logsMap map[uint]types.Log) logsOnBlock { logs := make([]types.Log, 0, len(logsMap)) - for _, txLogs := range logsMap { - for _, l := range txLogs { - logs = append(logs, l) - } + for _, l := range logsMap { + logs = append(logs, l) } return logsOnBlock{num, logs} } diff --git a/core/chains/evm/log/pool_test.go b/core/chains/evm/log/pool_test.go index 5f8acc46424..05df5e8e2b2 100644 --- a/core/chains/evm/log/pool_test.go +++ b/core/chains/evm/log/pool_test.go @@ -52,7 +52,6 @@ func TestUnit_AddLog(t *testing.T) { blockHash := common.BigToHash(big.NewInt(1)) l1 := types.Log{ BlockHash: blockHash, - TxIndex: 37, Index: 42, BlockNumber: 1, } @@ -64,39 +63,26 @@ func TestUnit_AddLog(t *testing.T) { assert.False(t, p.addLog(l1), "AddLog should have returned false for a 2nd reattempt") require.Equal(t, 1, p.testOnly_getNumLogsForBlock(blockHash)) - // 2nd log with higher logIndex but same blockhash should add a new log, which shouldn't be minimum + // 2nd log with same loghash should add a new log, which shouldn't be minimum l2 := l1 l2.Index = 43 - assert.False(t, p.addLog(l2), "AddLog should have returned false for later log added") + assert.False(t, p.addLog(l2), "AddLog should have returned false for same log added") require.Equal(t, 2, p.testOnly_getNumLogsForBlock(blockHash)) - // New log with same logIndex but lower txIndex should add a new log, which should be a minimum - l2 = l1 - l2.TxIndex = 13 - assert.False(t, p.addLog(l2), "AddLog should have returned false for earlier log added") - require.Equal(t, 3, p.testOnly_getNumLogsForBlock(blockHash)) - - // New log with different larger BlockNumber should add a new log, not as minimum + // New log with different larger BlockNumber/loghash should add a new log, not as minimum l3 := l1 l3.BlockNumber = 3 l3.BlockHash = common.BigToHash(big.NewInt(3)) assert.False(t, p.addLog(l3), "AddLog should have returned false for same log added") - assert.Equal(t, 3, p.testOnly_getNumLogsForBlock(blockHash)) + assert.Equal(t, 2, p.testOnly_getNumLogsForBlock(blockHash)) require.Equal(t, 1, p.testOnly_getNumLogsForBlock(l3.BlockHash)) - // New log with different smaller BlockNumber should add a new log, as minimum + // New log with different smaller BlockNumber/loghash should add a new log, as minimum l4 := l1 l4.BlockNumber = 0 // New minimum block number l4.BlockHash = common.BigToHash(big.NewInt(0)) assert.True(t, p.addLog(l4), "AddLog should have returned true for smallest BlockNumber") - assert.Equal(t, 3, p.testOnly_getNumLogsForBlock(blockHash)) - assert.Equal(t, 1, p.testOnly_getNumLogsForBlock(l3.BlockHash)) - require.Equal(t, 1, p.testOnly_getNumLogsForBlock(l4.BlockHash)) - - // Adding duplicate log should not increase number of logs in pool - l5 := l1 - assert.False(t, p.addLog(l5), "AddLog should have returned false for smallest BlockNumber") - assert.Equal(t, 3, p.testOnly_getNumLogsForBlock(blockHash)) + assert.Equal(t, 2, p.testOnly_getNumLogsForBlock(blockHash)) assert.Equal(t, 1, p.testOnly_getNumLogsForBlock(l3.BlockHash)) require.Equal(t, 1, p.testOnly_getNumLogsForBlock(l4.BlockHash)) } From b82581717ef65c2d9a17be4537855611567715ba Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 3 Aug 2022 16:30:38 -0700 Subject: [PATCH 3/4] Revert "Add support for besu's non-unique logIndex (#7072)" This reverts commit 018950665414734645b7d3c1fd976741e09977d7. --- core/chains/evm/log/broadcaster.go | 8 +-- core/chains/evm/log/helpers_test.go | 4 +- core/chains/evm/log/integration_test.go | 8 +-- core/chains/evm/log/orm.go | 58 ++++++++----------- core/chains/evm/log/orm_test.go | 50 ++++++++-------- core/chains/evm/log/registrations.go | 4 +- .../fluxmonitorv2/integrations_test.go | 2 +- core/services/vrf/log_dedupe.go | 4 -- 8 files changed, 60 insertions(+), 78 deletions(-) diff --git a/core/chains/evm/log/broadcaster.go b/core/chains/evm/log/broadcaster.go index b1bb82e1d90..1356eb4df89 100644 --- a/core/chains/evm/log/broadcaster.go +++ b/core/chains/evm/log/broadcaster.go @@ -668,12 +668,12 @@ func (b *broadcaster) maybeWarnOnLargeBlockNumberDifference(logBlockNumber int64 // WasAlreadyConsumed reports whether the given consumer had already consumed the given log func (b *broadcaster) WasAlreadyConsumed(lb Broadcast, qopts ...pg.QOpt) (bool, error) { - return b.orm.WasBroadcastConsumed(lb.RawLog().BlockHash, lb.RawLog().TxIndex, lb.RawLog().Index, lb.JobID(), qopts...) + return b.orm.WasBroadcastConsumed(lb.RawLog().BlockHash, lb.RawLog().Index, lb.JobID(), qopts...) } // MarkConsumed marks the log as having been successfully consumed by the subscriber func (b *broadcaster) MarkConsumed(lb Broadcast, qopts ...pg.QOpt) error { - return b.orm.MarkBroadcastConsumed(lb.RawLog().BlockHash, lb.RawLog().BlockNumber, lb.RawLog().TxIndex, lb.RawLog().Index, lb.JobID(), qopts...) + return b.orm.MarkBroadcastConsumed(lb.RawLog().BlockHash, lb.RawLog().BlockNumber, lb.RawLog().Index, lb.JobID(), qopts...) } // MarkManyConsumed marks the logs as having been successfully consumed by the subscriber @@ -681,18 +681,16 @@ func (b *broadcaster) MarkManyConsumed(lbs []Broadcast, qopts ...pg.QOpt) (err e var ( blockHashes = make([]common.Hash, len(lbs)) blockNumbers = make([]uint64, len(lbs)) - txIndexes = make([]uint, len(lbs)) logIndexes = make([]uint, len(lbs)) jobIDs = make([]int32, len(lbs)) ) for i := range lbs { blockHashes[i] = lbs[i].RawLog().BlockHash blockNumbers[i] = lbs[i].RawLog().BlockNumber - txIndexes[i] = lbs[i].RawLog().TxIndex logIndexes[i] = lbs[i].RawLog().Index jobIDs[i] = lbs[i].JobID() } - return b.orm.MarkBroadcastsConsumed(blockHashes, blockNumbers, txIndexes, logIndexes, jobIDs, qopts...) + return b.orm.MarkBroadcastsConsumed(blockHashes, blockNumbers, logIndexes, jobIDs, qopts...) } // test only diff --git a/core/chains/evm/log/helpers_test.go b/core/chains/evm/log/helpers_test.go index dcf89de18d3..d1a376862b6 100644 --- a/core/chains/evm/log/helpers_test.go +++ b/core/chains/evm/log/helpers_test.go @@ -340,11 +340,11 @@ func (listener *simpleLogListener) handleLogBroadcast(t *testing.T, lggr logger. } func (listener *simpleLogListener) WasAlreadyConsumed(db *sqlx.DB, lggr logger.Logger, cfg pg.LogConfig, broadcast log.Broadcast) (bool, error) { - return log.NewORM(listener.db, lggr, cfg, cltest.FixtureChainID).WasBroadcastConsumed(broadcast.RawLog().BlockHash, broadcast.RawLog().TxIndex, broadcast.RawLog().Index, listener.jobID) + return log.NewORM(listener.db, lggr, cfg, cltest.FixtureChainID).WasBroadcastConsumed(broadcast.RawLog().BlockHash, broadcast.RawLog().Index, listener.jobID) } func (listener *simpleLogListener) MarkConsumed(db *sqlx.DB, lggr logger.Logger, cfg pg.LogConfig, broadcast log.Broadcast) error { - return log.NewORM(listener.db, lggr, cfg, cltest.FixtureChainID).MarkBroadcastConsumed(broadcast.RawLog().BlockHash, broadcast.RawLog().BlockNumber, broadcast.RawLog().TxIndex, broadcast.RawLog().Index, listener.jobID) + return log.NewORM(listener.db, lggr, cfg, cltest.FixtureChainID).MarkBroadcastConsumed(broadcast.RawLog().BlockHash, broadcast.RawLog().BlockNumber, broadcast.RawLog().Index, listener.jobID) } type mockListener struct { diff --git a/core/chains/evm/log/integration_test.go b/core/chains/evm/log/integration_test.go index ecccda9d5b5..1cea96fa581 100644 --- a/core/chains/evm/log/integration_test.go +++ b/core/chains/evm/log/integration_test.go @@ -366,7 +366,7 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) { require.Equal(t, int64(log2.BlockNumber), *blockNum) require.NotEmpty(t, listener.getUniqueLogs()) require.Empty(t, listener2.getUniqueLogs()) - c, err := orm.WasBroadcastConsumed(log1.BlockHash, log1.TxIndex, log1.Index, listener.JobID()) + c, err := orm.WasBroadcastConsumed(log1.BlockHash, log1.Index, listener.JobID()) require.NoError(t, err) require.False(t, c) @@ -404,10 +404,10 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) { require.Nil(t, blockNum) require.NotEmpty(t, listener.getUniqueLogs()) require.NotEmpty(t, listener2.getUniqueLogs()) - c, err = orm.WasBroadcastConsumed(log1.BlockHash, log1.TxIndex, log1.Index, listener.JobID()) + c, err = orm.WasBroadcastConsumed(log1.BlockHash, log1.Index, listener.JobID()) require.NoError(t, err) require.True(t, c) - c, err = orm.WasBroadcastConsumed(log2.BlockHash, log2.TxIndex, log2.Index, listener2.JobID()) + c, err = orm.WasBroadcastConsumed(log2.BlockHash, log2.Index, listener2.JobID()) require.NoError(t, err) require.False(t, c) @@ -443,7 +443,7 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) { require.Nil(t, blockNum) require.Empty(t, listener.getUniqueLogs()) require.NotEmpty(t, listener2.getUniqueLogs()) - c, err = orm.WasBroadcastConsumed(log2.BlockHash, log2.TxIndex, log2.Index, listener2.JobID()) + c, err = orm.WasBroadcastConsumed(log2.BlockHash, log2.Index, listener2.JobID()) require.NoError(t, err) require.True(t, c) } diff --git a/core/chains/evm/log/orm.go b/core/chains/evm/log/orm.go index 211332835b3..d596649f278 100644 --- a/core/chains/evm/log/orm.go +++ b/core/chains/evm/log/orm.go @@ -28,13 +28,13 @@ type ORM interface { // FindBroadcasts returns broadcasts for a range of block numbers, both consumed and unconsumed. FindBroadcasts(fromBlockNum int64, toBlockNum int64) ([]LogBroadcast, error) // CreateBroadcast inserts an unconsumed log broadcast for jobID. - CreateBroadcast(blockHash common.Hash, blockNumber uint64, txIndex uint, logIndex uint, jobID int32, qopts ...pg.QOpt) error + CreateBroadcast(blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32, qopts ...pg.QOpt) error // WasBroadcastConsumed returns true if jobID consumed the log broadcast. - WasBroadcastConsumed(blockHash common.Hash, txIndex uint, logIndex uint, jobID int32, qopts ...pg.QOpt) (bool, error) + WasBroadcastConsumed(blockHash common.Hash, logIndex uint, jobID int32, qopts ...pg.QOpt) (bool, error) // MarkBroadcastConsumed marks the log broadcast as consumed by jobID. - MarkBroadcastConsumed(blockHash common.Hash, blockNumber uint64, txIndex uint, logIndex uint, jobID int32, qopts ...pg.QOpt) error + MarkBroadcastConsumed(blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32, qopts ...pg.QOpt) error // MarkBroadcastsConsumed marks the log broadcasts as consumed by jobID. - MarkBroadcastsConsumed(blockHashes []common.Hash, blockNumbers []uint64, txIndexes []uint, logIndexes []uint, jobIDs []int32, qopts ...pg.QOpt) error + MarkBroadcastsConsumed(blockHashes []common.Hash, blockNumbers []uint64, logIndexes []uint, jobIDs []int32, qopts ...pg.QOpt) error // MarkBroadcastsUnconsumed marks all log broadcasts from all jobs on or after fromBlock as // unconsumed. MarkBroadcastsUnconsumed(fromBlock int64, qopts ...pg.QOpt) error @@ -60,18 +60,16 @@ func NewORM(db *sqlx.DB, lggr logger.Logger, cfg pg.LogConfig, evmChainID big.In return &orm{pg.NewQ(db, lggr, cfg), *utils.NewBig(&evmChainID)} } -func (o *orm) WasBroadcastConsumed(blockHash common.Hash, txIndex uint, logIndex uint, jobID int32, qopts ...pg.QOpt) (consumed bool, err error) { +func (o *orm) WasBroadcastConsumed(blockHash common.Hash, logIndex uint, jobID int32, qopts ...pg.QOpt) (consumed bool, err error) { query := ` SELECT consumed FROM log_broadcasts WHERE block_hash = $1 - AND (tx_index = -1 OR tx_index = $2) - AND log_index = $3 - AND job_id = $4 - AND evm_chain_id = $5 + AND log_index = $2 + AND job_id = $3 + AND evm_chain_id = $4 ` args := []interface{}{ blockHash, - txIndex, logIndex, jobID, o.evmChainID, @@ -87,7 +85,7 @@ func (o *orm) WasBroadcastConsumed(blockHash common.Hash, txIndex uint, logIndex func (o *orm) FindBroadcasts(fromBlockNum int64, toBlockNum int64) ([]LogBroadcast, error) { var broadcasts []LogBroadcast query := ` - SELECT block_hash, consumed, tx_index, log_index, job_id FROM log_broadcasts + SELECT block_hash, consumed, log_index, job_id FROM log_broadcasts WHERE block_number >= $1 AND block_number <= $2 AND evm_chain_id = $3 @@ -99,55 +97,53 @@ func (o *orm) FindBroadcasts(fromBlockNum int64, toBlockNum int64) ([]LogBroadca return broadcasts, err } -func (o *orm) CreateBroadcast(blockHash common.Hash, blockNumber uint64, txIndex uint, logIndex uint, jobID int32, qopts ...pg.QOpt) error { +func (o *orm) CreateBroadcast(blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) err := q.ExecQ(` - INSERT INTO log_broadcasts (block_hash, block_number, tx_index, log_index, job_id, created_at, updated_at, consumed, evm_chain_id) - VALUES ($1, $2, $3, $4, $5, NOW(), NOW(), false, $6) - `, blockHash, blockNumber, txIndex, logIndex, jobID, o.evmChainID) + INSERT INTO log_broadcasts (block_hash, block_number, log_index, job_id, created_at, updated_at, consumed, evm_chain_id) + VALUES ($1, $2, $3, $4, NOW(), NOW(), false, $5) + `, blockHash, blockNumber, logIndex, jobID, o.evmChainID) return errors.Wrap(err, "failed to create log broadcast") } -func (o *orm) MarkBroadcastConsumed(blockHash common.Hash, blockNumber uint64, txIndex uint, logIndex uint, jobID int32, qopts ...pg.QOpt) error { +func (o *orm) MarkBroadcastConsumed(blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) err := q.ExecQ(` - INSERT INTO log_broadcasts (block_hash, block_number, tx_index, log_index, job_id, created_at, updated_at, consumed, evm_chain_id) - VALUES ($1, $2, $3, $4, $5, NOW(), NOW(), true, $6) - ON CONFLICT (job_id, block_hash, tx_index, log_index, evm_chain_id) DO UPDATE + INSERT INTO log_broadcasts (block_hash, block_number, log_index, job_id, created_at, updated_at, consumed, evm_chain_id) + VALUES ($1, $2, $3, $4, NOW(), NOW(), true, $5) + ON CONFLICT (job_id, block_hash, log_index, evm_chain_id) DO UPDATE SET consumed = true, updated_at = NOW() - `, blockHash, blockNumber, txIndex, logIndex, jobID, o.evmChainID) + `, blockHash, blockNumber, logIndex, jobID, o.evmChainID) return errors.Wrap(err, "failed to mark log broadcast as consumed") } // MarkBroadcastsConsumed marks many broadcasts as consumed. // The lengths of all the provided slices must be equal, otherwise an error is returned. -func (o *orm) MarkBroadcastsConsumed(blockHashes []common.Hash, blockNumbers []uint64, txIndexes []uint, logIndexes []uint, jobIDs []int32, qopts ...pg.QOpt) error { - if !utils.AllEqual(len(blockHashes), len(blockNumbers), len(txIndexes), len(logIndexes), len(jobIDs)) { - return fmt.Errorf("all arg slice lengths must be equal, got: %d %d %d %d %d", - len(blockHashes), len(blockNumbers), len(txIndexes), len(logIndexes), len(jobIDs), +func (o *orm) MarkBroadcastsConsumed(blockHashes []common.Hash, blockNumbers []uint64, logIndexes []uint, jobIDs []int32, qopts ...pg.QOpt) error { + if !utils.AllEqual(len(blockHashes), len(blockNumbers), len(logIndexes), len(jobIDs)) { + return fmt.Errorf("all arg slice lengths must be equal, got: %d %d %d %d", + len(blockHashes), len(blockNumbers), len(logIndexes), len(jobIDs), ) } type input struct { BlockHash common.Hash `db:"blockHash"` BlockNumber uint64 `db:"blockNumber"` - TxIndex uint `db:"txIndex"` LogIndex uint `db:"logIndex"` JobID int32 `db:"jobID"` ChainID utils.Big `db:"chainID"` } inputs := make([]input, len(blockHashes)) query := ` -INSERT INTO log_broadcasts (block_hash, block_number, tx_index, log_index, job_id, created_at, updated_at, consumed, evm_chain_id) -VALUES (:blockHash, :blockNumber, :txIndex, :logIndex, :jobID, NOW(), NOW(), true, :chainID) -ON CONFLICT (job_id, block_hash, tx_index, log_index, evm_chain_id) DO UPDATE +INSERT INTO log_broadcasts (block_hash, block_number, log_index, job_id, created_at, updated_at, consumed, evm_chain_id) +VALUES (:blockHash, :blockNumber, :logIndex, :jobID, NOW(), NOW(), true, :chainID) +ON CONFLICT (job_id, block_hash, log_index, evm_chain_id) DO UPDATE SET consumed = true, updated_at = NOW(); ` for i := range blockHashes { inputs[i] = input{ BlockHash: blockHashes[i], BlockNumber: blockNumbers[i], - TxIndex: txIndexes[i], LogIndex: logIndexes[i], JobID: jobIDs[i], ChainID: o.evmChainID, @@ -255,7 +251,6 @@ func (o *orm) removeUnconsumed(qopts ...pg.QOpt) error { type LogBroadcast struct { BlockHash common.Hash Consumed bool - TxIndex uint LogIndex uint JobID int32 } @@ -263,7 +258,6 @@ type LogBroadcast struct { func (b LogBroadcast) AsKey() LogBroadcastAsKey { return LogBroadcastAsKey{ b.BlockHash, - b.TxIndex, b.LogIndex, b.JobID, } @@ -272,7 +266,6 @@ func (b LogBroadcast) AsKey() LogBroadcastAsKey { // LogBroadcastAsKey - used as key in a map to filter out already consumed logs type LogBroadcastAsKey struct { BlockHash common.Hash - TxIndex uint // Needed for uniqueness with besu client LogIndex uint JobId int32 } @@ -280,7 +273,6 @@ type LogBroadcastAsKey struct { func NewLogBroadcastAsKey(log types.Log, listener Listener) LogBroadcastAsKey { return LogBroadcastAsKey{ log.BlockHash, - log.TxIndex, log.Index, listener.JobID(), } diff --git a/core/chains/evm/log/orm_test.go b/core/chains/evm/log/orm_test.go index e731becbde7..ee619e1637f 100644 --- a/core/chains/evm/log/orm_test.go +++ b/core/chains/evm/log/orm_test.go @@ -43,12 +43,12 @@ func TestORM_broadcasts(t *testing.T) { require.Zero(t, rowsAffected) t.Run("WasBroadcastConsumed_DNE", func(t *testing.T) { - _, err := orm.WasBroadcastConsumed(rawLog.BlockHash, rawLog.TxIndex, rawLog.Index, listener.JobID()) + _, err := orm.WasBroadcastConsumed(rawLog.BlockHash, rawLog.Index, listener.JobID()) require.NoError(t, err) }) require.True(t, t.Run("CreateBroadcast", func(t *testing.T) { - err := orm.CreateBroadcast(rawLog.BlockHash, rawLog.BlockNumber, rawLog.TxIndex, rawLog.Index, listener.JobID()) + err := orm.CreateBroadcast(rawLog.BlockHash, rawLog.BlockNumber, rawLog.Index, listener.JobID()) require.NoError(t, err) var consumed null.Bool @@ -58,13 +58,13 @@ func TestORM_broadcasts(t *testing.T) { })) t.Run("WasBroadcastConsumed_false", func(t *testing.T) { - was, err := orm.WasBroadcastConsumed(rawLog.BlockHash, rawLog.TxIndex, rawLog.Index, listener.JobID()) + was, err := orm.WasBroadcastConsumed(rawLog.BlockHash, rawLog.Index, listener.JobID()) require.NoError(t, err) require.False(t, was) }) require.True(t, t.Run("MarkBroadcastConsumed", func(t *testing.T) { - err := orm.MarkBroadcastConsumed(rawLog.BlockHash, rawLog.BlockNumber, rawLog.TxIndex, rawLog.Index, listener.JobID()) + err := orm.MarkBroadcastConsumed(rawLog.BlockHash, rawLog.BlockNumber, rawLog.Index, listener.JobID()) require.NoError(t, err) var consumed null.Bool @@ -78,26 +78,24 @@ func TestORM_broadcasts(t *testing.T) { err error blockHashes []common.Hash blockNumbers []uint64 - txIndexes []uint logIndexes []uint jobIDs []int32 ) for i := 0; i < 3; i++ { l := cltest.RandomLog(t) - err = orm.CreateBroadcast(l.BlockHash, l.BlockNumber, l.TxIndex, l.Index, listener.JobID()) + err = orm.CreateBroadcast(l.BlockHash, l.BlockNumber, l.Index, listener.JobID()) require.NoError(t, err) blockHashes = append(blockHashes, l.BlockHash) blockNumbers = append(blockNumbers, l.BlockNumber) - txIndexes = append(txIndexes, l.TxIndex) logIndexes = append(logIndexes, l.Index) jobIDs = append(jobIDs, listener.JobID()) } - err = orm.MarkBroadcastsConsumed(blockHashes, blockNumbers, txIndexes, logIndexes, jobIDs) + err = orm.MarkBroadcastsConsumed(blockHashes, blockNumbers, logIndexes, jobIDs) require.NoError(t, err) for i := range blockHashes { - was, err := orm.WasBroadcastConsumed(blockHashes[i], txIndexes[i], logIndexes[i], jobIDs[i]) + was, err := orm.WasBroadcastConsumed(blockHashes[i], logIndexes[i], jobIDs[i]) require.NoError(t, err) require.True(t, was) } @@ -108,27 +106,25 @@ func TestORM_broadcasts(t *testing.T) { err error blockHashes []common.Hash blockNumbers []uint64 - txIndexes []uint logIndexes []uint jobIDs []int32 ) for i := 0; i < 5; i++ { l := cltest.RandomLog(t) - err = orm.CreateBroadcast(l.BlockHash, l.BlockNumber, l.TxIndex, l.Index, listener.JobID()) + err = orm.CreateBroadcast(l.BlockHash, l.BlockNumber, l.Index, listener.JobID()) require.NoError(t, err) blockHashes = append(blockHashes, l.BlockHash) blockNumbers = append(blockNumbers, l.BlockNumber) - txIndexes = append(txIndexes, l.TxIndex) logIndexes = append(logIndexes, l.Index) jobIDs = append(jobIDs, listener.JobID()) } - err = orm.MarkBroadcastsConsumed(blockHashes[:len(blockHashes)-2], blockNumbers, txIndexes, logIndexes, jobIDs) + err = orm.MarkBroadcastsConsumed(blockHashes[:len(blockHashes)-2], blockNumbers, logIndexes, jobIDs) require.Error(t, err) }) t.Run("WasBroadcastConsumed_true", func(t *testing.T) { - was, err := orm.WasBroadcastConsumed(rawLog.BlockHash, rawLog.TxIndex, rawLog.Index, listener.JobID()) + was, err := orm.WasBroadcastConsumed(rawLog.BlockHash, rawLog.Index, listener.JobID()) require.NoError(t, err) require.True(t, was) }) @@ -177,36 +173,36 @@ func TestORM_MarkUnconsumed(t *testing.T) { logBefore := cltest.RandomLog(t) logBefore.BlockNumber = 34 require.NoError(t, - orm.CreateBroadcast(logBefore.BlockHash, logBefore.BlockNumber, logBefore.TxIndex, logBefore.Index, job1.ID)) + orm.CreateBroadcast(logBefore.BlockHash, logBefore.BlockNumber, logBefore.Index, job1.ID)) require.NoError(t, - orm.MarkBroadcastConsumed(logBefore.BlockHash, logBefore.BlockNumber, logBefore.TxIndex, logBefore.Index, job1.ID)) + orm.MarkBroadcastConsumed(logBefore.BlockHash, logBefore.BlockNumber, logBefore.Index, job1.ID)) logAt := cltest.RandomLog(t) logAt.BlockNumber = 38 require.NoError(t, - orm.CreateBroadcast(logAt.BlockHash, logAt.BlockNumber, logAt.TxIndex, logAt.Index, job1.ID)) + orm.CreateBroadcast(logAt.BlockHash, logAt.BlockNumber, logAt.Index, job1.ID)) require.NoError(t, - orm.MarkBroadcastConsumed(logAt.BlockHash, logAt.BlockNumber, logAt.TxIndex, logAt.Index, job1.ID)) + orm.MarkBroadcastConsumed(logAt.BlockHash, logAt.BlockNumber, logAt.Index, job1.ID)) logAfter := cltest.RandomLog(t) logAfter.BlockNumber = 40 require.NoError(t, - orm.CreateBroadcast(logAfter.BlockHash, logAfter.BlockNumber, logAfter.TxIndex, logAfter.Index, job2.ID)) + orm.CreateBroadcast(logAfter.BlockHash, logAfter.BlockNumber, logAfter.Index, job2.ID)) require.NoError(t, - orm.MarkBroadcastConsumed(logAfter.BlockHash, logAfter.BlockNumber, logAfter.TxIndex, logAfter.Index, job2.ID)) + orm.MarkBroadcastConsumed(logAfter.BlockHash, logAfter.BlockNumber, logAfter.Index, job2.ID)) // logAt and logAfter should now be marked unconsumed. logBefore is still consumed. require.NoError(t, orm.MarkBroadcastsUnconsumed(38)) - consumed, err := orm.WasBroadcastConsumed(logBefore.BlockHash, logBefore.TxIndex, logBefore.Index, job1.ID) + consumed, err := orm.WasBroadcastConsumed(logBefore.BlockHash, logBefore.Index, job1.ID) require.NoError(t, err) require.True(t, consumed) - consumed, err = orm.WasBroadcastConsumed(logAt.BlockHash, logAt.TxIndex, logAt.Index, job1.ID) + consumed, err = orm.WasBroadcastConsumed(logAt.BlockHash, logAt.Index, job1.ID) require.NoError(t, err) require.False(t, consumed) - consumed, err = orm.WasBroadcastConsumed(logAfter.BlockHash, logAfter.TxIndex, logAfter.Index, job2.ID) + consumed, err = orm.WasBroadcastConsumed(logAfter.BlockHash, logAfter.Index, job2.ID) require.NoError(t, err) require.False(t, consumed) } @@ -219,13 +215,13 @@ func TestORM_Reinitialize(t *testing.T) { var unconsumed = func(blockNum int64) TestLogBroadcast { hash := common.BigToHash(big.NewInt(rand.Int63())) return TestLogBroadcast{*big.NewInt(blockNum), - log.LogBroadcast{hash, false, uint(rand.Uint32()), uint(rand.Uint32()), 0}, + log.LogBroadcast{hash, false, uint(rand.Uint32()), 0}, } } var consumed = func(blockNum int64) TestLogBroadcast { hash := common.BigToHash(big.NewInt(rand.Int63())) return TestLogBroadcast{*big.NewInt(blockNum), - log.LogBroadcast{hash, true, uint(rand.Uint32()), uint(rand.Uint32()), 0}, + log.LogBroadcast{hash, true, uint(rand.Uint32()), 0}, } } @@ -269,10 +265,10 @@ func TestORM_Reinitialize(t *testing.T) { for _, b := range tt.broadcasts { if b.Consumed { - err := orm.MarkBroadcastConsumed(b.BlockHash, b.BlockNumber.Uint64(), b.TxIndex, b.LogIndex, jobID) + err := orm.MarkBroadcastConsumed(b.BlockHash, b.BlockNumber.Uint64(), b.LogIndex, jobID) require.NoError(t, err) } else { - err := orm.CreateBroadcast(b.BlockHash, b.BlockNumber.Uint64(), b.TxIndex, b.LogIndex, jobID) + err := orm.CreateBroadcast(b.BlockHash, b.BlockNumber.Uint64(), b.LogIndex, jobID) require.NoError(t, err) } } diff --git a/core/chains/evm/log/registrations.go b/core/chains/evm/log/registrations.go index a0ca3bef070..893a868bdb1 100644 --- a/core/chains/evm/log/registrations.go +++ b/core/chains/evm/log/registrations.go @@ -388,7 +388,7 @@ func (r *handler) isAddressRegistered(addr common.Address) bool { var _ broadcastCreator = &orm{} type broadcastCreator interface { - CreateBroadcast(blockHash common.Hash, blockNumber uint64, txIndex uint, logIndex uint, jobID int32, pqOpts ...pg.QOpt) error + CreateBroadcast(blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32, pqOpts ...pg.QOpt) error } func (r *handler) sendLog(log types.Log, latestHead evmtypes.Head, @@ -427,7 +427,7 @@ func (r *handler) sendLog(log types.Log, latestHead evmtypes.Head, jobID := sub.listener.JobID() if !exists { // Create unconsumed broadcast - if err := bc.CreateBroadcast(log.BlockHash, log.BlockNumber, log.TxIndex, log.Index, jobID); err != nil { + if err := bc.CreateBroadcast(log.BlockHash, log.BlockNumber, log.Index, jobID); err != nil { logger.Errorw("Could not create broadcast log", "blockNumber", log.BlockNumber, "blockHash", log.BlockHash, "address", log.Address, "jobID", jobID, "error", err) continue diff --git a/core/services/fluxmonitorv2/integrations_test.go b/core/services/fluxmonitorv2/integrations_test.go index d164e7758a6..6113672f948 100644 --- a/core/services/fluxmonitorv2/integrations_test.go +++ b/core/services/fluxmonitorv2/integrations_test.go @@ -425,7 +425,7 @@ func checkLogWasConsumed(t *testing.T, fa fluxAggregatorUniverse, db *sqlx.DB, p block := fa.backend.Blockchain().GetBlockByNumber(blockNumber) require.NotNil(t, block) orm := log.NewORM(db, lggr, cfg, fa.evmChainID) - consumed, err := orm.WasBroadcastConsumed(block.Hash(), 0, 0, pipelineSpecID) + consumed, err := orm.WasBroadcastConsumed(block.Hash(), 0, pipelineSpecID) require.NoError(t, err) fa.backend.Commit() return consumed diff --git a/core/services/vrf/log_dedupe.go b/core/services/vrf/log_dedupe.go index edfaf5ed373..c0530a88d38 100644 --- a/core/services/vrf/log_dedupe.go +++ b/core/services/vrf/log_dedupe.go @@ -43,9 +43,6 @@ type logKey struct { // blockNumber of the block the log was included in. This is necessary to prune old logs. blockNumber uint64 - // transaction id of the transaction which generated this log - txIndex uint - // logIndex of the log in the block. logIndex uint } @@ -58,7 +55,6 @@ func (l *logDeduper) shouldDeliver(log types.Log) bool { key := logKey{ blockHash: log.BlockHash, blockNumber: log.BlockNumber, - txIndex: log.TxIndex, logIndex: log.Index, } From 5c59a6d15864a216e02a906087e7f4650f8bb896 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 4 Aug 2022 11:09:56 -0400 Subject: [PATCH 4/4] Minor fixes --- README.md | 5 +++++ core/store/migrate/migrations/0137_remove_tx_index.sql | 2 +- docs/CHANGELOG.md | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index adb70397c7f..ba646f7bda5 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,11 @@ Ethereum node versions currently tested and supported: [Officially supported] - [Parity/Openethereum](https://github.com/openethereum/openethereum) (NOTE: Parity is deprecated and support for this client may be removed in future) - [Geth](https://github.com/ethereum/go-ethereum/releases) + +[Supported but RPC node has bugs] + +Chainlink supports these clients, but the actual implementations have bugs that prevent Chainlink from working reliably on these platforms. + - [Nethermind](https://github.com/NethermindEth/nethermind) - [Besu](https://github.com/hyperledger/besu) diff --git a/core/store/migrate/migrations/0137_remove_tx_index.sql b/core/store/migrate/migrations/0137_remove_tx_index.sql index e2ec7e87929..48f75a3a016 100644 --- a/core/store/migrate/migrations/0137_remove_tx_index.sql +++ b/core/store/migrate/migrations/0137_remove_tx_index.sql @@ -1,6 +1,6 @@ -- +goose Up -- +goose StatementBegin -DROP INDEX log_broadcasts_unique_idx; +DROP INDEX IF EXISTS log_broadcasts_unique_idx; ALTER TABLE log_broadcasts DROP COLUMN tx_index; CREATE UNIQUE INDEX log_broadcasts_unique_idx ON log_broadcasts USING BTREE (job_id, block_hash, log_index, evm_chain_id); -- +goose StatementEnd diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3c1d42538a1..d375cc1e14b 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -14,7 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `p2pv2Bootstrappers` has been added as a new optional property of OCR1 job specs; default may still be specified with P2PV2_BOOTSTRAPPERS config param - Added official support for Sepolia chain - Added `hexdecode` and `base64decode` tasks (pipeline). -- Added official support for Besu execution client. +- Added support for Besu execution client (note that while Chainlink supports Besu, Besu itself [has](https://github.com/hyperledger/besu/issues/4212) [multiple](https://github.com/hyperledger/besu/issues/4192) [bugs](https://github.com/hyperledger/besu/issues/4114) that make it unreliable). - Added the functionality to allow the root admin CLI user (and any additional admin users created) to create and assign tiers of role based access to new users. These new API users will be able to log in to the Operator UI independently, and can each have specific roles tied to their account. There are four roles: `admin`, `edit`, `run`, and `view`. - User management can be configured through the use of the new admin CLI command `chainlink admin users`. Be sure to run `chainlink adamin login`. For example, a readonly user can be created with: `chainlink admin users create --email=operator-ui-read-only@test.com --role=view`. - Updated documentation repo with a break down of actions to required role level