Skip to content

Commit

Permalink
change main storage interface to handle inserts all at once
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 7, 2024
1 parent 01aba8e commit dd391a7
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 124 deletions.
111 changes: 23 additions & 88 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"math/big"
"sort"
"sync"
"time"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -57,7 +56,7 @@ func (c *Committer) Start() {
log.Error().Err(err).Msg("Error getting block data to commit")
continue
}
if len(blockDataToCommit) == 0 {
if len(*blockDataToCommit) == 0 {
log.Debug().Msg("No block data to commit")
continue
}
Expand Down Expand Up @@ -95,7 +94,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
return blockNumbers, nil
}

func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) {
func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
blocksToCommit, err := c.getBlockNumbersToCommit()
if err != nil {
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
Expand All @@ -104,129 +103,65 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
return nil, nil
}

blocksData, err := c.storage.StagingStorage.GetBlockData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
if err != nil {
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
}
if len(blocksData) == 0 {
if len(*blocksData) == 0 {
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
return nil, nil
}

// Sort blocks by block number
sort.Slice(blocksData, func(i, j int) bool {
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
sort.Slice(*blocksData, func(i, j int) bool {
return (*blocksData)[i].Block.Number.Cmp((*blocksData)[j].Block.Number) < 0
})

if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
}

var sequentialBlockData []common.BlockData
sequentialBlockData = append(sequentialBlockData, blocksData[0])
expectedBlockNumber := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
sequentialBlockData = append(sequentialBlockData, (*blocksData)[0])
expectedBlockNumber := new(big.Int).Add((*blocksData)[0].Block.Number, big.NewInt(1))

for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
for i := 1; i < len(*blocksData); i++ {
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
// Note: Gap detected, stop here
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String())
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
// increment the a gap counter in prometheus
metrics.GapCounter.Inc()
// record the first missed block number in prometheus
metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
metrics.MissedBlockNumbers.Set(float64((*blocksData)[0].Block.Number.Int64()))
break
}
sequentialBlockData = append(sequentialBlockData, blocksData[i])
sequentialBlockData = append(sequentialBlockData, (*blocksData)[i])
expectedBlockNumber.Add(expectedBlockNumber, big.NewInt(1))
}

return sequentialBlockData, nil
return &sequentialBlockData, nil
}

func (c *Committer) commit(blockData []common.BlockData) error {
blockNumbers := make([]*big.Int, len(blockData))
for i, block := range blockData {
func (c *Committer) commit(blockData *[]common.BlockData) error {
blockNumbers := make([]*big.Int, len(*blockData))
for i, block := range *blockData {
blockNumbers[i] = block.Block.Number
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))

// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
if err := c.saveDataToMainStorage(blockData); err != nil {
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
return fmt.Errorf("error saving data to main storage: %v", err)
}

if err := c.storage.StagingStorage.DeleteBlockData(blockData); err != nil {
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
return fmt.Errorf("error deleting data from staging storage: %v", err)
}

// Update metrics for successful commits
metrics.SuccessfulCommits.Add(float64(len(blockData)))
metrics.LastCommittedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))

return nil
}

func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error {
var commitWg sync.WaitGroup
commitWg.Add(4)

var commitErr error
var commitErrMutex sync.Mutex

blocks := make([]common.Block, 0, len(blockData))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
traces := make([]common.Trace, 0)

for _, block := range blockData {
blocks = append(blocks, block.Block)
logs = append(logs, block.Logs...)
transactions = append(transactions, block.Transactions...)
traces = append(traces, block.Traces...)
}

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertBlocks(blocks); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting blocks: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertLogs(logs); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting logs: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertTransactions(transactions); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting transactions: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertTraces(traces); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting traces: %v", err)
commitErrMutex.Unlock()
}
}()

commitWg.Wait()

