Skip to content

Commit

Permalink
bump substreams, prepare release v0.1.3
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Jul 24, 2023
1 parent 4e3854c commit 9ac4639
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 26 deletions.
20 changes: 11 additions & 9 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,22 @@ Operators, you should copy/paste content of this content straight to your `fireh

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository.

## Next
## v0.1.3

This release bumps substreams to v1.1.9

### Highlights

#### Substreams Scheduler Improvements for Parallel Processing

The `substreams` scheduler has been improved to reduce the number of required jobs for parallel processing. This affects `backprocessing` (preparing the states of modules up to a "start-block") and `forward processing` (preparing the states and the outputs to speed up streaming in production-mode).

Jobs on `tier2` workers are now divided in "stages", each stage generating the partial states for all the modules that have the same dependencies. A `substreams` that has a single store won't be affected, but one that has 3 top-level stores, which used to run 3 jobs for every segment now only runs a single job per segment to get all the states ready.


#### Substreams State Store Selection

The `substreams` server now accepts `X-Sf-Substreams-Cache-Tag` header to select which Substreams state store URL should be used by the request. When performing a Substreams request, the servers will pick the state store based on the header. This enable consumers to stay on the same cache version when the operators needs to bump the data version (reasons for this could be a bug in Substreams software that caused some cached data to be corrupted on invalid).
The `substreams` server now accepts `X-Sf-Substreams-Cache-Tag` header to select which Substreams state store URL should be used by the request. When performing a Substreams request, the servers will optionally pick the state store based on the header. This enable consumers to stay on the same cache version when the operators needs to bump the data version (reasons for this could be a bug in Substreams software that caused some cached data to be corrupted on invalid).

To benefit from this, operators that have a version currently in their state store URL should move the version part from `--substreams-state-store-url` to the new flag `--substreams-state-store-default-tag`. For example if today you have in your config:

Expand All @@ -36,17 +44,11 @@ start:
substreams-state-store-default-tag: v3
```
#### Substreams Scheduler Improvements for Parallel Processing
The `substreams` scheduler has been improved to reduce the number of required jobs for parallel processing. This affects `backprocessing` (preparing the states of modules up to a "start-block") and `forward processing` (preparing the states and the outputs to speed up streaming in production-mode).

Jobs on `tier2` workers are now divided in "stages", each stage generating the partial states for all the modules that have the same dependencies. A `substreams` that has a single store won't be affected, but one that has 3 top-level stores, which used to run 3 jobs for every segment now only runs a single job per segment to get all the states ready.

### Operators Upgrade
The app `substreams-tier1` and `substreams-tier2` should be upgraded concurrently. Some calls will fail while versions are misaligned.

### CLI Changes
### Backend Changes

