diff --git a/pkg/solana/logpoller/job.go b/pkg/solana/logpoller/job.go index 1d827a85b..165c0b5fe 100644 --- a/pkg/solana/logpoller/job.go +++ b/pkg/solana/logpoller/job.go @@ -33,7 +33,8 @@ func (j retryableJob) Run(ctx context.Context) error { } type eventDetail struct { - blockNumber uint64 + slotNumber uint64 + blockHeight uint64 blockHash solana.Hash trxIdx int trxSig solana.Signature @@ -54,12 +55,18 @@ func (j *processEventJob) Run(_ context.Context) error { return j.parser.Process(j.event) } +type wrappedParser interface { + ProgramEventProcessor + ExpectBlock(uint64) + ExpectTxs(uint64, int) +} + // getTransactionsFromBlockJob is a job that fetches transaction signatures from a block and loads // the job queue with getTransactionLogsJobs for each transaction found in the block. type getTransactionsFromBlockJob struct { slotNumber uint64 client RPCClient - parser ProgramEventProcessor + parser wrappedParser chJobs chan Job } @@ -103,17 +110,20 @@ func (j *getTransactionsFromBlockJob) Run(ctx context.Context) error { } detail := eventDetail{ - blockHash: block.Blockhash, + slotNumber: j.slotNumber, + blockHash: block.Blockhash, } if block.BlockHeight != nil { - detail.blockNumber = *block.BlockHeight + detail.blockHeight = *block.BlockHeight } if len(block.Transactions) != len(blockSigsOnly.Signatures) { return fmt.Errorf("block %d has %d transactions but %d signatures", j.slotNumber, len(block.Transactions), len(blockSigsOnly.Signatures)) } + j.parser.ExpectTxs(j.slotNumber, len(block.Transactions)) + for idx, trx := range block.Transactions { detail.trxIdx = idx if len(blockSigsOnly.Signatures)-1 <= idx { @@ -130,14 +140,15 @@ func messagesToEvents(messages []string, parser ProgramEventProcessor, detail ev var logIdx uint for _, outputs := range parseProgramLogs(messages) { for _, event := range outputs.Events { - logIdx++ - - event.BlockNumber = detail.blockNumber + event.SlotNumber = detail.slotNumber + event.BlockHeight = detail.blockHeight event.BlockHash = detail.blockHash event.TransactionHash = detail.trxSig event.TransactionIndex = detail.trxIdx event.TransactionLogIndex = logIdx + logIdx++ + chJobs <- &processEventJob{ parser: parser, event: event, diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index 56fcef25c..d714f08ad 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -1,8 +1,12 @@ package logpoller import ( + "container/list" "context" "errors" + "fmt" + "slices" + "sync" "sync/atomic" "time" @@ -40,7 +44,8 @@ type EncodedLogCollector struct { // dependencies and configuration client RPCClient - parser ProgramEventProcessor + ordered *orderedParser + unordered *unorderedParser lggr logger.Logger rpcTimeLimit time.Duration @@ -62,7 +67,7 @@ func NewEncodedLogCollector( ) *EncodedLogCollector { c := &EncodedLogCollector{ client: client, - parser: parser, + unordered: newUnorderedParser(parser), chSlot: make(chan uint64), chBlock: make(chan uint64, 1), chJobs: make(chan Job, 1), @@ -74,8 +79,9 @@ func NewEncodedLogCollector( Name: "EncodedLogCollector", NewSubServices: func(lggr logger.Logger) []services.Service { c.workers = NewWorkerGroup(DefaultWorkerCount, lggr) + c.ordered = newOrderedParser(parser, lggr) - return []services.Service{c.workers} + return []services.Service{c.workers, c.ordered} }, Start: c.start, Close: c.close, @@ -127,7 +133,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{ slotNumber: sig.Slot, client: c.client, - parser: c.parser, + parser: c.unordered, chJobs: c.chJobs, }); err != nil { return err @@ -138,7 +144,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st return nil } -func (c *EncodedLogCollector) start(ctx context.Context) error { +func (c *EncodedLogCollector) start(_ context.Context) error { c.engine.Go(c.runSlotPolling) c.engine.Go(c.runSlotProcessing) c.engine.Go(c.runBlockProcessing) @@ -201,10 +207,15 @@ func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) { continue } + from := c.highestSlot.Load() + 1 + if c.highestSlot.Load() == 0 { + from = slot + } + c.highestSlot.Store(slot) // load blocks in slot range - c.loadRange(ctx, c.highestSlotLoaded.Load()+1, slot) + c.loadRange(ctx, from, slot) } } } @@ -214,11 +225,11 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) { select { case <-ctx.Done(): return - case block := <-c.chBlock: + case slot := <-c.chBlock: if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{ - slotNumber: block, + slotNumber: slot, client: c.client, - parser: c.parser, + parser: c.ordered, chJobs: c.chJobs, }); err != nil { c.lggr.Errorf("failed to add job to queue: %s", err) @@ -269,7 +280,21 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en return err } + // as a safety mechanism, order the blocks ascending (oldest to newest) in the extreme case + // that the RPC changes and results get jumbled. + slices.SortFunc(result, func(a, b uint64) int { + if a < b { + return -1 + } else if a > b { + return 1 + } + + return 0 + }) + for _, block := range result { + c.ordered.ExpectBlock(block) + select { case <-ctx.Done(): return nil @@ -279,3 +304,174 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en return nil } + +type unorderedParser struct { + parser ProgramEventProcessor +} + +func newUnorderedParser(parser ProgramEventProcessor) *unorderedParser { + return &unorderedParser{parser: parser} +} + +func (p *unorderedParser) ExpectBlock(_ uint64) {} +func (p *unorderedParser) ExpectTxs(_ uint64, _ int) {} +func (p *unorderedParser) Process(evt ProgramEvent) error { + return p.parser.Process(evt) +} + +type orderedParser struct { + // service state management + services.Service + engine *services.Engine + + // internal state + parser ProgramEventProcessor + mu sync.Mutex + blocks *list.List + expect map[uint64]int + actual map[uint64][]ProgramEvent +} + +func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *orderedParser { + op := &orderedParser{ + parser: parser, + blocks: list.New(), + expect: make(map[uint64]int), + actual: make(map[uint64][]ProgramEvent), + } + + op.Service, op.engine = services.Config{ + Name: "OrderedParser", + Start: op.start, + Close: op.close, + }.NewServiceEngine(lggr) + + return op +} + +// ExpectBlock should be called in block order to preserve block progression. +func (p *orderedParser) ExpectBlock(block uint64) { + p.mu.Lock() + defer p.mu.Unlock() + + p.blocks.PushBack(block) +} + +func (p *orderedParser) ExpectTxs(block uint64, quantity int) { + p.mu.Lock() + defer p.mu.Unlock() + + p.expect[block] = quantity + p.actual[block] = make([]ProgramEvent, 0, quantity) +} + +func (p *orderedParser) Process(event ProgramEvent) error { + p.mu.Lock() + defer p.mu.Unlock() + + if err := p.addToExpectations(event); err != nil { + // TODO: log error because this is an unrecoverable error + return nil + } + + return p.sendReadySlots() +} + +func (p *orderedParser) start(_ context.Context) error { + p.engine.GoTick(services.NewTicker(time.Second), p.run) + + return nil +} + +func (p *orderedParser) close() error { + return nil +} + +func (p *orderedParser) addToExpectations(evt ProgramEvent) error { + _, ok := p.expect[evt.SlotNumber] + if !ok { + return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + } + + evts, ok := p.actual[evt.SlotNumber] + if !ok { + return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + } + + p.actual[evt.SlotNumber] = append(evts, evt) + + return nil +} + +func (p *orderedParser) expectations(block uint64) (int, bool, error) { + expectations, ok := p.expect[block] + if !ok { + return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block) + } + + evts, ok := p.actual[block] + if !ok { + return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block) + } + + return expectations, expectations == len(evts), nil +} + +func (p *orderedParser) clearExpectations(block uint64) { + delete(p.expect, block) + delete(p.actual, block) +} + +func (p *orderedParser) run(_ context.Context) { + p.mu.Lock() + defer p.mu.Unlock() + + _ = p.sendReadySlots() +} + +func (p *orderedParser) sendReadySlots() error { + // start at the lowest block and find ready blocks + for element := p.blocks.Front(); element != nil; element = p.blocks.Front() { + block := element.Value.(uint64) + // if no expectations are set, we are still waiting on information for the block. + // if expectations set and not met, we are still waiting on information for the block + // no other block data should be sent until this is resolved + exp, met, err := p.expectations(block) + if err != nil || !met { + break + } + + // if expectations are 0 -> remove and continue + if exp == 0 { + p.clearExpectations(block) + p.blocks.Remove(element) + + continue + } + + evts, ok := p.actual[block] + if !ok { + return errInvalidState + } + + var errs error + for _, evt := range evts { + errs = errors.Join(errs, p.parser.Process(evt)) + } + + // need possible retry + if errs != nil { + return errs + } + + p.blocks.Remove(element) + p.clearExpectations(block) + } + + return nil +} + +var ( + errExpectationsNotSet = errors.New("expectations not set") + errInvalidState = errors.New("invalid state") +) diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index 69a37702b..e3cbb7700 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -3,6 +3,7 @@ package logpoller_test import ( "context" "crypto/rand" + "reflect" "sync" "sync/atomic" "testing" @@ -32,6 +33,8 @@ var ( ) func TestEncodedLogCollector_StartClose(t *testing.T) { + t.Parallel() + client := new(mocks.RPCClient) ctx := tests.Context(t) @@ -42,6 +45,8 @@ func TestEncodedLogCollector_StartClose(t *testing.T) { } func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { + t.Parallel() + client := new(mocks.RPCClient) parser := new(testParser) ctx := tests.Context(t) @@ -53,42 +58,221 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { require.NoError(t, collector.Close()) }) - slot := uint64(42) - sig := solana.Signature{2, 1, 4, 2} - blockHeight := uint64(21) + var latest atomic.Uint64 - client.EXPECT().GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized).Return(&rpc.GetLatestBlockhashResult{ - RPCContext: rpc.RPCContext{ - Context: rpc.Context{ - Slot: slot, - }, - }, - }, nil) + latest.Store(uint64(40)) - client.EXPECT().GetBlocks(mock.Anything, uint64(1), mock.MatchedBy(func(val *uint64) bool { - return val != nil && *val == slot - }), mock.Anything).Return(rpc.BlocksResult{slot}, nil) + client.EXPECT(). + GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + RunAndReturn(latestBlockhashReturnFunc(&latest)) - client.EXPECT().GetBlockWithOpts(mock.Anything, slot, mock.Anything).Return(&rpc.GetBlockResult{ - Transactions: []rpc.TransactionWithMeta{ - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, - }, - }, - }, - Signatures: []solana.Signature{sig}, - BlockHeight: &blockHeight, - }, nil).Twice() + client.EXPECT(). + GetBlocks( + mock.Anything, + mock.MatchedBy(getBlocksStartValMatcher), + mock.MatchedBy(getBlocksEndValMatcher(&latest)), + rpc.CommitmentFinalized, + ). + RunAndReturn(getBlocksReturnFunc(false)) + + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + height := slot - 1 + + result := rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{}, + Signatures: []solana.Signature{}, + BlockHeight: &height, + } + + _, _ = rand.Read(result.Blockhash[:]) + + if slot == 42 { + var sig solana.Signature + _, _ = rand.Read(sig[:]) + + result.Signatures = []solana.Signature{sig} + result.Transactions = []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, + }, + } + } + + return &result, nil + }) tests.AssertEventually(t, func() bool { return parser.Called() }) +} + +func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { + t.Parallel() + + client := new(mocks.RPCClient) + parser := new(testParser) + ctx := tests.Context(t) + + collector := logpoller.NewEncodedLogCollector(client, parser, logger.Nop()) + + require.NoError(t, collector.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, collector.Close()) + }) + + var latest atomic.Uint64 + + latest.Store(uint64(40)) + + slots := []uint64{44, 43, 42, 41} + sigs := make([]solana.Signature, len(slots)) + hashes := make([]solana.Hash, len(slots)) + scrambler := &slotUnsync{ch: make(chan struct{})} + + for idx := range len(sigs) { + _, _ = rand.Read(sigs[idx][:]) + _, _ = rand.Read(hashes[idx][:]) + } + + client.EXPECT(). + GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + RunAndReturn(latestBlockhashReturnFunc(&latest)) + + client.EXPECT(). + GetBlocks( + mock.Anything, + mock.MatchedBy(getBlocksStartValMatcher), + mock.MatchedBy(getBlocksEndValMatcher(&latest)), + rpc.CommitmentFinalized, + ). + RunAndReturn(getBlocksReturnFunc(false)) + + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + slotIdx := -1 + for idx, slt := range slots { + if slt == slot { + slotIdx = idx + + break + } + } + + // imitate loading block data out of order + // every other block must wait for the block previous + scrambler.next() + + height := slot - 1 + + if slotIdx == -1 { + var hash solana.Hash + _, _ = rand.Read(hash[:]) + + return &rpc.GetBlockResult{ + Blockhash: hash, + Transactions: []rpc.TransactionWithMeta{}, + Signatures: []solana.Signature{}, + BlockHeight: &height, + }, nil + } + + return &rpc.GetBlockResult{ + Blockhash: hashes[slotIdx], + Transactions: []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, + }, + }, + Signatures: []solana.Signature{sigs[slotIdx]}, + BlockHeight: &height, + }, nil + }) + + tests.AssertEventually(t, func() bool { + return reflect.DeepEqual(parser.Events(), []logpoller.ProgramEvent{ + { + BlockData: logpoller.BlockData{ + SlotNumber: 41, + BlockHeight: 40, + BlockHash: hashes[3], + TransactionHash: sigs[3], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 42, + BlockHeight: 41, + BlockHash: hashes[2], + TransactionHash: sigs[2], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 43, + BlockHeight: 42, + BlockHash: hashes[1], + TransactionHash: sigs[1], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 44, + BlockHeight: 43, + BlockHash: hashes[0], + TransactionHash: sigs[0], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + }) + }) client.AssertExpectations(t) } +type slotUnsync struct { + ch chan struct{} + waiting atomic.Bool +} + +func (u *slotUnsync) next() { + if u.waiting.Load() { + u.waiting.Store(false) + + <-u.ch + + return + } + + u.waiting.Store(true) + + u.ch <- struct{}{} +} + func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { + t.Parallel() + client := new(mocks.RPCClient) parser := new(testParser) ctx := tests.Context(t) @@ -103,65 +287,91 @@ func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { pubKey := solana.PublicKey{2, 1, 4, 2} slots := []uint64{44, 43, 42} sigs := make([]solana.Signature, len(slots)*2) - blockHeights := []uint64{21, 22, 23, 50} for idx := range len(sigs) { _, _ = rand.Read(sigs[idx][:]) } + var latest atomic.Uint64 + + latest.Store(uint64(40)) + // GetLatestBlockhash might be called at start-up; make it take some time because the result isn't needed for this test - client.EXPECT().GetLatestBlockhash(mock.Anything, mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - RPCContext: rpc.RPCContext{ - Context: rpc.Context{ - Slot: slots[0], - }, - }, - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: 42, - }, - }, nil).After(2 * time.Second).Maybe() + client.EXPECT(). + GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + RunAndReturn(latestBlockhashReturnFunc(&latest)). + After(2 * time.Second). + Maybe() client.EXPECT(). - GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.MatchedBy(func(opts *rpc.GetSignaturesForAddressOpts) bool { - return opts != nil && opts.Before.String() == solana.Signature{}.String() - })). - Return([]*rpc.TransactionSignature{ - {Slot: slots[0], Signature: sigs[0]}, - {Slot: slots[0], Signature: sigs[1]}, - {Slot: slots[1], Signature: sigs[2]}, - {Slot: slots[1], Signature: sigs[3]}, - {Slot: slots[2], Signature: sigs[4]}, - {Slot: slots[2], Signature: sigs[5]}, - }, nil) - - client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.Anything).Return([]*rpc.TransactionSignature{}, nil) - - for idx := range len(slots) { - client.EXPECT().GetBlockWithOpts(mock.Anything, slots[idx], mock.Anything).Return(&rpc.GetBlockResult{ - Transactions: []rpc.TransactionWithMeta{ - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, + GetBlocks( + mock.Anything, + mock.MatchedBy(getBlocksStartValMatcher), + mock.MatchedBy(getBlocksEndValMatcher(&latest)), + rpc.CommitmentFinalized, + ). + RunAndReturn(getBlocksReturnFunc(true)) + + client.EXPECT(). + GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.Anything). + RunAndReturn(func(_ context.Context, pk solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { + ret := []*rpc.TransactionSignature{} + + if opts != nil && opts.Before.String() == (solana.Signature{}).String() { + for idx := range slots { + ret = append(ret, &rpc.TransactionSignature{Slot: slots[idx], Signature: sigs[idx*2]}) + ret = append(ret, &rpc.TransactionSignature{Slot: slots[idx], Signature: sigs[(idx*2)+1]}) + } + } + + return ret, nil + }) + + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + idx := -1 + for sIdx, slt := range slots { + if slt == slot { + idx = sIdx + + break + } + } + + height := slot - 1 + + if idx == -1 { + return &rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{}, + Signatures: []solana.Signature{}, + BlockHeight: &height, + }, nil + } + + return &rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, }, - }, - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, }, }, - }, - Signatures: []solana.Signature{sigs[idx*2], sigs[(idx*2)+1]}, - BlockHeight: &blockHeights[idx], - }, nil).Twice() - } + Signatures: []solana.Signature{sigs[idx*2], sigs[(idx*2)+1]}, + BlockHeight: &height, + }, nil + }) assert.NoError(t, collector.BackfillForAddress(ctx, pubKey.String(), 42)) tests.AssertEventually(t, func() bool { return parser.Count() == 6 }) - - client.AssertExpectations(t) } func BenchmarkEncodedLogCollector(b *testing.B) { @@ -347,12 +557,16 @@ func (p *testBlockProducer) GetTransaction(_ context.Context, sig solana.Signatu type testParser struct { called atomic.Bool - count atomic.Uint64 + mu sync.Mutex + events []logpoller.ProgramEvent } func (p *testParser) Process(event logpoller.ProgramEvent) error { p.called.Store(true) - p.count.Store(p.count.Load() + 1) + + p.mu.Lock() + p.events = append(p.events, event) + p.mu.Unlock() return nil } @@ -362,5 +576,59 @@ func (p *testParser) Called() bool { } func (p *testParser) Count() uint64 { - return p.count.Load() + p.mu.Lock() + defer p.mu.Unlock() + + return uint64(len(p.events)) +} + +func (p *testParser) Events() []logpoller.ProgramEvent { + p.mu.Lock() + defer p.mu.Unlock() + + return p.events +} + +func latestBlockhashReturnFunc(latest *atomic.Uint64) func(context.Context, rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { + return func(ctx context.Context, ct rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { + defer func() { + latest.Store(latest.Load() + 2) + }() + + return &rpc.GetLatestBlockhashResult{ + RPCContext: rpc.RPCContext{ + Context: rpc.Context{ + Slot: latest.Load(), + }, + }, + Value: &rpc.LatestBlockhashResult{ + LastValidBlockHeight: latest.Load() - 1, + }, + }, nil + } +} + +func getBlocksReturnFunc(empty bool) func(context.Context, uint64, *uint64, rpc.CommitmentType) (rpc.BlocksResult, error) { + return func(_ context.Context, u1 uint64, u2 *uint64, _ rpc.CommitmentType) (rpc.BlocksResult, error) { + blocks := []uint64{} + + if !empty { + blocks = make([]uint64, *u2-u1+1) + for idx := range blocks { + blocks[idx] = u1 + uint64(idx) + } + } + + return rpc.BlocksResult(blocks), nil + } +} + +func getBlocksStartValMatcher(val uint64) bool { + return val > uint64(0) +} + +func getBlocksEndValMatcher(latest *atomic.Uint64) func(*uint64) bool { + return func(val *uint64) bool { + return val != nil && *val <= latest.Load() + } } diff --git a/pkg/solana/logpoller/log_data_parser.go b/pkg/solana/logpoller/log_data_parser.go index 4cfd04470..4080a09e2 100644 --- a/pkg/solana/logpoller/log_data_parser.go +++ b/pkg/solana/logpoller/log_data_parser.go @@ -16,7 +16,8 @@ var ( ) type BlockData struct { - BlockNumber uint64 + SlotNumber uint64 + BlockHeight uint64 BlockHash solana.Hash TransactionHash solana.Signature TransactionIndex int