Skip to content

Commit

Permalink
logs are forwarded to a processor in slot and trx order (#953)
Browse files Browse the repository at this point in the history
* logs are forwarded to a processor in slot and trx order

* make tests pass again

* simplify block and expectation ordering
  • Loading branch information
EasterTheBunny authored and dhaidashenko committed Dec 20, 2024
1 parent ec749e1 commit 972a01b
Show file tree
Hide file tree
Showing 4 changed files with 562 additions and 86 deletions.
25 changes: 18 additions & 7 deletions pkg/solana/logpoller/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func (j retryableJob) Run(ctx context.Context) error {
}

type eventDetail struct {
blockNumber uint64
slotNumber uint64
blockHeight uint64
blockHash solana.Hash
trxIdx int
trxSig solana.Signature
Expand All @@ -54,12 +55,18 @@ func (j *processEventJob) Run(_ context.Context) error {
return j.parser.Process(j.event)
}

type wrappedParser interface {
ProgramEventProcessor
ExpectBlock(uint64)
ExpectTxs(uint64, int)
}

// getTransactionsFromBlockJob is a job that fetches transaction signatures from a block and loads
// the job queue with getTransactionLogsJobs for each transaction found in the block.
type getTransactionsFromBlockJob struct {
slotNumber uint64
client RPCClient
parser ProgramEventProcessor
parser wrappedParser
chJobs chan Job
}

Expand Down Expand Up @@ -103,17 +110,20 @@ func (j *getTransactionsFromBlockJob) Run(ctx context.Context) error {
}

detail := eventDetail{
blockHash: block.Blockhash,
slotNumber: j.slotNumber,
blockHash: block.Blockhash,
}

if block.BlockHeight != nil {
detail.blockNumber = *block.BlockHeight
detail.blockHeight = *block.BlockHeight
}

if len(block.Transactions) != len(blockSigsOnly.Signatures) {
return fmt.Errorf("block %d has %d transactions but %d signatures", j.slotNumber, len(block.Transactions), len(blockSigsOnly.Signatures))
}

j.parser.ExpectTxs(j.slotNumber, len(block.Transactions))

for idx, trx := range block.Transactions {
detail.trxIdx = idx
if len(blockSigsOnly.Signatures)-1 <= idx {
Expand All @@ -130,14 +140,15 @@ func messagesToEvents(messages []string, parser ProgramEventProcessor, detail ev
var logIdx uint
for _, outputs := range parseProgramLogs(messages) {
for _, event := range outputs.Events {
logIdx++

event.BlockNumber = detail.blockNumber
event.SlotNumber = detail.slotNumber
event.BlockHeight = detail.blockHeight
event.BlockHash = detail.blockHash
event.TransactionHash = detail.trxSig
event.TransactionIndex = detail.trxIdx
event.TransactionLogIndex = logIdx

logIdx++

chJobs <- &processEventJob{
parser: parser,
event: event,
Expand Down
214 changes: 205 additions & 9 deletions pkg/solana/logpoller/loader.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package logpoller

import (
"container/list"
"context"
"errors"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -40,7 +44,8 @@ type EncodedLogCollector struct {

// dependencies and configuration
client RPCClient
parser ProgramEventProcessor
ordered *orderedParser
unordered *unorderedParser
lggr logger.Logger
rpcTimeLimit time.Duration

Expand All @@ -62,7 +67,7 @@ func NewEncodedLogCollector(
) *EncodedLogCollector {
c := &EncodedLogCollector{
client: client,
parser: parser,
unordered: newUnorderedParser(parser),
chSlot: make(chan uint64),
chBlock: make(chan uint64, 1),
chJobs: make(chan Job, 1),
Expand All @@ -74,8 +79,9 @@ func NewEncodedLogCollector(
Name: "EncodedLogCollector",
NewSubServices: func(lggr logger.Logger) []services.Service {
c.workers = NewWorkerGroup(DefaultWorkerCount, lggr)
c.ordered = newOrderedParser(parser, lggr)

return []services.Service{c.workers}
return []services.Service{c.workers, c.ordered}
},
Start: c.start,
Close: c.close,
Expand Down Expand Up @@ -127,7 +133,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st
if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{
slotNumber: sig.Slot,
client: c.client,
parser: c.parser,
parser: c.unordered,
chJobs: c.chJobs,
}); err != nil {
return err
Expand All @@ -138,7 +144,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st
return nil
}

func (c *EncodedLogCollector) start(ctx context.Context) error {
func (c *EncodedLogCollector) start(_ context.Context) error {
c.engine.Go(c.runSlotPolling)
c.engine.Go(c.runSlotProcessing)
c.engine.Go(c.runBlockProcessing)
Expand Down Expand Up @@ -201,10 +207,15 @@ func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) {
continue
}

from := c.highestSlot.Load() + 1
if c.highestSlot.Load() == 0 {
from = slot
}

c.highestSlot.Store(slot)

// load blocks in slot range
c.loadRange(ctx, c.highestSlotLoaded.Load()+1, slot)
c.loadRange(ctx, from, slot)
}
}
}
Expand All @@ -214,11 +225,11 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) {
select {
case <-ctx.Done():
return
case block := <-c.chBlock:
case slot := <-c.chBlock:
if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{
slotNumber: block,
slotNumber: slot,
client: c.client,
parser: c.parser,
parser: c.ordered,
chJobs: c.chJobs,
}); err != nil {
c.lggr.Errorf("failed to add job to queue: %s", err)
Expand Down Expand Up @@ -269,7 +280,21 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en
return err
}

// as a safety mechanism, order the blocks ascending (oldest to newest) in the extreme case
// that the RPC changes and results get jumbled.
slices.SortFunc(result, func(a, b uint64) int {
if a < b {
return -1
} else if a > b {
return 1
}

return 0
})

for _, block := range result {
c.ordered.ExpectBlock(block)

select {
case <-ctx.Done():
return nil
Expand All @@ -279,3 +304,174 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en

return nil
}

type unorderedParser struct {
parser ProgramEventProcessor
}

func newUnorderedParser(parser ProgramEventProcessor) *unorderedParser {
return &unorderedParser{parser: parser}
}

func (p *unorderedParser) ExpectBlock(_ uint64) {}
func (p *unorderedParser) ExpectTxs(_ uint64, _ int) {}
func (p *unorderedParser) Process(evt ProgramEvent) error {
return p.parser.Process(evt)
}

type orderedParser struct {
// service state management
services.Service
engine *services.Engine

// internal state
parser ProgramEventProcessor
mu sync.Mutex
blocks *list.List
expect map[uint64]int
actual map[uint64][]ProgramEvent
}

func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *orderedParser {
op := &orderedParser{
parser: parser,
blocks: list.New(),
expect: make(map[uint64]int),
actual: make(map[uint64][]ProgramEvent),
}

op.Service, op.engine = services.Config{
Name: "OrderedParser",
Start: op.start,
Close: op.close,
}.NewServiceEngine(lggr)

return op
}

// ExpectBlock should be called in block order to preserve block progression.
func (p *orderedParser) ExpectBlock(block uint64) {
p.mu.Lock()
defer p.mu.Unlock()

p.blocks.PushBack(block)
}

func (p *orderedParser) ExpectTxs(block uint64, quantity int) {
p.mu.Lock()
defer p.mu.Unlock()

p.expect[block] = quantity
p.actual[block] = make([]ProgramEvent, 0, quantity)
}

func (p *orderedParser) Process(event ProgramEvent) error {
p.mu.Lock()
defer p.mu.Unlock()

if err := p.addToExpectations(event); err != nil {
// TODO: log error because this is an unrecoverable error
return nil
}

return p.sendReadySlots()
}

func (p *orderedParser) start(_ context.Context) error {
p.engine.GoTick(services.NewTicker(time.Second), p.run)

return nil
}

func (p *orderedParser) close() error {
return nil
}

func (p *orderedParser) addToExpectations(evt ProgramEvent) error {
_, ok := p.expect[evt.SlotNumber]
if !ok {
return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
}

evts, ok := p.actual[evt.SlotNumber]
if !ok {
return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber)
}

p.actual[evt.SlotNumber] = append(evts, evt)

return nil
}

func (p *orderedParser) expectations(block uint64) (int, bool, error) {
expectations, ok := p.expect[block]
if !ok {
return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block)
}

evts, ok := p.actual[block]
if !ok {
return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block)
}

return expectations, expectations == len(evts), nil
}

func (p *orderedParser) clearExpectations(block uint64) {
delete(p.expect, block)
delete(p.actual, block)
}

func (p *orderedParser) run(_ context.Context) {
p.mu.Lock()
defer p.mu.Unlock()

_ = p.sendReadySlots()
}

func (p *orderedParser) sendReadySlots() error {
// start at the lowest block and find ready blocks
for element := p.blocks.Front(); element != nil; element = p.blocks.Front() {
block := element.Value.(uint64)
// if no expectations are set, we are still waiting on information for the block.
// if expectations set and not met, we are still waiting on information for the block
// no other block data should be sent until this is resolved
exp, met, err := p.expectations(block)
if err != nil || !met {
break
}

// if expectations are 0 -> remove and continue
if exp == 0 {
p.clearExpectations(block)
p.blocks.Remove(element)

continue
}

evts, ok := p.actual[block]
if !ok {
return errInvalidState
}

var errs error
for _, evt := range evts {
errs = errors.Join(errs, p.parser.Process(evt))
}

// need possible retry
if errs != nil {
return errs
}

p.blocks.Remove(element)
p.clearExpectations(block)
}

return nil
}

var (
errExpectationsNotSet = errors.New("expectations not set")
errInvalidState = errors.New("invalid state")
)
Loading

0 comments on commit 972a01b

Please sign in to comment.