Skip to content

Commit

Permalink
simplify block and expectation ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny committed Dec 16, 2024
1 parent fd9ce36 commit 1be791e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 46 deletions.
63 changes: 24 additions & 39 deletions pkg/solana/logpoller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -280,8 +279,21 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en
return err
}

// as a safety mechanism, order the blocks ascending (oldest to newest) in the extreme case
// that the RPC changes and results get jumbled.
slices.SortFunc(result, func(a, b uint64) int {
if a < b {
return -1
} else if a > b {
return 1
}

return 0
})

for _, block := range result {
c.ordered.ExpectBlock(block)

select {
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -338,16 +350,12 @@ func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *ordered
return op
}

// ExpectBlock should be called in block order to preserve block progression.
func (p *orderedParser) ExpectBlock(block uint64) {
p.mu.Lock()
defer p.mu.Unlock()

p.blocks = append(p.blocks, block)

// ensure sort ascending
sort.Slice(p.blocks, func(i, j int) bool {
return p.blocks[i] < p.blocks[j]
})
}

func (p *orderedParser) ExpectTxs(block uint64, quantity int) {
Expand Down Expand Up @@ -431,10 +439,8 @@ func (p *orderedParser) run(_ context.Context) {
}

func (p *orderedParser) sendReadySlots() error {
rmvIdx := make([]int, 0)

// start at the lowest block and find ready blocks
for idx, block := range p.blocks {
for _, block := range p.blocks {
// if no expectations are set, we are still waiting on information for the block.
// if expectations set and not met, we are still waiting on information for the block
// no other block data should be sent until this is resolved
Expand All @@ -446,23 +452,20 @@ func (p *orderedParser) sendReadySlots() error {
// if expectations are 0 -> remove and continue
if exp == 0 {
p.clearExpectations(block)
rmvIdx = append(rmvIdx, idx)
p.blocks = p.blocks[1:]

continue
}

// if expectations set and met -> forward, remove, and continue

// to ensure ordered delivery, break from the loop if a ready block isn't found
// this function should be preceded by clearEmptyBlocks
rIdx, ok := getIdx(p.ready, block)
if !ok {
rIdx := slices.Index(p.ready, block)
if rIdx < 0 {
return nil
}

evts, ok := p.actual[block]
if !ok {
return errors.New("invalid state")
return errInvalidState
}

var errs error
Expand All @@ -475,34 +478,16 @@ func (p *orderedParser) sendReadySlots() error {
return errs
}

p.ready = remove(p.ready, rIdx)
rmvIdx = append(rmvIdx, idx)
p.ready = slices.Delete(p.ready, rIdx, rIdx+1)
p.blocks = p.blocks[1:]

delete(p.expect, block)
delete(p.actual, block)
}

for count, idx := range rmvIdx {
p.blocks = remove(p.blocks, idx-count)
p.clearExpectations(block)
}

return nil
}

func getIdx[T any](slice []T, match T) (int, bool) {
for idx, value := range slice {
if reflect.DeepEqual(value, match) {
return idx, true
}
}

return -1, false
}

func remove[T any](slice []T, s int) []T {
return append(slice[:s], slice[s+1:]...)
}

var (
errExpectationsNotSet = errors.New("expectations not set")
errInvalidState = errors.New("invalid state")
)
57 changes: 50 additions & 7 deletions pkg/solana/logpoller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) {

latest.Store(uint64(40))

slots := []uint64{43, 42}
slots := []uint64{44, 43, 42, 41}
sigs := make([]solana.Signature, len(slots))
hashes := make([]solana.Hash, len(slots))
scrambler := &slotUnsync{ch: make(chan struct{})}

for idx := range len(sigs) {
_, _ = rand.Read(sigs[idx][:])
Expand Down Expand Up @@ -162,10 +163,9 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) {
}
}

if slot == 42 {
// force slot 42 to return after 43
time.Sleep(1 * time.Second)
}
// imitate loading block data out of order
// every other block must wait for the block previous
scrambler.next()

height := slot - 1

Expand Down Expand Up @@ -197,12 +197,24 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) {

tests.AssertEventually(t, func() bool {
return reflect.DeepEqual(parser.Events(), []logpoller.ProgramEvent{
{
BlockData: logpoller.BlockData{
SlotNumber: 41,
BlockHeight: 40,
BlockHash: hashes[3],
TransactionHash: sigs[3],
TransactionIndex: 0,
TransactionLogIndex: 0,
},
Prefix: ">",
Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA",
},
{
BlockData: logpoller.BlockData{
SlotNumber: 42,
BlockHeight: 41,
BlockHash: hashes[1],
TransactionHash: sigs[1],
BlockHash: hashes[2],
TransactionHash: sigs[2],
TransactionIndex: 0,
TransactionLogIndex: 0,
},
Expand All @@ -213,6 +225,18 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) {
BlockData: logpoller.BlockData{
SlotNumber: 43,
BlockHeight: 42,
BlockHash: hashes[1],
TransactionHash: sigs[1],
TransactionIndex: 0,
TransactionLogIndex: 0,
},
Prefix: ">",
Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA",
},
{
BlockData: logpoller.BlockData{
SlotNumber: 44,
BlockHeight: 43,
BlockHash: hashes[0],
TransactionHash: sigs[0],
TransactionIndex: 0,
Expand All @@ -227,6 +251,25 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) {
client.AssertExpectations(t)
}

type slotUnsync struct {
ch chan struct{}
waiting atomic.Bool
}

func (u *slotUnsync) next() {
if u.waiting.Load() {
u.waiting.Store(false)

<-u.ch

return
}

u.waiting.Store(true)

u.ch <- struct{}{}
}

func TestEncodedLogCollector_BackfillForAddress(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 1be791e

Please sign in to comment.