Skip to content

Commit

Permalink
stream blocks (#176)
Browse files Browse the repository at this point in the history
* stream blocks

* generated code

* leave old as is

* return BlockResponse

* gen code

* minor

* implementation

* fix

* contains accounts

* impl

* remove redundant loop
  • Loading branch information
anjor authored Nov 7, 2024
1 parent b569841 commit 6227dd8
Show file tree
Hide file tree
Showing 5 changed files with 477 additions and 389 deletions.
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,13 @@ gen-old-faithful-proto: install-protoc
--go-grpc_out=paths=source_relative:$$(pwd)/old-faithful-proto/old-faithful-grpc \
-I=$$(pwd)/old-faithful-proto/proto/ \
$$(pwd)/old-faithful-proto/proto/old-faithful.proto
gen-old-faithful-proto-system:
@mkdir -p $$(pwd)/old-faithful-proto/old-faithful-grpc
@echo "Generating golang protobuf for old-faithful..."
# the proto file is in old-faithful-proto/proto ; generate go code in old-faithful-proto/faithful-grpc
protoc \
--experimental_allow_proto3_optional \
--go_out=paths=source_relative:$$(pwd)/old-faithful-proto/old-faithful-grpc \
--go-grpc_out=paths=source_relative:$$(pwd)/old-faithful-proto/old-faithful-grpc \
-I=$$(pwd)/old-faithful-proto/proto/ \
$$(pwd)/old-faithful-proto/proto/old-faithful.proto
76 changes: 76 additions & 0 deletions grpc-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
old_faithful_grpc "github.com/rpcpool/yellowstone-faithful/old-faithful-proto/old-faithful-grpc"
"github.com/rpcpool/yellowstone-faithful/tooling"
"golang.org/x/sync/errgroup"
Expand All @@ -29,6 +30,8 @@ import (
"k8s.io/klog/v2"
)

const maxSlotsToStream uint64 = 100

// ListeAndServe starts listening on the configured address and serves the RPC API.
func (me *MultiEpoch) ListenAndServeGRPC(ctx context.Context, listenOn string) error {
lis, err := net.Listen("tcp", listenOn)
Expand Down Expand Up @@ -622,3 +625,76 @@ func (multi *MultiEpoch) GetBlockTime(ctx context.Context, params *old_faithful_
BlockTime: int64(blocktime),
}, nil
}

func (multi *MultiEpoch) StreamBlocks(params *old_faithful_grpc.StreamBlocksRequest, ser old_faithful_grpc.OldFaithful_StreamBlocksServer) error {
ctx := ser.Context()

startSlot := params.StartSlot
endSlot := startSlot + maxSlotsToStream

if params.EndSlot != nil {
endSlot = *params.EndSlot
}

filterFunc := func(block *old_faithful_grpc.BlockResponse) bool {
if params.Filter == nil || len(params.Filter.AccountInclude) == 0 {
return true
}

return blockContainsAccounts(block, params.Filter.AccountInclude)
}

for slot := startSlot; slot <= endSlot; slot++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot})
if err != nil {
if status.Code(err) == codes.NotFound {
continue // is this the right thing to do?
}
return err
}

if filterFunc(block) {
if err := ser.Send(block); err != nil {
return err
}
}
}

return nil
}

func blockContainsAccounts(block *old_faithful_grpc.BlockResponse, accounts []string) bool {
accountSet := make(map[string]struct{}, len(accounts))
for _, acc := range accounts {
accountSet[acc] = struct{}{}
}

for _, tx := range block.Transactions {
decoded, err := iplddecoders.DecodeTransaction(tx.Transaction)
if err != nil {
klog.Warningf("Failed to decode transaction: %w", err)
continue // skip if there's error decoding
}
solTx, err := decoded.GetSolanaTransaction()
if err != nil {
klog.Warningf("Failed to get sol transaction: %w", err)
continue
}

for _, acc := range solTx.Message.AccountKeys {
if _, exists := accountSet[acc.String()]; exists {
return true
}
}

}

return false

}
Loading

0 comments on commit 6227dd8

Please sign in to comment.