diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index a837d8d..d0b1903 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -4,7 +4,6 @@ import ( "fmt" "math/big" "sort" - "sync" "time" "github.com/rs/zerolog/log" @@ -57,7 +56,7 @@ func (c *Committer) Start() { log.Error().Err(err).Msg("Error getting block data to commit") continue } - if len(blockDataToCommit) == 0 { + if len(*blockDataToCommit) == 0 { log.Debug().Msg("No block data to commit") continue } @@ -95,7 +94,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) { return blockNumbers, nil } -func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) { +func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) { blocksToCommit, err := c.getBlockNumbersToCommit() if err != nil { return nil, fmt.Errorf("error determining blocks to commit: %v", err) @@ -104,129 +103,65 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) return nil, nil } - blocksData, err := c.storage.StagingStorage.GetBlockData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID}) + blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID}) if err != nil { return nil, fmt.Errorf("error fetching blocks to commit: %v", err) } - if len(blocksData) == 0 { + if len(*blocksData) == 0 { log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64()) return nil, nil } // Sort blocks by block number - sort.Slice(blocksData, func(i, j int) bool { - return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0 + sort.Slice(*blocksData, func(i, j int) bool { + return (*blocksData)[i].Block.Number.Cmp((*blocksData)[j].Block.Number) < 0 }) - if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 { - return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block) + if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 { + return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block) } var sequentialBlockData []common.BlockData - sequentialBlockData = append(sequentialBlockData, blocksData[0]) - expectedBlockNumber := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1)) + sequentialBlockData = append(sequentialBlockData, (*blocksData)[0]) + expectedBlockNumber := new(big.Int).Add((*blocksData)[0].Block.Number, big.NewInt(1)) - for i := 1; i < len(blocksData); i++ { - if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 { + for i := 1; i < len(*blocksData); i++ { + if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 { // Note: Gap detected, stop here - log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String()) + log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String()) // increment the a gap counter in prometheus metrics.GapCounter.Inc() // record the first missed block number in prometheus - metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64())) + metrics.MissedBlockNumbers.Set(float64((*blocksData)[0].Block.Number.Int64())) break } - sequentialBlockData = append(sequentialBlockData, blocksData[i]) + sequentialBlockData = append(sequentialBlockData, (*blocksData)[i]) expectedBlockNumber.Add(expectedBlockNumber, big.NewInt(1)) } - return sequentialBlockData, nil + return &sequentialBlockData, nil } -func (c *Committer) commit(blockData []common.BlockData) error { - blockNumbers := make([]*big.Int, len(blockData)) - for i, block := range blockData { +func (c *Committer) commit(blockData *[]common.BlockData) error { + blockNumbers := make([]*big.Int, len(*blockData)) + for i, block := range *blockData { blockNumbers[i] = block.Block.Number } log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) // TODO if next parts (saving or deleting) fail, we'll have to do a rollback - if err := c.saveDataToMainStorage(blockData); err != nil { + if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) return fmt.Errorf("error saving data to main storage: %v", err) } - if err := c.storage.StagingStorage.DeleteBlockData(blockData); err != nil { + if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil { return fmt.Errorf("error deleting data from staging storage: %v", err) } // Update metrics for successful commits - metrics.SuccessfulCommits.Add(float64(len(blockData))) - metrics.LastCommittedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64())) - - return nil -} - -func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error { - var commitWg sync.WaitGroup - commitWg.Add(4) - - var commitErr error - var commitErrMutex sync.Mutex - - blocks := make([]common.Block, 0, len(blockData)) - logs := make([]common.Log, 0) - transactions := make([]common.Transaction, 0) - traces := make([]common.Trace, 0) - - for _, block := range blockData { - blocks = append(blocks, block.Block) - logs = append(logs, block.Logs...) - transactions = append(transactions, block.Transactions...) - traces = append(traces, block.Traces...) - } - - go func() { - defer commitWg.Done() - if err := c.storage.MainStorage.InsertBlocks(blocks); err != nil { - commitErrMutex.Lock() - commitErr = fmt.Errorf("error inserting blocks: %v", err) - commitErrMutex.Unlock() - } - }() - - go func() { - defer commitWg.Done() - if err := c.storage.MainStorage.InsertLogs(logs); err != nil { - commitErrMutex.Lock() - commitErr = fmt.Errorf("error inserting logs: %v", err) - commitErrMutex.Unlock() - } - }() - - go func() { - defer commitWg.Done() - if err := c.storage.MainStorage.InsertTransactions(transactions); err != nil { - commitErrMutex.Lock() - commitErr = fmt.Errorf("error inserting transactions: %v", err) - commitErrMutex.Unlock() - } - }() - - go func() { - defer commitWg.Done() - if err := c.storage.MainStorage.InsertTraces(traces); err != nil { - commitErrMutex.Lock() - commitErr = fmt.Errorf("error inserting traces: %v", err) - commitErrMutex.Unlock() - } - }() - - commitWg.Wait() - - if commitErr != nil { - return commitErr - } + metrics.SuccessfulCommits.Add(float64(len(*blockData))) + metrics.LastCommittedBlock.Set(float64((*blockData)[len(*blockData)-1].Block.Number.Int64())) return nil } diff --git a/internal/orchestrator/failure_recoverer.go b/internal/orchestrator/failure_recoverer.go index 16d7228..a8d25cd 100644 --- a/internal/orchestrator/failure_recoverer.go +++ b/internal/orchestrator/failure_recoverer.go @@ -114,7 +114,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail failuresToDelete = append(failuresToDelete, blockFailureForBlock) } } - if err := fr.storage.StagingStorage.InsertBlockData(successfulResults); err != nil { + if err := fr.storage.StagingStorage.InsertStagingData(successfulResults); err != nil { log.Error().Err(fmt.Errorf("error inserting block data in failure recoverer: %v", err)) return } diff --git a/internal/orchestrator/poller.go b/internal/orchestrator/poller.go index d8f27e3..fa8326a 100644 --- a/internal/orchestrator/poller.go +++ b/internal/orchestrator/poller.go @@ -192,7 +192,7 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) { Traces: result.Data.Traces, }) } - if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil { + if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil { e := fmt.Errorf("error inserting block data: %v", err) log.Error().Err(e) for _, result := range successfulResults { diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 23ee2f0..f831d65 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "strings" + "sync" "time" "github.com/ClickHouse/clickhouse-go/v2" @@ -60,7 +61,7 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) { return conn, nil } -func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error { +func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error { query := ` INSERT INTO ` + c.cfg.Database + `.blocks ( chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce, @@ -73,7 +74,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error { if err != nil { return err } - for _, block := range blocks { + for _, block := range *blocks { err := batch.Append( block.ChainId, block.Number, @@ -105,7 +106,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error { return batch.Send() } -func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error { +func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) error { query := ` INSERT INTO ` + c.cfg.Database + `.transactions ( chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index, @@ -117,7 +118,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error if err != nil { return err } - for _, tx := range txs { + for _, tx := range *txs { err := batch.Append( tx.ChainId, tx.Hash, @@ -147,7 +148,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error return batch.Send() } -func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error { +func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error { query := ` INSERT INTO ` + c.cfg.Database + `.logs ( chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, @@ -158,7 +159,7 @@ func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error { if err != nil { return err } - for _, log := range logs { + for _, log := range *logs { err := batch.Append( log.ChainId, log.BlockNumber, @@ -585,7 +586,7 @@ func getBlockNumbersStringArray(blockNumbers []*big.Int) string { return blockNumbersString } -func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error { +func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error { query := `INSERT INTO ` + c.cfg.Database + `.block_data (chain_id, block_number, data)` batch, err := c.conn.PrepareBatch(context.Background(), query) if err != nil { @@ -608,7 +609,7 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error { return batch.Send() } -func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []common.BlockData, err error) { +func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) { query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0", c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers)) @@ -638,12 +639,12 @@ func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []comm if err != nil { return nil, err } - blockDataList = append(blockDataList, blockData) + *blockDataList = append(*blockDataList, blockData) } return blockDataList, nil } -func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error { +func (c *ClickHouseConnector) DeleteStagingData(data *[]common.BlockData) error { query := fmt.Sprintf(` INSERT INTO %s.block_data ( chain_id, block_number, is_deleted @@ -655,7 +656,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error { return err } - for _, blockData := range data { + for _, blockData := range *data { err := batch.Append( blockData.Block.ChainId, blockData.Block.Number, @@ -668,7 +669,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error { return batch.Send() } -func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error { +func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error { query := ` INSERT INTO ` + c.cfg.Database + `.traces ( chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, @@ -680,7 +681,7 @@ func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error { if err != nil { return err } - for _, trace := range traces { + for _, trace := range *traces { err = batch.Append( trace.ChainID, trace.BlockNumber, @@ -761,3 +762,77 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace, } return traces, nil } + +// TODO make this atomic +func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error { + blocks := make([]common.Block, 0, len(*data)) + logs := make([]common.Log, 0) + transactions := make([]common.Transaction, 0) + traces := make([]common.Trace, 0) + + for _, blockData := range *data { + blocks = append(blocks, blockData.Block) + logs = append(logs, blockData.Logs...) + transactions = append(transactions, blockData.Transactions...) + traces = append(traces, blockData.Traces...) + } + + var saveErr error + var saveErrMutex sync.Mutex + var wg sync.WaitGroup + + if len(blocks) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + if err := c.insertBlocks(&blocks); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting blocks: %v", err) + saveErrMutex.Unlock() + } + }() + } + + if len(logs) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + if err := c.insertLogs(&logs); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting logs: %v", err) + saveErrMutex.Unlock() + } + }() + } + + if len(transactions) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + if err := c.insertTransactions(&transactions); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting transactions: %v", err) + saveErrMutex.Unlock() + } + }() + } + + if len(traces) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + if err := c.insertTraces(&traces); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting traces: %v", err) + saveErrMutex.Unlock() + } + }() + } + + wg.Wait() + + if saveErr != nil { + return saveErr + } + return nil +} diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 6b7af95..be102d5 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -41,19 +41,16 @@ type IOrchestratorStorage interface { } type IStagingStorage interface { - InsertBlockData(data []common.BlockData) error - GetBlockData(qf QueryFilter) (data []common.BlockData, err error) - DeleteBlockData(data []common.BlockData) error + InsertStagingData(data []common.BlockData) error + GetStagingData(qf QueryFilter) (data *[]common.BlockData, err error) + DeleteStagingData(data *[]common.BlockData) error GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) } type IMainStorage interface { - InsertBlocks(blocks []common.Block) error - InsertTransactions(txs []common.Transaction) error - InsertLogs(logs []common.Log) error - InsertTraces(traces []common.Trace) error + InsertBlockData(data *[]common.BlockData) error - GetBlocks(qf QueryFilter) (logs []common.Block, err error) + GetBlocks(qf QueryFilter) (blocks []common.Block, err error) GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error) GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error) GetTraces(qf QueryFilter) (traces []common.Trace, err error) diff --git a/internal/storage/memory.go b/internal/storage/memory.go index e55b76d..b1c7855 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -73,8 +73,8 @@ func (m *MemoryConnector) DeleteBlockFailures(failures []common.BlockFailure) er return nil } -func (m *MemoryConnector) InsertBlocks(blocks []common.Block) error { - for _, block := range blocks { +func (m *MemoryConnector) insertBlocks(blocks *[]common.Block) error { + for _, block := range *blocks { blockJson, err := json.Marshal(block) if err != nil { return err @@ -109,8 +109,8 @@ func (m *MemoryConnector) GetBlocks(qf QueryFilter) ([]common.Block, error) { return blocks, nil } -func (m *MemoryConnector) InsertTransactions(txs []common.Transaction) error { - for _, tx := range txs { +func (m *MemoryConnector) insertTransactions(txs *[]common.Transaction) error { + for _, tx := range *txs { txJson, err := json.Marshal(tx) if err != nil { return err @@ -143,8 +143,8 @@ func (m *MemoryConnector) GetTransactions(qf QueryFilter) ([]common.Transaction, return txs, nil } -func (m *MemoryConnector) InsertLogs(logs []common.Log) error { - for _, log := range logs { +func (m *MemoryConnector) insertLogs(logs *[]common.Log) error { + for _, log := range *logs { logJson, err := json.Marshal(log) if err != nil { return err @@ -251,7 +251,7 @@ func getBlockNumbersToCheck(qf QueryFilter) map[string]uint8 { return blockNumbersToCheck } -func (m *MemoryConnector) InsertBlockData(data []common.BlockData) error { +func (m *MemoryConnector) InsertStagingData(data []common.BlockData) error { for _, blockData := range data { dataJson, err := json.Marshal(blockData) if err != nil { @@ -262,7 +262,7 @@ func (m *MemoryConnector) InsertBlockData(data []common.BlockData) error { return nil } -func (m *MemoryConnector) GetBlockData(qf QueryFilter) ([]common.BlockData, error) { +func (m *MemoryConnector) GetStagingData(qf QueryFilter) (*[]common.BlockData, error) { blockData := []common.BlockData{} limit := getLimit(qf) blockNumbersToCheck := getBlockNumbersToCheck(qf) @@ -283,19 +283,19 @@ func (m *MemoryConnector) GetBlockData(qf QueryFilter) ([]common.BlockData, erro } } } - return blockData, nil + return &blockData, nil } -func (m *MemoryConnector) DeleteBlockData(data []common.BlockData) error { - for _, blockData := range data { +func (m *MemoryConnector) DeleteStagingData(data *[]common.BlockData) error { + for _, blockData := range *data { key := fmt.Sprintf("blockData:%s:%s", blockData.Block.ChainId.String(), blockData.Block.Number.String()) m.cache.Remove(key) } return nil } -func (m *MemoryConnector) InsertTraces(traces []common.Trace) error { - for _, trace := range traces { +func (m *MemoryConnector) insertTraces(traces *[]common.Trace) error { + for _, trace := range *traces { traceJson, err := json.Marshal(trace) if err != nil { return err @@ -331,3 +331,31 @@ func (m *MemoryConnector) GetTraces(qf QueryFilter) ([]common.Trace, error) { func traceAddressToString(traceAddress []uint64) string { return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]") } + +func (m *MemoryConnector) InsertBlockData(data *[]common.BlockData) error { + blocks := make([]common.Block, 0, len(*data)) + logs := make([]common.Log, 0) + transactions := make([]common.Transaction, 0) + traces := make([]common.Trace, 0) + + for _, blockData := range *data { + blocks = append(blocks, blockData.Block) + logs = append(logs, blockData.Logs...) + transactions = append(transactions, blockData.Transactions...) + traces = append(traces, blockData.Traces...) + } + + if err := m.insertBlocks(&blocks); err != nil { + return err + } + if err := m.insertLogs(&logs); err != nil { + return err + } + if err := m.insertTransactions(&transactions); err != nil { + return err + } + if err := m.insertTraces(&traces); err != nil { + return err + } + return nil +}