Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Create draft ws brocker for new PUB/SUB (WIP/PoC) #6606

Draft
wants to merge 23 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
df6d9b5
Added implementation of interface and factory sceleton
Guitarheroua Oct 24, 2024
9258118
Create subscription handler factory struct
Guitarheroua Oct 24, 2024
f8ce7c1
added factory creation
Guitarheroua Oct 24, 2024
7dfe56f
moved subscription handler to routes
Guitarheroua Oct 24, 2024
6fc0c29
Added WSBrokerHandler
UlyanaAndrukhiv Oct 24, 2024
bc836f4
Merge branch 'AndriiSlisarchuk/6584-ws-subscription-handler' of githu…
UlyanaAndrukhiv Oct 24, 2024
f14769e
Added pub/sub route to server
UlyanaAndrukhiv Oct 24, 2024
4d98253
Fixed imports
UlyanaAndrukhiv Oct 24, 2024
dd88958
Added WebSocketBroker component and basic subscription methods
Guitarheroua Oct 25, 2024
c030fb0
created broker in brocker handler
Guitarheroua Oct 25, 2024
bfe710d
Added base ws brocker implementation
UlyanaAndrukhiv Oct 28, 2024
4aa0e59
Added implementations and responces for actions
Guitarheroua Oct 28, 2024
2588661
Pass broadcast message
Guitarheroua Oct 28, 2024
fb6fceb
Added BaseMessageResponse
Guitarheroua Oct 28, 2024
e878c08
Updated read and write messages, upgraded error handling
UlyanaAndrukhiv Oct 29, 2024
b2781d2
Updated error message and convertion to ws errors
UlyanaAndrukhiv Oct 29, 2024
855146b
Updated ws handler
UlyanaAndrukhiv Oct 29, 2024
218c6ed
Updated error handling
UlyanaAndrukhiv Oct 29, 2024
8562e89
Added WebsocketConfig and new flags to configure websocket subscriptions
UlyanaAndrukhiv Oct 29, 2024
d111330
Updated error handling, added comments
UlyanaAndrukhiv Oct 30, 2024
51babfb
Updated comment
UlyanaAndrukhiv Oct 30, 2024
3bec344
Separated websocket models
UlyanaAndrukhiv Oct 30, 2024
173f141
Removed basic counter for responses per second
UlyanaAndrukhiv Nov 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type AccessNodeConfig struct {
apiBurstlimits map[string]int
rpcConf rpc.Config
stateStreamConf statestreambackend.Config
wsConfig routes.WebsocketConfig
stateStreamFilterConf map[string]int
ExecutionNodeAddress string // deprecated
HistoricalAccessRPCs []access.AccessAPIClient
Expand Down Expand Up @@ -238,7 +239,12 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
ResponseLimit: subscription.DefaultResponseLimit,
HeartbeatInterval: subscription.DefaultHeartbeatInterval,
},
stateStreamFilterConf: nil,
stateStreamFilterConf: nil,
wsConfig: routes.WebsocketConfig{
MaxSubscriptionsPerConnection: routes.DefaultMaxSubscriptionsPerConnection,
MaxResponsesPerSecond: routes.DefaultMaxResponsesPerSecond,
SendMessageTimeout: routes.DefaultSendMessageTimeout,
},
ExecutionNodeAddress: "localhost:9000",
logTxTimeToFinalized: false,
logTxTimeToExecuted: false,
Expand Down Expand Up @@ -1436,6 +1442,23 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))

// Websocket subscriptions
flags.Uint64Var(&builder.wsConfig.MaxSubscriptionsPerConnection,
"websocket-max-subscriptions-per-connection",
defaultConfig.wsConfig.MaxSubscriptionsPerConnection,
fmt.Sprintf("maximum number of subscriptions per connection for websocket subscriptions. Default: %d", builder.wsConfig.MaxSubscriptionsPerConnection))

flags.Uint64Var(&builder.wsConfig.MaxResponsesPerSecond,
"websocket-max-responses-per-second",
defaultConfig.wsConfig.MaxResponsesPerSecond,
fmt.Sprintf("maximum number of responses per second for websocket subscriptions. Default: %d", builder.wsConfig.MaxResponsesPerSecond))

flags.DurationVar(&builder.wsConfig.SendMessageTimeout,
"websocket-send-message-timeout",
defaultConfig.wsConfig.SendMessageTimeout,
fmt.Sprintf("timeout value for send messages for websocket subscriptions. Default: %v", defaultConfig.wsConfig.SendMessageTimeout))

}).ValidateFlags(func() error {
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-observer is true")
Expand Down Expand Up @@ -2015,6 +2038,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.unsecureGrpcServer,
builder.stateStreamBackend,
builder.stateStreamConf,
builder.wsConfig,
indexReporter,
)
if err != nil {
Expand Down
26 changes: 25 additions & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type ObserverServiceConfig struct {
checkpointFile string
apiTimeout time.Duration
stateStreamConf statestreambackend.Config
wsConfig routes.WebsocketConfig
stateStreamFilterConf map[string]int
upstreamNodeAddresses []string
upstreamNodePublicKeys []string
Expand Down Expand Up @@ -208,7 +209,12 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
HeartbeatInterval: subscription.DefaultHeartbeatInterval,
RegisterIDsRequestLimit: state_stream.DefaultRegisterIDsRequestLimit,
},
stateStreamFilterConf: nil,
stateStreamFilterConf: nil,
wsConfig: routes.WebsocketConfig{
MaxSubscriptionsPerConnection: routes.DefaultMaxSubscriptionsPerConnection,
MaxResponsesPerSecond: routes.DefaultMaxResponsesPerSecond,
SendMessageTimeout: routes.DefaultSendMessageTimeout,
},
rpcMetricsEnabled: false,
apiRatelimits: nil,
apiBurstlimits: nil,
Expand Down Expand Up @@ -798,6 +804,23 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))

