Skip to content

Commit

Permalink
cli sub-tool to get a specific miniblock from a stream (#1878)
Browse files Browse the repository at this point in the history
  • Loading branch information
clemire authored Dec 20, 2024
1 parent 75837a7 commit b95da4e
Showing 1 changed file with 129 additions and 0 deletions.
129 changes: 129 additions & 0 deletions core/cmd/stream_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -126,6 +127,124 @@ func runStreamGetEventCmd(cmd *cobra.Command, args []string) error {
return nil
}

func runStreamGetMiniblockCmd(cmd *cobra.Command, args []string) error {
ctx := context.Background() // lint:ignore context.Background() is fine here
streamID, err := shared.StreamIdFromString(args[0])
if err != nil {
return err
}
miniblockHash := common.HexToHash(args[1])
blockchain, err := crypto.NewBlockchain(
ctx,
&cmdConfig.RiverChain,
nil,
infra.NewMetricsFactory(nil, "river", "cmdline"),
nil,
)
if err != nil {
return err
}

registryContract, err := registries.NewRiverRegistryContract(
ctx,
blockchain,
&cmdConfig.RegistryContract,
&cmdConfig.RiverRegistry,
)
if err != nil {
return err
}

stream, err := registryContract.StreamRegistry.GetStream(nil, streamID)
if err != nil {
return err
}

nodes := nodes.NewStreamNodesWithLock(stream.Nodes, common.Address{})
remoteNodeAddress := nodes.GetStickyPeer()

remote, err := registryContract.NodeRegistry.GetNode(nil, remoteNodeAddress)
if err != nil {
return err
}

remoteClient := protocolconnect.NewStreamServiceClient(http.DefaultClient, remote.Url)

response, err := remoteClient.GetStream(ctx, connect.NewRequest(&protocol.GetStreamRequest{
StreamId: streamID[:],
Optional: false,
}))
if err != nil {
return err
}

streamAndCookie := response.Msg.GetStream()

to := streamAndCookie.GetNextSyncCookie().GetMinipoolGen()
blockRange := int64(100)
if len(args) == 3 {
blockRange, err = strconv.ParseInt(args[2], 10, 64)
if err != nil {
return err
}
}
from := max(to-blockRange, 0)

miniblocks, err := remoteClient.GetMiniblocks(ctx, connect.NewRequest(&protocol.GetMiniblocksRequest{
StreamId: streamID[:],
FromInclusive: from,
ToExclusive: to,
}))
if err != nil {
return err
}

for n, miniblock := range miniblocks.Msg.GetMiniblocks() {
// Parse header
info, err := events.NewMiniblockInfoFromProto(
miniblock,
events.NewMiniblockInfoFromProtoOpts{
ExpectedBlockNumber: from + int64(n),
},
)
if err != nil {
return err
}

if info.HeaderEvent().Hash == miniblockHash {
mbHeader, ok := info.HeaderEvent().Event.Payload.(*protocol.StreamEvent_MiniblockHeader)
if !ok {
return fmt.Errorf("unable to parse header event as miniblock header")
}

if len(mbHeader.MiniblockHeader.EventHashes) != len(miniblock.Events) {
return fmt.Errorf("malformatted miniblock: header event count and miniblock event count do not match")
}

for i, hash := range mbHeader.MiniblockHeader.EventHashes {
if !bytes.Equal(miniblock.Events[i].Hash, hash) {
return fmt.Errorf(
"event %d hashes do not match: %v v %v in the header",
i,
hex.EncodeToString(miniblock.Events[i].Hash),
hex.EncodeToString(hash),
)
}
}

fmt.Printf("\nMiniblock\n=========\n%s\n", protojson.Format(miniblock))

fmt.Printf("\nHeader\n======\n%s\n", protojson.Format(mbHeader.MiniblockHeader))

return nil
}
}

fmt.Printf("Miniblock hash %s not found in stream %s (block range [%d...%d])\n", miniblockHash, streamID, from, to)

return nil
}

func init() {
cmdStream := &cobra.Command{
Use: "stream",
Expand All @@ -141,6 +260,16 @@ max-block-range is optional and limits the number of blocks to consider (default
RunE: runStreamGetEventCmd,
}

cmdStreamGetMiniblock := &cobra.Command{
Use: "miniblock",
Short: "Get Miniblock <stream-id> <block-hash> [max-block-range]",
Long: `Dump miniblock content to stdout.
max-block-range is optional and limits the number of blocks to consider (default=100)`,
Args: cobra.RangeArgs(2, 3),
RunE: runStreamGetMiniblockCmd,
}

cmdStream.AddCommand(cmdStreamGetMiniblock)
cmdStream.AddCommand(cmdStreamGetEvent)
rootCmd.AddCommand(cmdStream)
}

0 comments on commit b95da4e

Please sign in to comment.