* Authentication plugin `trust` can now specify an exclusive list of `allowed` headers (all lowercase), ex: `trust://?allowed=x-sf-user-id,x-sf-api-key-id,x-real-ip,x-sf-substreams-cache-tag`

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/streamingfast/pbgo v0.0.6-0.20221020131607-255008258d28
github.com/streamingfast/relayer v0.0.2-0.20220909122435-e67fbc964fd9
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0
github.com/streamingfast/substreams v1.1.9-0.20230720151436-6d47b7b88fc4
github.com/streamingfast/substreams v1.1.9
github.com/stretchr/testify v1.8.3
go.uber.org/multierr v1.9.0
go.uber.org/zap v1.24.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt
github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4=
github.com/streamingfast/substreams v1.1.9-0.20230720151436-6d47b7b88fc4 h1:rywcKNxH1bR79/uYnhGq2kvodCsZMU1aTPC6oz+A+40=
github.com/streamingfast/substreams v1.1.9-0.20230720151436-6d47b7b88fc4/go.mod h1:U/wDfXapixXmpnBwzQRMGBXhXJGaLZe6XbFhyh5dF18=
github.com/streamingfast/substreams v1.1.9 h1:477zJWpvADeZL8s9gUuB830Di3mzx8b24AjN+o8Nrpk=
github.com/streamingfast/substreams v1.1.9/go.mod h1:U/wDfXapixXmpnBwzQRMGBXhXJGaLZe6XbFhyh5dF18=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down
1 change: 1 addition & 0 deletions substreams_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func registerCommonSubstreamsFlags(cmd *cobra.Command) {
registerSSOnce.Do(func() {
cmd.Flags().Uint64("substreams-state-bundle-size", uint64(1_000), "Interval in blocks at which to save store snapshots and output caches")
cmd.Flags().String("substreams-state-store-url", "{sf-data-dir}/localdata", "where substreams state data are stored")
cmd.Flags().String("substreams-state-store-default-tag", "", "If non-empty, will be appended to {substreams-state-store-url} (ex: 'v1'). Can be overriden per-request with 'X-Sf-Substreams-Cache-Tag' header")
cmd.Flags().StringArray("substreams-rpc-endpoints", nil, "Remote endpoints to contact to satisfy Substreams 'eth_call's")
cmd.Flags().String("substreams-rpc-cache-store-url", "{sf-data-dir}/rpc-cache", "where rpc cache will be store call responses")
cmd.Flags().Uint64("substreams-rpc-cache-chunk-size", uint64(1_000), "RPC cache chunk size in block")
Expand Down
5 changes: 4 additions & 1 deletion substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) {
grpcListenAddr := viper.GetString("substreams-tier1-grpc-listen-addr")

stateStoreURL := MustReplaceDataDir(sfDataDir, viper.GetString("substreams-state-store-url"))
stateStoreDefaultTag := viper.GetString("substreams-state-store-default-tag")

stateBundleSize := viper.GetUint64("substreams-state-bundle-size")

subrequestsEndpoint := viper.GetString("substreams-tier1-subrequests-endpoint")
Expand Down Expand Up @@ -112,6 +114,7 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) {
BlockStreamAddr: blockstreamAddr,

StateStoreURL: stateStoreURL,
StateStoreDefaultTag: stateStoreDefaultTag,
StateBundleSize: stateBundleSize,
BlockType: getSubstreamsBlockMessageType(chain),
MaxSubrequests: maxSubrequests,
Expand All @@ -129,7 +132,7 @@ func registerSubstreamsTier1App[B Block](chain *Chain[B]) {
GRPCListenAddr: grpcListenAddr,
GRPCShutdownGracePeriod: time.Second,
ServiceDiscoveryURL: serviceDiscoveryURL,
}, &app.Modules{
}, &app.Tier1Modules{
Authenticator: authenticator,
HeadTimeDriftMetric: ss1HeadTimeDriftmetric,
HeadBlockNumberMetric: ss1HeadBlockNumMetric,
Expand Down
19 changes: 6 additions & 13 deletions substreams_tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/dauth"
discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/logging"
Expand Down Expand Up @@ -49,11 +48,6 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) {
},

FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
authenticator, err := dauth.New(viper.GetString("common-auth-plugin"))
if err != nil {
return nil, fmt.Errorf("unable to initialize dauth: %w", err)
}

mergedBlocksStoreURL, _, _, err := GetCommonStoresURLs(runtime.AbsDataDir)
if err != nil {
return nil, err
Expand All @@ -66,6 +60,8 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) {
substreamsRequestsStats := viper.GetBool("substreams-tier2-request-stats")

stateStoreURL := MustReplaceDataDir(sfDataDir, viper.GetString("substreams-state-store-url"))
stateStoreDefaultTag := viper.GetString("substreams-state-store-default-tag")

stateBundleSize := viper.GetUint64("substreams-state-bundle-size")

tracing := os.Getenv("SUBSTREAMS_TRACING") == "modules_exec"
Expand All @@ -91,9 +87,10 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) {
&app.Tier2Config{
MergedBlocksStoreURL: mergedBlocksStoreURL,

StateStoreURL: stateStoreURL,
StateBundleSize: stateBundleSize,
BlockType: getSubstreamsBlockMessageType(chain),
StateStoreURL: stateStoreURL,
StateStoreDefaultTag: stateStoreDefaultTag,
StateBundleSize: stateBundleSize,
BlockType: getSubstreamsBlockMessageType(chain),

WASMExtensions: wasmExtensions,
PipelineOptions: pipelineOptioner,
Expand All @@ -103,10 +100,6 @@ func registerSubstreamsTier2App[B Block](chain *Chain[B]) {

GRPCListenAddr: grpcListenAddr,
ServiceDiscoveryURL: serviceDiscoveryURL,
}, &app.Modules{
Authenticator: authenticator,
HeadTimeDriftMetric: ss2HeadTimeDriftmetric,
HeadBlockNumberMetric: ss2HeadBlockNumMetric,
}), nil
},
})
Expand Down

0 comments on commit 9ac4639

Please sign in to comment.