// Websocket subscriptions
flags.Uint64Var(&builder.wsConfig.MaxSubscriptionsPerConnection,
"websocket-max-subscriptions-per-connection",
defaultConfig.wsConfig.MaxSubscriptionsPerConnection,
fmt.Sprintf("maximum number of subscriptions per connection for websocket subscriptions. Default: %d", builder.wsConfig.MaxSubscriptionsPerConnection))

flags.Uint64Var(&builder.wsConfig.MaxResponsesPerSecond,
"websocket-max-responses-per-second",
defaultConfig.wsConfig.MaxResponsesPerSecond,
fmt.Sprintf("maximum number of responses per second for websocket subscriptions. Default: %d", builder.wsConfig.MaxResponsesPerSecond))

flags.DurationVar(&builder.wsConfig.SendMessageTimeout,
"websocket-send-message-timeout",
defaultConfig.wsConfig.SendMessageTimeout,
fmt.Sprintf("timeout value for send messages for websocket subscriptions. Default: %v", defaultConfig.wsConfig.SendMessageTimeout))

}).ValidateFlags(func() error {
if builder.executionDataSyncEnabled {
if builder.executionDataConfig.FetchTimeout <= 0 {
Expand Down Expand Up @@ -1931,6 +1954,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.unsecureGrpcServer,
builder.stateStreamBackend,
builder.stateStreamConf,
builder.wsConfig,
indexReporter,
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/util/cmd/run-script/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
"github.com/onflow/flow-go/engine/access/rest"
"github.com/onflow/flow-go/engine/access/rest/routes"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/execution/computation"
Expand Down Expand Up @@ -169,6 +170,7 @@ func run(*cobra.Command, []string) {
metrics.NewNoopCollector(),
nil,
backend.Config{},
routes.WebsocketConfig{},
)
if err != nil {
log.Fatal().Err(err).Msg("failed to create server")
Expand Down
1 change: 1 addition & 0 deletions engine/access/handle_irrecoverable_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (suite *IrrecoverableStateTestSuite) SetupTest() {
suite.unsecureGrpcServer,
nil,
stateStreamConfig,
routes.WebsocketConfig{},
nil,
)
assert.NoError(suite.T(), err)
Expand Down
2 changes: 2 additions & 0 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/index"
accessmock "github.com/onflow/flow-go/engine/access/mock"
"github.com/onflow/flow-go/engine/access/rest/routes"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/access/state_stream"
Expand Down Expand Up @@ -215,6 +216,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
suite.unsecureGrpcServer,
nil,
stateStreamConfig,
routes.WebsocketConfig{},
nil,
)
assert.NoError(suite.T(), err)
Expand Down
16 changes: 16 additions & 0 deletions engine/access/rest/routes/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/rest/middleware"
"github.com/onflow/flow-go/engine/access/rest/models"
"github.com/onflow/flow-go/engine/access/rest/routes/subscription_handlers"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -78,6 +79,21 @@ func (b *RouterBuilder) AddWsRoutes(
return b
}

// AddPubSubRoute adds WebSocket route for the pub/sub mechanism to the router.
func (b *RouterBuilder) AddPubSubRoute(
chain flow.Chain,
wsConfig WebsocketConfig,
subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory,
) *RouterBuilder {
b.v1SubRouter.
Methods(http.MethodGet).
Path("/ws").
Name("ws").
Handler(NewWSBrokerHandler(b.logger, wsConfig, chain, subHandlerFactory))

return b
}

func (b *RouterBuilder) Build() *mux.Router {
return b.router
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package subscription_handlers

import (
"fmt"

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/state_stream"
)

const (
EventsTopic = "events"
AccountStatusesTopic = "account_statuses"
BlocksTopic = "blocks"
BlockHeadersTopic = "block_headers"
BlockDigestsTopic = "block_digests"
TransactionStatusesTopic = "transaction_statuses"
)

type SubscriptionHandler interface {
ID() string
Topic() string
Close() error
}

type SubscriptionHandlerFactory struct {
eventFilterConfig state_stream.EventFilterConfig

stateStreamApi state_stream.API
accessApi access.API
}

func NewSubscriptionHandlerFactory(eventFilterConfig state_stream.EventFilterConfig, stateStreamApi state_stream.API, accessApi access.API) *SubscriptionHandlerFactory {
return &SubscriptionHandlerFactory{
eventFilterConfig: eventFilterConfig,
stateStreamApi: stateStreamApi,
accessApi: accessApi,
}
}

func (s *SubscriptionHandlerFactory) CreateSubscriptionHandler(topic string, arguments map[string]interface{}, broadcastMessage func(interface{})) (SubscriptionHandler, error) {
switch topic {
// TODO: Implemented handlers for each topic should be added in respective case
case EventsTopic,
AccountStatusesTopic,
BlocksTopic,
BlockHeadersTopic,
BlockDigestsTopic,
TransactionStatusesTopic:
return nil, fmt.Errorf("topic \"%s\" not implemented yet", topic)
default:
return nil, fmt.Errorf("unsupported topic \"%s\"", topic)
}
}
Loading
Loading