From c7ef582727022cbcec21a0dedbb0f4ec4e95f553 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Mon, 2 Dec 2024 10:41:38 -0600 Subject: [PATCH 1/6] logs are forwarded to a processor in slot and trx order --- pkg/solana/logpoller/job.go | 19 ++- pkg/solana/logpoller/loader.go | 182 +++++++++++++++++++++++- pkg/solana/logpoller/loader_test.go | 158 +++++++++++++++++++- pkg/solana/logpoller/log_data_parser.go | 3 +- 4 files changed, 346 insertions(+), 16 deletions(-) diff --git a/pkg/solana/logpoller/job.go b/pkg/solana/logpoller/job.go index 1d827a85b..ce62de190 100644 --- a/pkg/solana/logpoller/job.go +++ b/pkg/solana/logpoller/job.go @@ -33,7 +33,8 @@ func (j retryableJob) Run(ctx context.Context) error { } type eventDetail struct { - blockNumber uint64 + slotNumber uint64 + blockHeight uint64 blockHash solana.Hash trxIdx int trxSig solana.Signature @@ -59,7 +60,7 @@ func (j *processEventJob) Run(_ context.Context) error { type getTransactionsFromBlockJob struct { slotNumber uint64 client RPCClient - parser ProgramEventProcessor + parser *orderedParser chJobs chan Job } @@ -103,17 +104,20 @@ func (j *getTransactionsFromBlockJob) Run(ctx context.Context) error { } detail := eventDetail{ - blockHash: block.Blockhash, + slotNumber: j.slotNumber, + blockHash: block.Blockhash, } if block.BlockHeight != nil { - detail.blockNumber = *block.BlockHeight + detail.blockHeight = *block.BlockHeight } if len(block.Transactions) != len(blockSigsOnly.Signatures) { return fmt.Errorf("block %d has %d transactions but %d signatures", j.slotNumber, len(block.Transactions), len(blockSigsOnly.Signatures)) } + j.parser.ExpectTxs(j.slotNumber, len(block.Transactions)) + for idx, trx := range block.Transactions { detail.trxIdx = idx if len(blockSigsOnly.Signatures)-1 <= idx { @@ -130,14 +134,15 @@ func messagesToEvents(messages []string, parser ProgramEventProcessor, detail ev var logIdx uint for _, outputs := range parseProgramLogs(messages) { for _, event := range outputs.Events { - logIdx++ - - event.BlockNumber = detail.blockNumber + event.SlotNumber = detail.slotNumber + event.BlockHeight = detail.blockHeight event.BlockHash = detail.blockHash event.TransactionHash = detail.trxSig event.TransactionIndex = detail.trxIdx event.TransactionLogIndex = logIdx + logIdx++ + chJobs <- &processEventJob{ parser: parser, event: event, diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index 56fcef25c..25118c6bc 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -3,6 +3,10 @@ package logpoller import ( "context" "errors" + "fmt" + "reflect" + "sort" + "sync" "sync/atomic" "time" @@ -40,7 +44,7 @@ type EncodedLogCollector struct { // dependencies and configuration client RPCClient - parser ProgramEventProcessor + parser *orderedParser lggr logger.Logger rpcTimeLimit time.Duration @@ -62,7 +66,7 @@ func NewEncodedLogCollector( ) *EncodedLogCollector { c := &EncodedLogCollector{ client: client, - parser: parser, + parser: newOrderedParser(parser), chSlot: make(chan uint64), chBlock: make(chan uint64, 1), chJobs: make(chan Job, 1), @@ -201,10 +205,15 @@ func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) { continue } + from := c.highestSlot.Load() + 1 + if c.highestSlot.Load() == 0 { + from = slot + } + c.highestSlot.Store(slot) // load blocks in slot range - c.loadRange(ctx, c.highestSlotLoaded.Load()+1, slot) + c.loadRange(ctx, from, slot) } } } @@ -214,9 +223,9 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) { select { case <-ctx.Done(): return - case block := <-c.chBlock: + case slot := <-c.chBlock: if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{ - slotNumber: block, + slotNumber: slot, client: c.client, parser: c.parser, chJobs: c.chJobs, @@ -270,6 +279,7 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en } for _, block := range result { + c.parser.ExpectBlock(block) select { case <-ctx.Done(): return nil @@ -279,3 +289,165 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en return nil } + +type orderedParser struct { + parser ProgramEventProcessor + mu sync.Mutex + blocks []uint64 + ready []uint64 + expect map[uint64]int + actual map[uint64][]ProgramEvent +} + +func newOrderedParser(parser ProgramEventProcessor) *orderedParser { + return &orderedParser{ + parser: parser, + blocks: make([]uint64, 0), + ready: make([]uint64, 0), + expect: make(map[uint64]int), + actual: make(map[uint64][]ProgramEvent), + } +} + +func (p *orderedParser) ExpectBlock(block uint64) { + p.mu.Lock() + defer p.mu.Unlock() + + p.blocks = append(p.blocks, block) + + // ensure sort ascending + sort.Slice(p.blocks, func(i, j int) bool { + return p.blocks[i] < p.blocks[j] + }) +} + +func (p *orderedParser) ExpectTxs(block uint64, quantity int) { + p.mu.Lock() + defer p.mu.Unlock() + + p.expect[block] = quantity + p.actual[block] = make([]ProgramEvent, 0, quantity) +} + +func (p *orderedParser) Process(event ProgramEvent) error { + p.mu.Lock() + defer p.mu.Unlock() + + meetsExpectations, err := p.addAndCompareExpectations(event) + if err != nil { + return err + } + + // incoming event does not meet expectations for transaction + // event is added to actual and no error is returned + if !meetsExpectations { + return nil + } + + p.clearEmptyBlocks() + p.setReady(event.SlotNumber) + + return p.sendReadySlots() +} + +func (p *orderedParser) addAndCompareExpectations(evt ProgramEvent) (bool, error) { + expectations, ok := p.expect[evt.SlotNumber] + if !ok { + return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + } + + evts, ok := p.actual[evt.SlotNumber] + if !ok { + return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + } + + p.actual[evt.SlotNumber] = append(evts, evt) + + return expectations == len(evts)+1, nil +} + +func (p *orderedParser) clearEmptyBlocks() { + rmvIdx := make([]int, 0) + + for idx, block := range p.blocks { + exp, ok := p.expect[block] + if !ok { + // transaction expectations have not been applied for this block yet + continue + } + + if exp == 0 { + rmvIdx = append(rmvIdx, idx) + + delete(p.expect, block) + delete(p.actual, block) + } + } + + for count, idx := range rmvIdx { + p.blocks = remove(p.blocks, idx-count) + } +} + +func (p *orderedParser) setReady(slot uint64) { + p.ready = append(p.ready, slot) +} + +func (p *orderedParser) sendReadySlots() error { + rmvIdx := make([]int, 0) + + // start at the lowest block and find ready blocks + for idx, block := range p.blocks { + // to ensure ordered delivery, break from the loop if a ready block isn't found + // this function should be preceded by clearEmptyBlocks + rIdx, ok := getIdx(p.ready, block) + if !ok { + return nil + } + + evts, ok := p.actual[block] + if !ok { + return errors.New("invalid state") + } + + var errs error + for _, evt := range evts { + errs = errors.Join(errs, p.parser.Process(evt)) + } + + // need possible retry + if errs != nil { + return errs + } + + p.ready = remove(p.ready, rIdx) + rmvIdx = append(rmvIdx, idx) + + delete(p.expect, block) + delete(p.actual, block) + } + + for count, idx := range rmvIdx { + p.blocks = remove(p.blocks, idx-count) + } + + return nil +} + +func getIdx[T any](slice []T, match T) (int, bool) { + for idx, value := range slice { + if reflect.DeepEqual(value, match) { + return idx, true + } + } + + return -1, false +} + +func remove[T any](slice []T, s int) []T { + return append(slice[:s], slice[s+1:]...) +} + +var ( + errExpectationsNotSet = errors.New("expectations not set") +) diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index 69a37702b..d752b0054 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -3,6 +3,7 @@ package logpoller_test import ( "context" "crypto/rand" + "reflect" "sync" "sync/atomic" "testing" @@ -88,6 +89,143 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { client.AssertExpectations(t) } +func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { + t.Parallel() + + client := new(mocks.RPCClient) + parser := new(testParser) + ctx := tests.Context(t) + + collector := logpoller.NewEncodedLogCollector(client, parser, logger.Nop()) + + require.NoError(t, collector.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, collector.Close()) + }) + + var latest atomic.Uint64 + + latest.Store(uint64(40)) + + slots := []uint64{43, 42} + sigs := make([]solana.Signature, len(slots)) + hashes := make([]solana.Hash, len(slots)) + + for idx := range len(sigs) { + _, _ = rand.Read(sigs[idx][:]) + _, _ = rand.Read(hashes[idx][:]) + } + + client.EXPECT(). + GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + RunAndReturn(func(ctx context.Context, ct rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { + defer func() { + latest.Store(latest.Load() + 2) + }() + + return &rpc.GetLatestBlockhashResult{ + RPCContext: rpc.RPCContext{ + Context: rpc.Context{ + Slot: latest.Load(), + }, + }, + }, nil + }) + + client.EXPECT(). + GetBlocks(mock.Anything, mock.MatchedBy(func(val uint64) bool { + return val > uint64(0) + }), mock.MatchedBy(func(val *uint64) bool { + return val != nil && *val <= latest.Load() + }), mock.Anything). + RunAndReturn(func(_ context.Context, u1 uint64, u2 *uint64, _ rpc.CommitmentType) (rpc.BlocksResult, error) { + blocks := make([]uint64, *u2-u1+1) + for idx := range blocks { + blocks[idx] = u1 + uint64(idx) + } + + return rpc.BlocksResult(blocks), nil + }) + + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.MatchedBy(func(val uint64) bool { + return true + }), mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + slotIdx := -1 + for idx, slt := range slots { + if slt == slot { + slotIdx = idx + + break + } + } + + if slot == 42 { + // force slot 42 to return after 43 + time.Sleep(1 * time.Second) + } + + height := slot - 1 + + if slotIdx == -1 { + var hash solana.Hash + _, _ = rand.Read(hash[:]) + + return &rpc.GetBlockResult{ + Blockhash: hash, + Transactions: []rpc.TransactionWithMeta{}, + Signatures: []solana.Signature{}, + BlockHeight: &height, + }, nil + } + + return &rpc.GetBlockResult{ + Blockhash: hashes[slotIdx], + Transactions: []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, + }, + }, + Signatures: []solana.Signature{sigs[slotIdx]}, + BlockHeight: &height, + }, nil + }) + + tests.AssertEventually(t, func() bool { + return reflect.DeepEqual(parser.Events(), []logpoller.ProgramEvent{ + { + BlockData: logpoller.BlockData{ + SlotNumber: 42, + BlockHeight: 41, + BlockHash: hashes[1], + TransactionHash: sigs[1], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 43, + BlockHeight: 42, + BlockHash: hashes[0], + TransactionHash: sigs[0], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + }) + }) + + client.AssertExpectations(t) +} + func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { client := new(mocks.RPCClient) parser := new(testParser) @@ -347,12 +485,16 @@ func (p *testBlockProducer) GetTransaction(_ context.Context, sig solana.Signatu type testParser struct { called atomic.Bool - count atomic.Uint64 + mu sync.Mutex + events []logpoller.ProgramEvent } func (p *testParser) Process(event logpoller.ProgramEvent) error { p.called.Store(true) - p.count.Store(p.count.Load() + 1) + + p.mu.Lock() + p.events = append(p.events, event) + p.mu.Unlock() return nil } @@ -362,5 +504,15 @@ func (p *testParser) Called() bool { } func (p *testParser) Count() uint64 { - return p.count.Load() + p.mu.Lock() + defer p.mu.Unlock() + + return uint64(len(p.events)) +} + +func (p *testParser) Events() []logpoller.ProgramEvent { + p.mu.Lock() + defer p.mu.Unlock() + + return p.events } diff --git a/pkg/solana/logpoller/log_data_parser.go b/pkg/solana/logpoller/log_data_parser.go index 4cfd04470..4080a09e2 100644 --- a/pkg/solana/logpoller/log_data_parser.go +++ b/pkg/solana/logpoller/log_data_parser.go @@ -16,7 +16,8 @@ var ( ) type BlockData struct { - BlockNumber uint64 + SlotNumber uint64 + BlockHeight uint64 BlockHash solana.Hash TransactionHash solana.Signature TransactionIndex int From ac40ee87f4e0b52a12c9807ace1e2a7dace6b43c Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Wed, 4 Dec 2024 09:50:01 -0600 Subject: [PATCH 2/6] make tests pass again --- pkg/solana/logpoller/job.go | 8 +- pkg/solana/logpoller/loader.go | 139 ++++++++++----- pkg/solana/logpoller/loader_test.go | 267 ++++++++++++++++++---------- 3 files changed, 274 insertions(+), 140 deletions(-) diff --git a/pkg/solana/logpoller/job.go b/pkg/solana/logpoller/job.go index ce62de190..165c0b5fe 100644 --- a/pkg/solana/logpoller/job.go +++ b/pkg/solana/logpoller/job.go @@ -55,12 +55,18 @@ func (j *processEventJob) Run(_ context.Context) error { return j.parser.Process(j.event) } +type wrappedParser interface { + ProgramEventProcessor + ExpectBlock(uint64) + ExpectTxs(uint64, int) +} + // getTransactionsFromBlockJob is a job that fetches transaction signatures from a block and loads // the job queue with getTransactionLogsJobs for each transaction found in the block. type getTransactionsFromBlockJob struct { slotNumber uint64 client RPCClient - parser *orderedParser + parser wrappedParser chJobs chan Job } diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index 25118c6bc..8791e2a9f 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -44,7 +44,8 @@ type EncodedLogCollector struct { // dependencies and configuration client RPCClient - parser *orderedParser + ordered *orderedParser + unordered *unorderedParser lggr logger.Logger rpcTimeLimit time.Duration @@ -66,7 +67,7 @@ func NewEncodedLogCollector( ) *EncodedLogCollector { c := &EncodedLogCollector{ client: client, - parser: newOrderedParser(parser), + unordered: newUnorderedParser(parser), chSlot: make(chan uint64), chBlock: make(chan uint64, 1), chJobs: make(chan Job, 1), @@ -78,8 +79,9 @@ func NewEncodedLogCollector( Name: "EncodedLogCollector", NewSubServices: func(lggr logger.Logger) []services.Service { c.workers = NewWorkerGroup(DefaultWorkerCount, lggr) + c.ordered = newOrderedParser(parser, lggr) - return []services.Service{c.workers} + return []services.Service{c.workers, c.ordered} }, Start: c.start, Close: c.close, @@ -131,7 +133,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{ slotNumber: sig.Slot, client: c.client, - parser: c.parser, + parser: c.unordered, chJobs: c.chJobs, }); err != nil { return err @@ -142,7 +144,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st return nil } -func (c *EncodedLogCollector) start(ctx context.Context) error { +func (c *EncodedLogCollector) start(_ context.Context) error { c.engine.Go(c.runSlotPolling) c.engine.Go(c.runSlotProcessing) c.engine.Go(c.runBlockProcessing) @@ -227,7 +229,7 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) { if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{ slotNumber: slot, client: c.client, - parser: c.parser, + parser: c.ordered, chJobs: c.chJobs, }); err != nil { c.lggr.Errorf("failed to add job to queue: %s", err) @@ -279,7 +281,7 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en } for _, block := range result { - c.parser.ExpectBlock(block) + c.ordered.ExpectBlock(block) select { case <-ctx.Done(): return nil @@ -290,7 +292,26 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en return nil } +type unorderedParser struct { + parser ProgramEventProcessor +} + +func newUnorderedParser(parser ProgramEventProcessor) *unorderedParser { + return &unorderedParser{parser: parser} +} + +func (p *unorderedParser) ExpectBlock(_ uint64) {} +func (p *unorderedParser) ExpectTxs(_ uint64, _ int) {} +func (p *unorderedParser) Process(evt ProgramEvent) error { + return p.parser.Process(evt) +} + type orderedParser struct { + // service state management + services.Service + engine *services.Engine + + // internal state parser ProgramEventProcessor mu sync.Mutex blocks []uint64 @@ -299,14 +320,22 @@ type orderedParser struct { actual map[uint64][]ProgramEvent } -func newOrderedParser(parser ProgramEventProcessor) *orderedParser { - return &orderedParser{ +func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *orderedParser { + op := &orderedParser{ parser: parser, blocks: make([]uint64, 0), ready: make([]uint64, 0), expect: make(map[uint64]int), actual: make(map[uint64][]ProgramEvent), } + + op.Service, op.engine = services.Config{ + Name: "OrderedParser", + Start: op.start, + Close: op.close, + }.NewServiceEngine(lggr) + + return op } func (p *orderedParser) ExpectBlock(block uint64) { @@ -333,71 +362,97 @@ func (p *orderedParser) Process(event ProgramEvent) error { p.mu.Lock() defer p.mu.Unlock() - meetsExpectations, err := p.addAndCompareExpectations(event) - if err != nil { - return err - } - - // incoming event does not meet expectations for transaction - // event is added to actual and no error is returned - if !meetsExpectations { + if err := p.addToExpectations(event); err != nil { + // TODO: log error because this is an unrecoverable error return nil } - p.clearEmptyBlocks() - p.setReady(event.SlotNumber) - return p.sendReadySlots() } -func (p *orderedParser) addAndCompareExpectations(evt ProgramEvent) (bool, error) { +func (p *orderedParser) start(_ context.Context) error { + p.engine.GoTick(services.NewTicker(time.Second), p.run) + + return nil +} + +func (p *orderedParser) close() error { + return nil +} + +func (p *orderedParser) addToExpectations(evt ProgramEvent) error { expectations, ok := p.expect[evt.SlotNumber] if !ok { - return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) } evts, ok := p.actual[evt.SlotNumber] if !ok { - return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) } p.actual[evt.SlotNumber] = append(evts, evt) - return expectations == len(evts)+1, nil -} - -func (p *orderedParser) clearEmptyBlocks() { - rmvIdx := make([]int, 0) - - for idx, block := range p.blocks { - exp, ok := p.expect[block] - if !ok { - // transaction expectations have not been applied for this block yet - continue - } + if expectations == len(evts)+1 { + p.setReady(evt.SlotNumber) + } - if exp == 0 { - rmvIdx = append(rmvIdx, idx) + return nil +} - delete(p.expect, block) - delete(p.actual, block) - } +func (p *orderedParser) expectations(block uint64) (int, bool, error) { + expectations, ok := p.expect[block] + if !ok { + return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block) } - for count, idx := range rmvIdx { - p.blocks = remove(p.blocks, idx-count) + evts, ok := p.actual[block] + if !ok { + return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block) } + + return expectations, expectations == len(evts), nil +} + +func (p *orderedParser) clearExpectations(block uint64) { + delete(p.expect, block) + delete(p.actual, block) } func (p *orderedParser) setReady(slot uint64) { p.ready = append(p.ready, slot) } +func (p *orderedParser) run(_ context.Context) { + p.mu.Lock() + defer p.mu.Unlock() + + _ = p.sendReadySlots() +} + func (p *orderedParser) sendReadySlots() error { rmvIdx := make([]int, 0) // start at the lowest block and find ready blocks for idx, block := range p.blocks { + // if no expectations are set, we are still waiting on information for the block. + // if expectations set and not met, we are still waiting on information for the block + // no other block data should be sent until this is resolved + exp, met, err := p.expectations(block) + if err != nil || !met { + break + } + + // if expectations are 0 -> remove and continue + if exp == 0 { + p.clearExpectations(block) + rmvIdx = append(rmvIdx, idx) + + continue + } + + // if expectations set and met -> forward, remove, and continue + // to ensure ordered delivery, break from the loop if a ready block isn't found // this function should be preceded by clearEmptyBlocks rIdx, ok := getIdx(p.ready, block) diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index d752b0054..166ed9b24 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -33,6 +33,8 @@ var ( ) func TestEncodedLogCollector_StartClose(t *testing.T) { + t.Parallel() + client := new(mocks.RPCClient) ctx := tests.Context(t) @@ -43,6 +45,8 @@ func TestEncodedLogCollector_StartClose(t *testing.T) { } func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { + t.Parallel() + client := new(mocks.RPCClient) parser := new(testParser) ctx := tests.Context(t) @@ -54,39 +58,56 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { require.NoError(t, collector.Close()) }) - slot := uint64(42) - sig := solana.Signature{2, 1, 4, 2} - blockHeight := uint64(21) + var latest atomic.Uint64 - client.EXPECT().GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized).Return(&rpc.GetLatestBlockhashResult{ - RPCContext: rpc.RPCContext{ - Context: rpc.Context{ - Slot: slot, - }, - }, - }, nil) + latest.Store(uint64(40)) - client.EXPECT().GetBlocks(mock.Anything, uint64(1), mock.MatchedBy(func(val *uint64) bool { - return val != nil && *val == slot - }), mock.Anything).Return(rpc.BlocksResult{slot}, nil) + client.EXPECT(). + GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + RunAndReturn(latestBlockhashReturnFunc(&latest)) - client.EXPECT().GetBlockWithOpts(mock.Anything, slot, mock.Anything).Return(&rpc.GetBlockResult{ - Transactions: []rpc.TransactionWithMeta{ - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, - }, - }, - }, - Signatures: []solana.Signature{sig}, - BlockHeight: &blockHeight, - }, nil).Twice() + client.EXPECT(). + GetBlocks( + mock.Anything, + mock.MatchedBy(getBlocksStartValMatcher), + mock.MatchedBy(getBlocksEndValMatcher(&latest)), + rpc.CommitmentFinalized, + ). + RunAndReturn(getBlocksReturnFunc(false)) + + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + height := slot - 1 + + result := rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{}, + Signatures: []solana.Signature{}, + BlockHeight: &height, + } + + _, _ = rand.Read(result.Blockhash[:]) + + if slot == 42 { + var sig solana.Signature + _, _ = rand.Read(sig[:]) + + result.Signatures = []solana.Signature{sig} + result.Transactions = []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, + }, + } + } + + return &result, nil + }) tests.AssertEventually(t, func() bool { return parser.Called() }) - - client.AssertExpectations(t) } func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { @@ -118,39 +139,19 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { client.EXPECT(). GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). - RunAndReturn(func(ctx context.Context, ct rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { - defer func() { - latest.Store(latest.Load() + 2) - }() - - return &rpc.GetLatestBlockhashResult{ - RPCContext: rpc.RPCContext{ - Context: rpc.Context{ - Slot: latest.Load(), - }, - }, - }, nil - }) + RunAndReturn(latestBlockhashReturnFunc(&latest)) client.EXPECT(). - GetBlocks(mock.Anything, mock.MatchedBy(func(val uint64) bool { - return val > uint64(0) - }), mock.MatchedBy(func(val *uint64) bool { - return val != nil && *val <= latest.Load() - }), mock.Anything). - RunAndReturn(func(_ context.Context, u1 uint64, u2 *uint64, _ rpc.CommitmentType) (rpc.BlocksResult, error) { - blocks := make([]uint64, *u2-u1+1) - for idx := range blocks { - blocks[idx] = u1 + uint64(idx) - } - - return rpc.BlocksResult(blocks), nil - }) + GetBlocks( + mock.Anything, + mock.MatchedBy(getBlocksStartValMatcher), + mock.MatchedBy(getBlocksEndValMatcher(&latest)), + rpc.CommitmentFinalized, + ). + RunAndReturn(getBlocksReturnFunc(false)) client.EXPECT(). - GetBlockWithOpts(mock.Anything, mock.MatchedBy(func(val uint64) bool { - return true - }), mock.Anything). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { slotIdx := -1 for idx, slt := range slots { @@ -227,6 +228,8 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { } func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { + t.Parallel() + client := new(mocks.RPCClient) parser := new(testParser) ctx := tests.Context(t) @@ -241,65 +244,91 @@ func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { pubKey := solana.PublicKey{2, 1, 4, 2} slots := []uint64{44, 43, 42} sigs := make([]solana.Signature, len(slots)*2) - blockHeights := []uint64{21, 22, 23, 50} for idx := range len(sigs) { _, _ = rand.Read(sigs[idx][:]) } + var latest atomic.Uint64 + + latest.Store(uint64(40)) + // GetLatestBlockhash might be called at start-up; make it take some time because the result isn't needed for this test - client.EXPECT().GetLatestBlockhash(mock.Anything, mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - RPCContext: rpc.RPCContext{ - Context: rpc.Context{ - Slot: slots[0], - }, - }, - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: 42, - }, - }, nil).After(2 * time.Second).Maybe() + client.EXPECT(). + GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + RunAndReturn(latestBlockhashReturnFunc(&latest)). + After(2 * time.Second). + Maybe() client.EXPECT(). - GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.MatchedBy(func(opts *rpc.GetSignaturesForAddressOpts) bool { - return opts != nil && opts.Before.String() == solana.Signature{}.String() - })). - Return([]*rpc.TransactionSignature{ - {Slot: slots[0], Signature: sigs[0]}, - {Slot: slots[0], Signature: sigs[1]}, - {Slot: slots[1], Signature: sigs[2]}, - {Slot: slots[1], Signature: sigs[3]}, - {Slot: slots[2], Signature: sigs[4]}, - {Slot: slots[2], Signature: sigs[5]}, - }, nil) - - client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.Anything).Return([]*rpc.TransactionSignature{}, nil) - - for idx := range len(slots) { - client.EXPECT().GetBlockWithOpts(mock.Anything, slots[idx], mock.Anything).Return(&rpc.GetBlockResult{ - Transactions: []rpc.TransactionWithMeta{ - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, + GetBlocks( + mock.Anything, + mock.MatchedBy(getBlocksStartValMatcher), + mock.MatchedBy(getBlocksEndValMatcher(&latest)), + rpc.CommitmentFinalized, + ). + RunAndReturn(getBlocksReturnFunc(true)) + + client.EXPECT(). + GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.Anything). + RunAndReturn(func(_ context.Context, pk solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { + ret := []*rpc.TransactionSignature{} + + if opts != nil && opts.Before.String() == (solana.Signature{}).String() { + for idx := range slots { + ret = append(ret, &rpc.TransactionSignature{Slot: slots[idx], Signature: sigs[idx*2]}) + ret = append(ret, &rpc.TransactionSignature{Slot: slots[idx], Signature: sigs[(idx*2)+1]}) + } + } + + return ret, nil + }) + + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + idx := -1 + for sIdx, slt := range slots { + if slt == slot { + idx = sIdx + + break + } + } + + height := slot - 1 + + if idx == -1 { + return &rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{}, + Signatures: []solana.Signature{}, + BlockHeight: &height, + }, nil + } + + return &rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, }, - }, - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, }, }, - }, - Signatures: []solana.Signature{sigs[idx*2], sigs[(idx*2)+1]}, - BlockHeight: &blockHeights[idx], - }, nil).Twice() - } + Signatures: []solana.Signature{sigs[idx*2], sigs[(idx*2)+1]}, + BlockHeight: &height, + }, nil + }) assert.NoError(t, collector.BackfillForAddress(ctx, pubKey.String(), 42)) tests.AssertEventually(t, func() bool { return parser.Count() == 6 }) - - client.AssertExpectations(t) } func BenchmarkEncodedLogCollector(b *testing.B) { @@ -516,3 +545,47 @@ func (p *testParser) Events() []logpoller.ProgramEvent { return p.events } + +func latestBlockhashReturnFunc(latest *atomic.Uint64) func(context.Context, rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { + return func(ctx context.Context, ct rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { + defer func() { + latest.Store(latest.Load() + 2) + }() + + return &rpc.GetLatestBlockhashResult{ + RPCContext: rpc.RPCContext{ + Context: rpc.Context{ + Slot: latest.Load(), + }, + }, + Value: &rpc.LatestBlockhashResult{ + LastValidBlockHeight: latest.Load() - 1, + }, + }, nil + } +} + +func getBlocksReturnFunc(empty bool) func(context.Context, uint64, *uint64, rpc.CommitmentType) (rpc.BlocksResult, error) { + return func(_ context.Context, u1 uint64, u2 *uint64, _ rpc.CommitmentType) (rpc.BlocksResult, error) { + blocks := []uint64{} + + if !empty { + blocks = make([]uint64, *u2-u1+1) + for idx := range blocks { + blocks[idx] = u1 + uint64(idx) + } + } + + return rpc.BlocksResult(blocks), nil + } +} + +func getBlocksStartValMatcher(val uint64) bool { + return val > uint64(0) +} + +func getBlocksEndValMatcher(latest *atomic.Uint64) func(*uint64) bool { + return func(val *uint64) bool { + return val != nil && *val <= latest.Load() + } +} From d901b2094c17fb916d80b2dd1b7dd87214e6f205 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Mon, 16 Dec 2024 16:23:29 -0600 Subject: [PATCH 3/6] simplify block and expectation ordering --- pkg/solana/logpoller/loader.go | 63 +++++++++++------------------ pkg/solana/logpoller/loader_test.go | 57 ++++++++++++++++++++++---- 2 files changed, 74 insertions(+), 46 deletions(-) diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index 8791e2a9f..db9020303 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -4,8 +4,7 @@ import ( "context" "errors" "fmt" - "reflect" - "sort" + "slices" "sync" "sync/atomic" "time" @@ -280,8 +279,21 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en return err } + // as a safety mechanism, order the blocks ascending (oldest to newest) in the extreme case + // that the RPC changes and results get jumbled. + slices.SortFunc(result, func(a, b uint64) int { + if a < b { + return -1 + } else if a > b { + return 1 + } + + return 0 + }) + for _, block := range result { c.ordered.ExpectBlock(block) + select { case <-ctx.Done(): return nil @@ -338,16 +350,12 @@ func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *ordered return op } +// ExpectBlock should be called in block order to preserve block progression. func (p *orderedParser) ExpectBlock(block uint64) { p.mu.Lock() defer p.mu.Unlock() p.blocks = append(p.blocks, block) - - // ensure sort ascending - sort.Slice(p.blocks, func(i, j int) bool { - return p.blocks[i] < p.blocks[j] - }) } func (p *orderedParser) ExpectTxs(block uint64, quantity int) { @@ -431,10 +439,8 @@ func (p *orderedParser) run(_ context.Context) { } func (p *orderedParser) sendReadySlots() error { - rmvIdx := make([]int, 0) - // start at the lowest block and find ready blocks - for idx, block := range p.blocks { + for _, block := range p.blocks { // if no expectations are set, we are still waiting on information for the block. // if expectations set and not met, we are still waiting on information for the block // no other block data should be sent until this is resolved @@ -446,23 +452,20 @@ func (p *orderedParser) sendReadySlots() error { // if expectations are 0 -> remove and continue if exp == 0 { p.clearExpectations(block) - rmvIdx = append(rmvIdx, idx) + p.blocks = p.blocks[1:] continue } - // if expectations set and met -> forward, remove, and continue - // to ensure ordered delivery, break from the loop if a ready block isn't found - // this function should be preceded by clearEmptyBlocks - rIdx, ok := getIdx(p.ready, block) - if !ok { + rIdx := slices.Index(p.ready, block) + if rIdx < 0 { return nil } evts, ok := p.actual[block] if !ok { - return errors.New("invalid state") + return errInvalidState } var errs error @@ -475,34 +478,16 @@ func (p *orderedParser) sendReadySlots() error { return errs } - p.ready = remove(p.ready, rIdx) - rmvIdx = append(rmvIdx, idx) + p.ready = slices.Delete(p.ready, rIdx, rIdx+1) + p.blocks = p.blocks[1:] - delete(p.expect, block) - delete(p.actual, block) - } - - for count, idx := range rmvIdx { - p.blocks = remove(p.blocks, idx-count) + p.clearExpectations(block) } return nil } -func getIdx[T any](slice []T, match T) (int, bool) { - for idx, value := range slice { - if reflect.DeepEqual(value, match) { - return idx, true - } - } - - return -1, false -} - -func remove[T any](slice []T, s int) []T { - return append(slice[:s], slice[s+1:]...) -} - var ( errExpectationsNotSet = errors.New("expectations not set") + errInvalidState = errors.New("invalid state") ) diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index 166ed9b24..e3cbb7700 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -128,9 +128,10 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { latest.Store(uint64(40)) - slots := []uint64{43, 42} + slots := []uint64{44, 43, 42, 41} sigs := make([]solana.Signature, len(slots)) hashes := make([]solana.Hash, len(slots)) + scrambler := &slotUnsync{ch: make(chan struct{})} for idx := range len(sigs) { _, _ = rand.Read(sigs[idx][:]) @@ -162,10 +163,9 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { } } - if slot == 42 { - // force slot 42 to return after 43 - time.Sleep(1 * time.Second) - } + // imitate loading block data out of order + // every other block must wait for the block previous + scrambler.next() height := slot - 1 @@ -197,12 +197,24 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { tests.AssertEventually(t, func() bool { return reflect.DeepEqual(parser.Events(), []logpoller.ProgramEvent{ + { + BlockData: logpoller.BlockData{ + SlotNumber: 41, + BlockHeight: 40, + BlockHash: hashes[3], + TransactionHash: sigs[3], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, { BlockData: logpoller.BlockData{ SlotNumber: 42, BlockHeight: 41, - BlockHash: hashes[1], - TransactionHash: sigs[1], + BlockHash: hashes[2], + TransactionHash: sigs[2], TransactionIndex: 0, TransactionLogIndex: 0, }, @@ -213,6 +225,18 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { BlockData: logpoller.BlockData{ SlotNumber: 43, BlockHeight: 42, + BlockHash: hashes[1], + TransactionHash: sigs[1], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 44, + BlockHeight: 43, BlockHash: hashes[0], TransactionHash: sigs[0], TransactionIndex: 0, @@ -227,6 +251,25 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { client.AssertExpectations(t) } +type slotUnsync struct { + ch chan struct{} + waiting atomic.Bool +} + +func (u *slotUnsync) next() { + if u.waiting.Load() { + u.waiting.Store(false) + + <-u.ch + + return + } + + u.waiting.Store(true) + + u.ch <- struct{}{} +} + func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { t.Parallel() From 28e8dea2b128f1672462d44c727ac78401beb769 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Tue, 17 Dec 2024 11:46:43 -0600 Subject: [PATCH 4/6] remove ready as an indicator since it is redundant --- pkg/solana/logpoller/loader.go | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index db9020303..f4b1424e8 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -327,7 +327,6 @@ type orderedParser struct { parser ProgramEventProcessor mu sync.Mutex blocks []uint64 - ready []uint64 expect map[uint64]int actual map[uint64][]ProgramEvent } @@ -336,7 +335,6 @@ func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *ordered op := &orderedParser{ parser: parser, blocks: make([]uint64, 0), - ready: make([]uint64, 0), expect: make(map[uint64]int), actual: make(map[uint64][]ProgramEvent), } @@ -389,7 +387,7 @@ func (p *orderedParser) close() error { } func (p *orderedParser) addToExpectations(evt ProgramEvent) error { - expectations, ok := p.expect[evt.SlotNumber] + _, ok := p.expect[evt.SlotNumber] if !ok { return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) } @@ -401,10 +399,6 @@ func (p *orderedParser) addToExpectations(evt ProgramEvent) error { p.actual[evt.SlotNumber] = append(evts, evt) - if expectations == len(evts)+1 { - p.setReady(evt.SlotNumber) - } - return nil } @@ -427,10 +421,6 @@ func (p *orderedParser) clearExpectations(block uint64) { delete(p.actual, block) } -func (p *orderedParser) setReady(slot uint64) { - p.ready = append(p.ready, slot) -} - func (p *orderedParser) run(_ context.Context) { p.mu.Lock() defer p.mu.Unlock() @@ -457,12 +447,6 @@ func (p *orderedParser) sendReadySlots() error { continue } - // to ensure ordered delivery, break from the loop if a ready block isn't found - rIdx := slices.Index(p.ready, block) - if rIdx < 0 { - return nil - } - evts, ok := p.actual[block] if !ok { return errInvalidState @@ -478,7 +462,6 @@ func (p *orderedParser) sendReadySlots() error { return errs } - p.ready = slices.Delete(p.ready, rIdx, rIdx+1) p.blocks = p.blocks[1:] p.clearExpectations(block) From c54eb2ac800fb71b58127a938eb23ccd4ca49197 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Wed, 18 Dec 2024 09:18:32 -0600 Subject: [PATCH 5/6] use container/list for blocks --- pkg/solana/logpoller/loader.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index f4b1424e8..eb9794864 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -1,6 +1,7 @@ package logpoller import ( + "container/list" "context" "errors" "fmt" @@ -326,7 +327,7 @@ type orderedParser struct { // internal state parser ProgramEventProcessor mu sync.Mutex - blocks []uint64 + blocks *list.List expect map[uint64]int actual map[uint64][]ProgramEvent } @@ -334,7 +335,7 @@ type orderedParser struct { func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *orderedParser { op := &orderedParser{ parser: parser, - blocks: make([]uint64, 0), + blocks: list.New(), expect: make(map[uint64]int), actual: make(map[uint64][]ProgramEvent), } @@ -353,7 +354,7 @@ func (p *orderedParser) ExpectBlock(block uint64) { p.mu.Lock() defer p.mu.Unlock() - p.blocks = append(p.blocks, block) + p.blocks.PushBack(block) } func (p *orderedParser) ExpectTxs(block uint64, quantity int) { @@ -430,7 +431,8 @@ func (p *orderedParser) run(_ context.Context) { func (p *orderedParser) sendReadySlots() error { // start at the lowest block and find ready blocks - for _, block := range p.blocks { + for element := p.blocks.Front(); element != nil; element = element.Next() { + block := element.Value.(uint64) // if no expectations are set, we are still waiting on information for the block. // if expectations set and not met, we are still waiting on information for the block // no other block data should be sent until this is resolved @@ -442,7 +444,14 @@ func (p *orderedParser) sendReadySlots() error { // if expectations are 0 -> remove and continue if exp == 0 { p.clearExpectations(block) - p.blocks = p.blocks[1:] + + temp := element.Prev() + p.blocks.Remove(element) + if temp == nil { + break + } + + element = temp continue } @@ -462,7 +471,14 @@ func (p *orderedParser) sendReadySlots() error { return errs } - p.blocks = p.blocks[1:] + temp := element.Prev() + p.blocks.Remove(element) + + if temp == nil { + break + } + + element = temp p.clearExpectations(block) } From f5bf83fd495a50d4559a56ce50190a292ea2c3b7 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Wed, 18 Dec 2024 10:09:09 -0600 Subject: [PATCH 6/6] simplify list item removal --- pkg/solana/logpoller/loader.go | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index eb9794864..d714f08ad 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -431,7 +431,7 @@ func (p *orderedParser) run(_ context.Context) { func (p *orderedParser) sendReadySlots() error { // start at the lowest block and find ready blocks - for element := p.blocks.Front(); element != nil; element = element.Next() { + for element := p.blocks.Front(); element != nil; element = p.blocks.Front() { block := element.Value.(uint64) // if no expectations are set, we are still waiting on information for the block. // if expectations set and not met, we are still waiting on information for the block @@ -444,14 +444,7 @@ func (p *orderedParser) sendReadySlots() error { // if expectations are 0 -> remove and continue if exp == 0 { p.clearExpectations(block) - - temp := element.Prev() p.blocks.Remove(element) - if temp == nil { - break - } - - element = temp continue } @@ -471,15 +464,7 @@ func (p *orderedParser) sendReadySlots() error { return errs } - temp := element.Prev() p.blocks.Remove(element) - - if temp == nil { - break - } - - element = temp - p.clearExpectations(block) }