Skip to content

Commit

Permalink
Change stream pagination setting from 5000 to 3000 (provider started …
Browse files Browse the repository at this point in the history
…to return failure for this page size) (#1406)
  • Loading branch information
sergekh2 authored Nov 2, 2024
1 parent bcddc94 commit 73781ab
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 15 deletions.
59 changes: 45 additions & 14 deletions core/cmd/registry_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ import (

func srStreamDump(cfg *config.Config, countOnly bool) error {
ctx := context.Background() // lint:ignore context.Background() is fine here
blockchain, err := crypto.NewBlockchain(ctx, &cfg.RiverChain, nil, infra.NewMetricsFactory(nil, "river", "cmdline"), nil)
blockchain, err := crypto.NewBlockchain(
ctx,
&cfg.RiverChain,
nil,
infra.NewMetricsFactory(nil, "river", "cmdline"),
nil,
)
if err != nil {
return err
}
Expand All @@ -43,27 +49,28 @@ func srStreamDump(cfg *config.Config, countOnly bool) error {
return nil
}

streams, err := registryContract.GetAllStreams(ctx, blockchain.InitialBlockNum)
if err != nil {
return err
}

for i, strm := range streams {
i := 0
err = registryContract.ForAllStreams(ctx, blockchain.InitialBlockNum, func(strm *registries.GetStreamResult) bool {
s := fmt.Sprintf("%4d %s", i, strm.StreamId.String())
fmt.Printf("%-69s %4d, %s\n", s, strm.LastMiniblockNum, strm.LastMiniblockHash.Hex())
for _, node := range strm.Nodes {
fmt.Printf(" %s\n", node.Hex())
}
i++
return true
})
if err != nil {
return err
}

if streamNum != int64(len(streams)) {
if streamNum != int64(i) {
return RiverError(
Err_INTERNAL,
"Stream count mismatch",
"GetStreamCount",
streamNum,
"GetAllStreams",
len(streams),
"ForAllStreams",
i,
)
}

Expand All @@ -73,7 +80,13 @@ func srStreamDump(cfg *config.Config, countOnly bool) error {
func srStream(cfg *config.Config, streamId string) error {
ctx := context.Background() // lint:ignore context.Background() is fine here

blockchain, err := crypto.NewBlockchain(ctx, &cfg.RiverChain, nil, infra.NewMetricsFactory(nil, "river", "cmdline"), nil)
blockchain, err := crypto.NewBlockchain(
ctx,
&cfg.RiverChain,
nil,
infra.NewMetricsFactory(nil, "river", "cmdline"),
nil,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -107,7 +120,13 @@ func srStream(cfg *config.Config, streamId string) error {
func nodesDump(cfg *config.Config) error {
ctx := context.Background() // lint:ignore context.Background() is fine here

blockchain, err := crypto.NewBlockchain(ctx, &cfg.RiverChain, nil, infra.NewMetricsFactory(nil, "river", "cmdline"), nil)
blockchain, err := crypto.NewBlockchain(
ctx,
&cfg.RiverChain,
nil,
infra.NewMetricsFactory(nil, "river", "cmdline"),
nil,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -140,7 +159,13 @@ func nodesDump(cfg *config.Config) error {
func settingsDump(cfg *config.Config) error {
ctx := context.Background() // lint:ignore context.Background() is fine here

blockchain, err := crypto.NewBlockchain(ctx, &cfg.RiverChain, nil, infra.NewMetricsFactory(nil, "river", "cmdline"), nil)
blockchain, err := crypto.NewBlockchain(
ctx,
&cfg.RiverChain,
nil,
infra.NewMetricsFactory(nil, "river", "cmdline"),
nil,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -179,7 +204,13 @@ func settingsDump(cfg *config.Config) error {
func blockNumber(cfg *config.Config) error {
ctx := context.Background() // lint:ignore context.Background() is fine here

blockchain, err := crypto.NewBlockchain(ctx, &cfg.RiverChain, nil, infra.NewMetricsFactory(nil, "river", "cmdline"), nil)
blockchain, err := crypto.NewBlockchain(
ctx,
&cfg.RiverChain,
nil,
infra.NewMetricsFactory(nil, "river", "cmdline"),
nil,
)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion core/node/registries/river_registry_contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package registries
import (
"context"
"math/big"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
Expand Down Expand Up @@ -290,7 +291,11 @@ func (c *RiverRegistryContract) ForAllStreams(
cb func(*GetStreamResult) bool,
) error {
// TODO: setting
const pageSize = int64(5000)
const pageSize = int64(1000)

// Add a lengthier timeout for this call in excess of rpc node timeout
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

lastPage := false
var err error
Expand Down

0 comments on commit 73781ab

Please sign in to comment.