Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor rpc for easier reuse #88

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/orchestrator"
"github.com/thirdweb-dev/indexer/internal/rpc"
)

var (
Expand All @@ -23,7 +23,7 @@ var (

func RunOrchestrator(cmd *cobra.Command, args []string) {
log.Info().Msg("Starting indexer")
rpc, err := common.InitializeRPC()
rpc, err := rpc.Initialize()
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize RPC")
}
Expand Down
2 changes: 2 additions & 0 deletions internal/common/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ type BlockData struct {
Logs []Log
Traces []Trace
}

type RawBlock = map[string]interface{}
2 changes: 2 additions & 0 deletions internal/common/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ type Log struct {
Data string `json:"data"`
Topics []string `json:"topics"`
}

type RawLogs = []map[string]interface{}
125 changes: 0 additions & 125 deletions internal/common/rpc.go

This file was deleted.

2 changes: 2 additions & 0 deletions internal/common/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ type Trace struct {
RewardType string `json:"reward_type"`
RefundAddress string `json:"refund_address"`
}

type RawTraces = []map[string]interface{}
18 changes: 18 additions & 0 deletions internal/common/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package common

import "math/big"

func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int {
if chunkSize >= len(values) || chunkSize <= 0 {
return [][]*big.Int{values}
}
var chunks [][]*big.Int
for i := 0; i < len(values); i += chunkSize {
end := i + chunkSize
if end > len(values) {
end = len(values)
}
chunks = append(chunks, values[i:end])
}
return chunks
}
7 changes: 3 additions & 4 deletions internal/orchestrator/chain_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ import (
"time"

"github.com/rs/zerolog/log"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
)

const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes

type ChainTracker struct {
rpc common.RPC
rpc rpc.Client
triggerIntervalMs int
}

func NewChainTracker(rpc common.RPC) *ChainTracker {

func NewChainTracker(rpc rpc.Client) *ChainTracker {
return &ChainTracker{
rpc: rpc,
triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL,
Expand Down
5 changes: 3 additions & 2 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
)

Expand All @@ -22,10 +23,10 @@ type Committer struct {
blocksPerCommit int
storage storage.IStorage
pollFromBlock *big.Int
rpc common.RPC
rpc rpc.Client
}

func NewCommitter(rpc common.RPC, storage storage.IStorage) *Committer {
func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer {
triggerInterval := config.Cfg.Committer.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
Expand Down
15 changes: 8 additions & 7 deletions internal/orchestrator/failure_recoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
"github.com/thirdweb-dev/indexer/internal/worker"
)
Expand All @@ -20,10 +21,10 @@ type FailureRecoverer struct {
failuresPerPoll int
triggerIntervalMs int
storage storage.IStorage
rpc common.RPC
rpc rpc.Client
}

func NewFailureRecoverer(rpc common.RPC, storage storage.IStorage) *FailureRecoverer {
func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer {
failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun
if failuresPerPoll == 0 {
failuresPerPoll = DEFAULT_FAILURES_PER_POLL
Expand Down Expand Up @@ -80,7 +81,7 @@ func (fr *FailureRecoverer) Start() {
select {}
}

func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []worker.WorkerResult) {
func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []rpc.GetFullBlockResult) {
log.Debug().Msgf("Failure Recoverer recovered %d blocks", len(results))
blockFailureMap := make(map[*big.Int]common.BlockFailure)
for _, failure := range blockFailures {
Expand All @@ -105,10 +106,10 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
})
} else {
successfulResults = append(successfulResults, common.BlockData{
Block: result.Block,
Logs: result.Logs,
Transactions: result.Transactions,
Traces: result.Traces,
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
Traces: result.Data.Traces,
})
failuresToDelete = append(failuresToDelete, blockFailureForBlock)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import (
"sync"

config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
)

type Orchestrator struct {
rpc common.RPC
rpc rpc.Client
storage storage.IStorage
pollerEnabled bool
failureRecovererEnabled bool
committerEnabled bool
}

func NewOrchestrator(rpc common.RPC) (*Orchestrator, error) {
func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
storage, err := storage.NewStorageConnector(&config.Cfg.Storage)
if err != nil {
return nil, err
Expand Down
23 changes: 12 additions & 11 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/metrics"
"github.com/thirdweb-dev/indexer/internal/rpc"
"github.com/thirdweb-dev/indexer/internal/storage"
"github.com/thirdweb-dev/indexer/internal/worker"
)
Expand All @@ -19,7 +20,7 @@ const DEFAULT_BLOCKS_PER_POLL = 10
const DEFAULT_TRIGGER_INTERVAL = 1000

type Poller struct {
rpc common.RPC
rpc rpc.Client
blocksPerPoll int64
triggerIntervalMs int64
storage storage.IStorage
Expand All @@ -32,7 +33,7 @@ type BlockNumberWithError struct {
Error error
}

func NewPoller(rpc common.RPC, storage storage.IStorage) *Poller {
func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
if blocksPerPoll == 0 {
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
Expand Down Expand Up @@ -169,9 +170,9 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
return endBlock
}

func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
var successfulResults []worker.WorkerResult
var failedResults []worker.WorkerResult
func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
var successfulResults []rpc.GetFullBlockResult
var failedResults []rpc.GetFullBlockResult

for _, result := range results {
if result.Error != nil {
Expand All @@ -185,17 +186,17 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
blockData := make([]common.BlockData, 0, len(successfulResults))
for _, result := range successfulResults {
blockData = append(blockData, common.BlockData{
Block: result.Block,
Logs: result.Logs,
Transactions: result.Transactions,
Traces: result.Traces,
Block: result.Data.Block,
Logs: result.Data.Logs,
Transactions: result.Data.Transactions,
Traces: result.Data.Traces,
})
}
if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil {
e := fmt.Errorf("error inserting block data: %v", err)
log.Error().Err(e)
for _, result := range successfulResults {
failedResults = append(failedResults, worker.WorkerResult{
failedResults = append(failedResults, rpc.GetFullBlockResult{
BlockNumber: result.BlockNumber,
Error: e,
})
Expand All @@ -208,7 +209,7 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
}
}

func (p *Poller) handleBlockFailures(results []worker.WorkerResult) {
func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
var blockFailures []common.BlockFailure
for _, result := range results {
if result.Error != nil {
Expand Down
Loading
Loading