if commitErr != nil {
return commitErr
}
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
metrics.LastCommittedBlock.Set(float64((*blockData)[len(*blockData)-1].Block.Number.Int64()))

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/failure_recoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
failuresToDelete = append(failuresToDelete, blockFailureForBlock)
}
}
if err := fr.storage.StagingStorage.InsertBlockData(successfulResults); err != nil {
if err := fr.storage.StagingStorage.InsertStagingData(successfulResults); err != nil {
log.Error().Err(fmt.Errorf("error inserting block data in failure recoverer: %v", err))
return
}
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
Traces: result.Data.Traces,
})
}
if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil {
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
e := fmt.Errorf("error inserting block data: %v", err)
log.Error().Err(e)
for _, result := range successfulResults {
Expand Down
101 changes: 88 additions & 13 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/big"
"strings"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -60,7 +61,7 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
return conn, nil
}

func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error {
query := `
INSERT INTO ` + c.cfg.Database + `.blocks (
chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce,
Expand All @@ -73,7 +74,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
if err != nil {
return err
}
for _, block := range blocks {
for _, block := range *blocks {
err := batch.Append(
block.ChainId,
block.Number,
Expand Down Expand Up @@ -105,7 +106,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
return batch.Send()
}

func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error {
func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) error {
query := `
INSERT INTO ` + c.cfg.Database + `.transactions (
chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index,
Expand All @@ -117,7 +118,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
if err != nil {
return err
}
for _, tx := range txs {
for _, tx := range *txs {
err := batch.Append(
tx.ChainId,
tx.Hash,
Expand Down Expand Up @@ -147,7 +148,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
return batch.Send()
}

func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error {
query := `
INSERT INTO ` + c.cfg.Database + `.logs (
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
Expand All @@ -158,7 +159,7 @@ func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
if err != nil {
return err
}
for _, log := range logs {
for _, log := range *logs {
err := batch.Append(
log.ChainId,
log.BlockNumber,
Expand Down Expand Up @@ -585,7 +586,7 @@ func getBlockNumbersStringArray(blockNumbers []*big.Int) string {
return blockNumbersString
}

func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
query := `INSERT INTO ` + c.cfg.Database + `.block_data (chain_id, block_number, data)`
batch, err := c.conn.PrepareBatch(context.Background(), query)
if err != nil {
Expand All @@ -608,7 +609,7 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
return batch.Send()
}

func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []common.BlockData, err error) {
func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) {
query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))

Expand Down Expand Up @@ -638,12 +639,12 @@ func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []comm
if err != nil {
return nil, err
}
blockDataList = append(blockDataList, blockData)
*blockDataList = append(*blockDataList, blockData)
}
return blockDataList, nil
}

func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
func (c *ClickHouseConnector) DeleteStagingData(data *[]common.BlockData) error {
query := fmt.Sprintf(`
INSERT INTO %s.block_data (
chain_id, block_number, is_deleted
Expand All @@ -655,7 +656,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
return err
}

for _, blockData := range data {
for _, blockData := range *data {
err := batch.Append(
blockData.Block.ChainId,
blockData.Block.Number,
Expand All @@ -668,7 +669,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
return batch.Send()
}

func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error {
query := `
INSERT INTO ` + c.cfg.Database + `.traces (
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
Expand All @@ -680,7 +681,7 @@ func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
if err != nil {
return err
}
for _, trace := range traces {
for _, trace := range *traces {
err = batch.Append(
trace.ChainID,
trace.BlockNumber,
Expand Down Expand Up @@ -761,3 +762,77 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
}
return traces, nil
}

// TODO make this atomic
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
blocks := make([]common.Block, 0, len(*data))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
traces := make([]common.Trace, 0)

for _, blockData := range *data {
blocks = append(blocks, blockData.Block)
logs = append(logs, blockData.Logs...)
transactions = append(transactions, blockData.Transactions...)
traces = append(traces, blockData.Traces...)
}

var saveErr error
var saveErrMutex sync.Mutex
var wg sync.WaitGroup

if len(blocks) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertBlocks(&blocks); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting blocks: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(logs) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertLogs(&logs); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting logs: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(transactions) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertTransactions(&transactions); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting transactions: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(traces) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertTraces(&traces); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting traces: %v", err)
saveErrMutex.Unlock()
}
}()
}

wg.Wait()

if saveErr != nil {
return saveErr
}
return nil
}
Loading

0 comments on commit dd391a7

Please sign in to comment.