From df6d9b578a511e356080104f16717accf7ad132d Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Thu, 24 Oct 2024 15:04:34 +0300 Subject: [PATCH 01/22] Added implementation of interface and factory sceleton --- .../subscription_handler.go | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go diff --git a/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go b/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go new file mode 100644 index 00000000000..b9e7b958c71 --- /dev/null +++ b/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go @@ -0,0 +1,33 @@ +package subscription_handlers + +import ( + "fmt" +) + +const ( + EventsTopic = "events" + AccountStatusesTopic = "account_statuses" + BlocksTopic = "blocks" + BlockHeadersTopic = "block_headers" + BlockDigestsTopic = "block_digests" + TransactionStatusesTopic = "transaction_statuses" +) + +type SubscriptionHandler interface { + Close() error +} + +func CreateSubscriptionHandler(topic string, arguments map[string]interface{}, broadcastMessage func([]byte) error) (SubscriptionHandler, error) { + switch topic { + // TODO: Implemented handlers for each topic should be added in respective case + case EventsTopic, + AccountStatusesTopic, + BlocksTopic, + BlockHeadersTopic, + BlockDigestsTopic, + TransactionStatusesTopic: + return nil, fmt.Errorf("topic \"%s\" not implemented yet", topic) + default: + return nil, fmt.Errorf("unsupported topic \"%s\"", topic) + } +} From 9258118bc1771e66c77e888a68b7ce6098a54509 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Thu, 24 Oct 2024 15:26:35 +0300 Subject: [PATCH 02/22] Create subscription handler factory struct --- .../subscription_handler.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go b/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go index b9e7b958c71..2b7e49190d0 100644 --- a/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go +++ b/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go @@ -2,6 +2,9 @@ package subscription_handlers import ( "fmt" + + "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/access/state_stream" ) const ( @@ -17,7 +20,19 @@ type SubscriptionHandler interface { Close() error } -func CreateSubscriptionHandler(topic string, arguments map[string]interface{}, broadcastMessage func([]byte) error) (SubscriptionHandler, error) { +type SubscriptionHandlerFactory struct { + stateStreamApi state_stream.API + accessApi access.API +} + +func NewSubscriptionHandlerFactory(stateStreamApi state_stream.API, accessApi access.API) *SubscriptionHandlerFactory { + return &SubscriptionHandlerFactory{ + stateStreamApi: stateStreamApi, + accessApi: accessApi, + } +} + +func (s *SubscriptionHandlerFactory) CreateSubscriptionHandler(topic string, arguments map[string]interface{}, broadcastMessage func([]byte) error) (SubscriptionHandler, error) { switch topic { // TODO: Implemented handlers for each topic should be added in respective case case EventsTopic, From f8ce7c18afa47180be2c28747297c2d25e224f8f Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Thu, 24 Oct 2024 17:40:20 +0300 Subject: [PATCH 03/22] added factory creation --- engine/access/rest/server.go | 2 ++ .../subscription_handlers/subscription_handler.go | 9 ++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/server.go b/engine/access/rest/server.go index 0d05fcd67cf..babac244eb4 100644 --- a/engine/access/rest/server.go +++ b/engine/access/rest/server.go @@ -9,6 +9,7 @@ import ( "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/rest/routes" + "github.com/onflow/flow-go/engine/access/rest/ws_pub_sub/subscription_handlers" "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/model/flow" @@ -43,6 +44,7 @@ func NewServer(serverAPI access.API, stateStreamConfig backend.Config, ) (*http.Server, error) { builder := routes.NewRouterBuilder(logger, restCollector).AddRestRoutes(serverAPI, chain) + _ = subscription_handlers.NewSubscriptionHandlerFactory(stateStreamConfig.EventFilterConfig, stateStreamApi, serverAPI) if stateStreamApi != nil { builder.AddWsRoutes(stateStreamApi, chain, stateStreamConfig) } diff --git a/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go b/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go index 2b7e49190d0..5a36438e447 100644 --- a/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go +++ b/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go @@ -21,14 +21,17 @@ type SubscriptionHandler interface { } type SubscriptionHandlerFactory struct { + eventFilterConfig state_stream.EventFilterConfig + stateStreamApi state_stream.API accessApi access.API } -func NewSubscriptionHandlerFactory(stateStreamApi state_stream.API, accessApi access.API) *SubscriptionHandlerFactory { +func NewSubscriptionHandlerFactory(eventFilterConfig state_stream.EventFilterConfig, stateStreamApi state_stream.API, accessApi access.API) *SubscriptionHandlerFactory { return &SubscriptionHandlerFactory{ - stateStreamApi: stateStreamApi, - accessApi: accessApi, + eventFilterConfig: eventFilterConfig, + stateStreamApi: stateStreamApi, + accessApi: accessApi, } } From 7dfe56f8102b36979c2a109f8f5f19852dda1e19 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Thu, 24 Oct 2024 18:22:17 +0300 Subject: [PATCH 04/22] moved subscription handler to routes --- .../subscription_handlers/subscription_handler.go | 0 engine/access/rest/server.go | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename engine/access/rest/{ws_pub_sub => routes}/subscription_handlers/subscription_handler.go (100%) diff --git a/engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go b/engine/access/rest/routes/subscription_handlers/subscription_handler.go similarity index 100% rename from engine/access/rest/ws_pub_sub/subscription_handlers/subscription_handler.go rename to engine/access/rest/routes/subscription_handlers/subscription_handler.go diff --git a/engine/access/rest/server.go b/engine/access/rest/server.go index babac244eb4..82440652f9d 100644 --- a/engine/access/rest/server.go +++ b/engine/access/rest/server.go @@ -9,7 +9,7 @@ import ( "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/rest/routes" - "github.com/onflow/flow-go/engine/access/rest/ws_pub_sub/subscription_handlers" + "github.com/onflow/flow-go/engine/access/rest/routes/subscription_handlers" "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/model/flow" From 6fc0c2979df3b5de666eb78f80afcfe6db0a12d4 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Thu, 24 Oct 2024 18:42:33 +0300 Subject: [PATCH 05/22] Added WSBrokerHandler --- engine/access/rest/routes/router.go | 15 ++++ .../access/rest/routes/ws_broker_handler.go | 82 +++++++++++++++++++ engine/access/rest/server.go | 3 + 3 files changed, 100 insertions(+) create mode 100644 engine/access/rest/routes/ws_broker_handler.go diff --git a/engine/access/rest/routes/router.go b/engine/access/rest/routes/router.go index 57e505d7497..82a7b688293 100644 --- a/engine/access/rest/routes/router.go +++ b/engine/access/rest/routes/router.go @@ -12,6 +12,7 @@ import ( "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/rest/middleware" "github.com/onflow/flow-go/engine/access/rest/models" + "github.com/onflow/flow-go/engine/access/rest/ws_pub_sub/subscription_handlers" "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/model/flow" @@ -78,6 +79,20 @@ func (b *RouterBuilder) AddWsRoutes( return b } +// AddPubSubRoute adds WebSocket route for the pub/sub mechanism to the router. +func (b *RouterBuilder) AddPubSubRoute( + chain flow.Chain, + subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory, +) *RouterBuilder { + b.v1SubRouter. + Methods(http.MethodGet). + Path("/ws"). + Name("ws"). + Handler(NewWSBrokerHandler(b.logger, chain, subHandlerFactory)) + + return b +} + func (b *RouterBuilder) Build() *mux.Router { return b.router } diff --git a/engine/access/rest/routes/ws_broker_handler.go b/engine/access/rest/routes/ws_broker_handler.go new file mode 100644 index 00000000000..d406e9d5b2a --- /dev/null +++ b/engine/access/rest/routes/ws_broker_handler.go @@ -0,0 +1,82 @@ +package routes + +import ( + "net/http" + + "github.com/gorilla/websocket" + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/engine/access/rest/models" + "github.com/onflow/flow-go/engine/access/rest/ws_pub_sub/subscription_handlers" + "github.com/onflow/flow-go/model/flow" +) + +// WSBrokerHandler handles WebSocket connections for pub/sub subscriptions. +// It upgrades incoming HTTP requests to WebSocket connections and manages the lifecycle of these connections. +// This handler uses a SubscriptionHandlerFactory to create subscription handlers that manage specific pub-sub topics. +type WSBrokerHandler struct { + *HttpHandler + + logger zerolog.Logger + subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory +} + +var _ http.Handler = (*WSBrokerHandler)(nil) + +// NewWSBrokerHandler creates a new instance of WSBrokerHandler. +// It initializes the handler with the provided logger, blockchain chain, and a factory for subscription handlers. +// +// Parameters: +// - logger: Logger for recording internal events. +// - chain: Flow blockchain chain used for context. +// - subHandlerFactory: Factory for creating handlers that manage specific pub-sub subscriptions. +func NewWSBrokerHandler( + logger zerolog.Logger, + chain flow.Chain, + subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory, +) *WSBrokerHandler { + return &WSBrokerHandler{ + logger: logger, + subHandlerFactory: subHandlerFactory, + HttpHandler: NewHttpHandler(logger, chain), + } +} + +// ServeHTTP upgrades HTTP requests to WebSocket connections and initializes pub/sub subscriptions. +// It acts as the main entry point for handling WebSocket pub/sub requests. +// +// Parameters: +// - w: The HTTP response writer. +// - r: The HTTP request being handled. +// +// Expected errors during normal operation: +// - http.StatusBadRequest: Request verification failed. +// - http.StatusInternalServerError: WebSocket upgrade error or internal issues. +func (h *WSBrokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // create a logger + logger := h.Logger.With().Str("pub_sub_subscribe_url", r.URL.String()).Logger() + + err := h.VerifyRequest(w, r) + if err != nil { + // VerifyRequest sets the response error before returning + return + } + + // Upgrade the HTTP connection to a WebSocket connection + upgrader := websocket.Upgrader{ + // allow all origins by default, operators can override using a proxy + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + h.errorHandler(w, models.NewRestError(http.StatusInternalServerError, "webSocket upgrade error: ", err), logger) + return + } + defer conn.Close() + + //TODO: create WebSocketBroker + + //TODO: add readMessages and writeMessages using WebSocketBroker +} diff --git a/engine/access/rest/server.go b/engine/access/rest/server.go index 0d05fcd67cf..cb6e195cb62 100644 --- a/engine/access/rest/server.go +++ b/engine/access/rest/server.go @@ -47,6 +47,9 @@ func NewServer(serverAPI access.API, builder.AddWsRoutes(stateStreamApi, chain, stateStreamConfig) } + //TODO: add SubscriptionHandlerFactory + builder.AddPubSubRoute(chain, nil) + c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, AllowedHeaders: []string{"*"}, From f14769e95cd8dfc253ccae78e0d1ecd50e6a4b8b Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Thu, 24 Oct 2024 18:45:52 +0300 Subject: [PATCH 06/22] Added pub/sub route to server --- engine/access/rest/server.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/server.go b/engine/access/rest/server.go index 95eeb63ad56..6b2acc131c1 100644 --- a/engine/access/rest/server.go +++ b/engine/access/rest/server.go @@ -44,13 +44,16 @@ func NewServer(serverAPI access.API, stateStreamConfig backend.Config, ) (*http.Server, error) { builder := routes.NewRouterBuilder(logger, restCollector).AddRestRoutes(serverAPI, chain) - _ = subscription_handlers.NewSubscriptionHandlerFactory(stateStreamConfig.EventFilterConfig, stateStreamApi, serverAPI) if stateStreamApi != nil { builder.AddWsRoutes(stateStreamApi, chain, stateStreamConfig) } - //TODO: add SubscriptionHandlerFactory - builder.AddPubSubRoute(chain, nil) + subscriptionHandlerFactory := subscription_handlers.NewSubscriptionHandlerFactory( + stateStreamConfig.EventFilterConfig, + stateStreamApi, + serverAPI, + ) + builder.AddPubSubRoute(chain, subscriptionHandlerFactory) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, From 4d98253f4c226edb4be6c730991191097209a6c9 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Thu, 24 Oct 2024 18:48:17 +0300 Subject: [PATCH 07/22] Fixed imports --- engine/access/rest/routes/router.go | 2 +- engine/access/rest/routes/ws_broker_handler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/access/rest/routes/router.go b/engine/access/rest/routes/router.go index 82a7b688293..8cf9097f914 100644 --- a/engine/access/rest/routes/router.go +++ b/engine/access/rest/routes/router.go @@ -12,7 +12,7 @@ import ( "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/rest/middleware" "github.com/onflow/flow-go/engine/access/rest/models" - "github.com/onflow/flow-go/engine/access/rest/ws_pub_sub/subscription_handlers" + "github.com/onflow/flow-go/engine/access/rest/routes/subscription_handlers" "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/model/flow" diff --git a/engine/access/rest/routes/ws_broker_handler.go b/engine/access/rest/routes/ws_broker_handler.go index d406e9d5b2a..6974ce730aa 100644 --- a/engine/access/rest/routes/ws_broker_handler.go +++ b/engine/access/rest/routes/ws_broker_handler.go @@ -7,7 +7,7 @@ import ( "github.com/rs/zerolog" "github.com/onflow/flow-go/engine/access/rest/models" - "github.com/onflow/flow-go/engine/access/rest/ws_pub_sub/subscription_handlers" + "github.com/onflow/flow-go/engine/access/rest/routes/subscription_handlers" "github.com/onflow/flow-go/model/flow" ) From dd88958f12244e026a304ee11e7ab0bde23305da Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Fri, 25 Oct 2024 13:44:52 +0300 Subject: [PATCH 08/22] Added WebSocketBroker component and basic subscription methods --- engine/access/rest/routes/websocket_broker.go | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 engine/access/rest/routes/websocket_broker.go diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go new file mode 100644 index 00000000000..db546a23277 --- /dev/null +++ b/engine/access/rest/routes/websocket_broker.go @@ -0,0 +1,59 @@ +package routes + +import ( + "time" + + "github.com/gorilla/websocket" + "github.com/rs/zerolog" + "go.uber.org/atomic" + + "github.com/onflow/flow-go/engine/access/rest/routes/subscription_handlers" +) + +type LimitsConfiguration struct { + maxSubscriptions uint64 + activeSubscriptions *atomic.Uint64 + + maxResponsesPerSecond uint64 + sendMessageTimeout time.Duration +} + +type WebSocketBroker struct { + logger zerolog.Logger + + conn *websocket.Conn // WebSocket connection for communication with the client + + subs map[string]map[string]subscription_handlers.SubscriptionHandler // First key is the topic, second key is the subscription ID + + limitsConfiguration LimitsConfiguration // Limits on the maximum number of subscriptions per connection, responses per second, and send message timeout. + + readChannel chan interface{} // Channel to read messages from the client + broadcastChannel chan interface{} // Channel to read messages from node subscriptions +} + +func NewWebSocketBroker(logger zerolog.Logger, conn *websocket.Conn, limitsConfiguration LimitsConfiguration) *WebSocketBroker { + return &WebSocketBroker{ + logger: logger, + conn: conn, + limitsConfiguration: limitsConfiguration, + } +} + +// Triggered by the readMessages method when the action is subscribe. It extracts the topic from the message’s topic +// field, creates the appropriate SubscriptionHandler for the topic using the factory function CreateSubscription, +// and adds an instance of the new handler to the subs map. The client receives a notification confirming the successful subscription along with the specific ID. +func (w *WebSocketBroker) subscribe(topic string, arguments map[string]interface{}) { + +} + +// It is triggered by the readMessages method when the action is unsubscribe. It removes the relevant handler from +// the subs map by calling SubscriptionHandler::CloseSubscription and notifying the client of successful unsubscription. +func (w *WebSocketBroker) unsubscribe(subscriptionID string) { + +} + +// It is triggered by the readMessages method when the action is list_subscriptions. It gathers all active subscriptions +// for the current connection, formats the response, and sends it back to the client. +func (w *WebSocketBroker) listOfSubscriptions() { + +} From c030fb0e45fd089de3aa8a9f8a06b9a4e8cb625e Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Fri, 25 Oct 2024 13:51:05 +0300 Subject: [PATCH 09/22] created broker in brocker handler --- engine/access/rest/routes/websocket_broker.go | 19 +++++++++++++++++++ .../access/rest/routes/ws_broker_handler.go | 8 +++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index db546a23277..1b09728c248 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -39,6 +39,25 @@ func NewWebSocketBroker(logger zerolog.Logger, conn *websocket.Conn, limitsConfi } } +// TODO: I would name this SetConnectionConfig +func (w *WebSocketBroker) SetWebsocketConf() error { + return nil +} + +/* +readMessages: +This method runs while the connection is active. It retrieves, validates, and processes client messages. Actions handled include subscribe, unsubscribe, and list_subscriptions. Additional actions can be added as needed. + +writeMessages: +This method runs while the connection is active, listening on the broadcast channel. It retrieves responses and sends them to the client. + +broadcastMessage: +This method is called by each SubscriptionHandler, receiving formatted subscription messages and writing them to the broadcast channel. + +pingPongHandler: +This method periodically checks the connection's availability using ping/pong messages and terminates the connection if the client becomes unresponsive. +*/ + // Triggered by the readMessages method when the action is subscribe. It extracts the topic from the message’s topic // field, creates the appropriate SubscriptionHandler for the topic using the factory function CreateSubscription, // and adds an instance of the new handler to the subs map. The client receives a notification confirming the successful subscription along with the specific ID. diff --git a/engine/access/rest/routes/ws_broker_handler.go b/engine/access/rest/routes/ws_broker_handler.go index 6974ce730aa..06c3b68169b 100644 --- a/engine/access/rest/routes/ws_broker_handler.go +++ b/engine/access/rest/routes/ws_broker_handler.go @@ -76,7 +76,13 @@ func (h *WSBrokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } defer conn.Close() - //TODO: create WebSocketBroker + //TODO: fill LimitsConfiguration + wsBroker := NewWebSocketBroker(logger, conn, LimitsConfiguration{}) + err = wsBroker.SetWebsocketConf() + if err != nil { + // TODO: handle error + return + } //TODO: add readMessages and writeMessages using WebSocketBroker } From bfe710d051d473d2df90d03d64ada8caf49b4297 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Mon, 28 Oct 2024 13:27:54 +0200 Subject: [PATCH 10/22] Added base ws brocker implementation --- engine/access/rest/routes/websocket_broker.go | 254 ++++++++++++++++-- .../access/rest/routes/ws_broker_handler.go | 7 +- 2 files changed, 243 insertions(+), 18 deletions(-) diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index 1b09728c248..9d711a76bf8 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -1,21 +1,49 @@ package routes import ( + "encoding/json" + "errors" + "fmt" + "net/http" "time" "github.com/gorilla/websocket" "github.com/rs/zerolog" "go.uber.org/atomic" + "github.com/onflow/flow-go/engine/access/rest/models" "github.com/onflow/flow-go/engine/access/rest/routes/subscription_handlers" ) +// Define a base struct to determine the action +type BaseMessage struct { + Action string `json:"action"` +} + +type SubscribeMessage struct { + BaseMessage + Topic string `json:"topic"` + Arguments map[string]interface{} `json:"arguments"` +} + +type UnsubscribeMessage struct { + BaseMessage + Topic string `json:"topic"` + ID string `json:"id"` +} + +type ListSubscriptionsMessage struct { + BaseMessage +} + type LimitsConfiguration struct { maxSubscriptions uint64 activeSubscriptions *atomic.Uint64 - maxResponsesPerSecond uint64 - sendMessageTimeout time.Duration + maxResponsesPerSecond uint64 + activeResponsesPerSecond uint64 + + sendMessageTimeout time.Duration } type WebSocketBroker struct { @@ -27,36 +55,230 @@ type WebSocketBroker struct { limitsConfiguration LimitsConfiguration // Limits on the maximum number of subscriptions per connection, responses per second, and send message timeout. - readChannel chan interface{} // Channel to read messages from the client + readChannel chan error // Channel to read messages from the client broadcastChannel chan interface{} // Channel to read messages from node subscriptions } func NewWebSocketBroker(logger zerolog.Logger, conn *websocket.Conn, limitsConfiguration LimitsConfiguration) *WebSocketBroker { - return &WebSocketBroker{ + websocketBroker := &WebSocketBroker{ logger: logger, conn: conn, limitsConfiguration: limitsConfiguration, } + websocketBroker.startResponseLimiter() + + return websocketBroker } -// TODO: I would name this SetConnectionConfig -func (w *WebSocketBroker) SetWebsocketConf() error { +func (w *WebSocketBroker) startResponseLimiter() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for range ticker.C { + w.limitsConfiguration.activeResponsesPerSecond = 0 // Reset the response count every second + } +} + +func (w *WebSocketBroker) SetConnectionConfig() error { + err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)) // Set the initial write deadline for the first ping message + if err != nil { + return models.NewRestError(http.StatusInternalServerError, "Set the initial write deadline error: ", err) + } + err = w.conn.SetReadDeadline(time.Now().Add(pongWait)) // Set the initial read deadline for the first pong message + if err != nil { + return models.NewRestError(http.StatusInternalServerError, "Set the initial read deadline error: ", err) + } + // Establish a Pong handler + w.conn.SetPongHandler(func(string) error { + err := w.conn.SetReadDeadline(time.Now().Add(pongWait)) + if err != nil { + return err + } + return nil + }) + return nil } -/* -readMessages: -This method runs while the connection is active. It retrieves, validates, and processes client messages. Actions handled include subscribe, unsubscribe, and list_subscriptions. Additional actions can be added as needed. +// readMessages runs while the connection is active. It retrieves, validates, and processes client messages. +// Actions handled include subscribe, unsubscribe, and list_subscriptions. Additional actions can be added as needed. +// It continuously reads messages from the WebSocket connection and closes +// the associated read channel when the connection is closed by client +// +// This method should be called after establishing the WebSocket connection +// to handle incoming messages asynchronously. +func (w *WebSocketBroker) readMessages() { + // Start a goroutine to handle the WebSocket connection + defer close(w.readChannel) // notify websocket about closed connection + + for { + // reads messages from the WebSocket connection when + // 1) the connection is closed by client + // 2) unexpected message is received from the client + + // Step 1: Read JSON message into a byte slice + _, message, err := w.conn.ReadMessage() + if err != nil { + var closeError *websocket.CloseError + w.readChannel <- err + + //it means that client connection has been terminated, and we need to stop this goroutine + if errors.As(err, &closeError) { + return + } + } + + var baseMsg BaseMessage + if err := json.Unmarshal(message, &baseMsg); err != nil { + //process the case when message does not have "action" + err := fmt.Errorf("invalid message: message does not have 'action' field : %w", err) + + w.readChannel <- err + continue + } + + // Process based on the action type + switch baseMsg.Action { + case "subscribe": + var subscribeMsg SubscribeMessage + if err := json.Unmarshal(message, &subscribeMsg); err != nil { + err := fmt.Errorf("error parsing 'subscribe' message: %w", err) + + w.readChannel <- err + continue + } + + if w.limitsConfiguration.activeSubscriptions.Load() >= w.limitsConfiguration.maxSubscriptions { + err := fmt.Errorf("maximum number of streams reached") + err = models.NewRestError(http.StatusServiceUnavailable, err.Error(), err) + w.readChannel <- err -writeMessages: -This method runs while the connection is active, listening on the broadcast channel. It retrieves responses and sends them to the client. + continue + } + w.limitsConfiguration.activeSubscriptions.Add(1) -broadcastMessage: -This method is called by each SubscriptionHandler, receiving formatted subscription messages and writing them to the broadcast channel. + w.subscribe(subscribeMsg.Topic, subscribeMsg.Arguments) + case "unsubscribe": + var unsubscribeMsg UnsubscribeMessage + if err := json.Unmarshal(message, &unsubscribeMsg); err != nil { + err := fmt.Errorf("error parsing 'unsubscribe' message: %w", err) -pingPongHandler: -This method periodically checks the connection's availability using ping/pong messages and terminates the connection if the client becomes unresponsive. -*/ + w.readChannel <- err + continue + } + + w.limitsConfiguration.activeSubscriptions.Add(-1) + + w.unsubscribe(unsubscribeMsg.ID) + + case "list_subscriptions": + var listSubscriptionsMsg ListSubscriptionsMessage + if err := json.Unmarshal(message, &listSubscriptionsMsg); err != nil { + err := fmt.Errorf("error parsing 'unsubsclist_subscriptionsribe' message: %w", err) + + w.readChannel <- err + continue + } + w.listOfSubscriptions() + + default: + err := fmt.Errorf("unknown action type: %s", baseMsg.Action) + + w.readChannel <- err + continue + } + } +} + +// writeMessages runs while the connection is active, listening on the broadcast channel. +// It retrieves responses and sends them to the client. +func (w *WebSocketBroker) writeMessages() { + for { + select { + case err := <-w.readChannel: + // we use `readChannel` + // 1) as indicator of client's status, when `readChannel` closes it means that client + // connection has been terminated, and we need to stop this goroutine to avoid memory leak. + + var closeError *websocket.CloseError + if errors.As(err, &closeError) { + // TODO: write with "close" + + err = w.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(closeError.Code, closeError.Error()), time.Now().Add(time.Second)) + if err != nil { + w.logger.Error().Err(err).Msg(fmt.Sprintf("error sending WebSocket error: %v", err)) + } + + return + } + + // 2) as error receiver for any errors that occur during the reading process + err = w.conn.WriteMessage(websocket.TextMessage, []byte(err.Error())) + if err != nil { + //w.wsErrorHandler(err) + return + } + case data, ok := <-w.broadcastChannel: + if !ok { + _ = fmt.Errorf("broadcast channel closed, no error occurred") + //w.wsErrorHandler(models.NewRestError(http.StatusRequestTimeout, "broadcast channel closed", err)) + return + } + err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err != nil { + //w.wsErrorHandler(models.NewRestError(http.StatusInternalServerError, "failed to set the initial write deadline: ", err)) + return + } + + // Write the response to the WebSocket connection + err = w.conn.WriteJSON(data) + if err != nil { + //w.wsErrorHandler(err) + return + } + } + } +} + +// broadcastMessage is called by each SubscriptionHandler, +// receiving formatted subscription messages and writing them to the broadcast channel. +func (w *WebSocketBroker) broadcastMessage(data interface{}) { + if w.limitsConfiguration.activeResponsesPerSecond >= w.limitsConfiguration.maxResponsesPerSecond { + time.Sleep(w.limitsConfiguration.sendMessageTimeout) // Adjust the sleep duration as needed + } + + // Send the message to the broadcast channel + w.broadcastChannel <- data + w.limitsConfiguration.activeResponsesPerSecond++ +} + +// pingPongHandler periodically checks the connection's availability using ping/pong messages and +// terminates the connection if the client becomes unresponsive. +func (w *WebSocketBroker) pingPongHandler() { + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err != nil { + //w.wsErrorHandler(models.NewRestError(http.StatusInternalServerError, "failed to set the initial write deadline: ", err)) + return + } + if err := w.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + //w.wsErrorHandler(err) + return + } + case err := <-w.readChannel: + //it means that client connection has been terminated, and we need to stop this goroutine + var closeError *websocket.CloseError + if errors.As(err, &closeError) { + return + } + } + } +} // Triggered by the readMessages method when the action is subscribe. It extracts the topic from the message’s topic // field, creates the appropriate SubscriptionHandler for the topic using the factory function CreateSubscription, diff --git a/engine/access/rest/routes/ws_broker_handler.go b/engine/access/rest/routes/ws_broker_handler.go index 06c3b68169b..4981e4aa7e3 100644 --- a/engine/access/rest/routes/ws_broker_handler.go +++ b/engine/access/rest/routes/ws_broker_handler.go @@ -78,11 +78,14 @@ func (h *WSBrokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { //TODO: fill LimitsConfiguration wsBroker := NewWebSocketBroker(logger, conn, LimitsConfiguration{}) - err = wsBroker.SetWebsocketConf() + err = wsBroker.SetConnectionConfig() + //TODO : if err != nil { // TODO: handle error return } - //TODO: add readMessages and writeMessages using WebSocketBroker + go wsBroker.readMessages() + go wsBroker.pingPongHandler() + wsBroker.writeMessages() } From 4aa0e592f3e0e2ed42cf41f1379378ebc9c2e651 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Mon, 28 Oct 2024 17:07:24 +0200 Subject: [PATCH 11/22] Added implementations and responces for actions --- .../subscription_handler.go | 2 + engine/access/rest/routes/websocket_broker.go | 184 ++++++++++++++++-- .../access/rest/routes/ws_broker_handler.go | 4 +- 3 files changed, 168 insertions(+), 22 deletions(-) diff --git a/engine/access/rest/routes/subscription_handlers/subscription_handler.go b/engine/access/rest/routes/subscription_handlers/subscription_handler.go index 5a36438e447..c49fd01ccb1 100644 --- a/engine/access/rest/routes/subscription_handlers/subscription_handler.go +++ b/engine/access/rest/routes/subscription_handlers/subscription_handler.go @@ -17,6 +17,8 @@ const ( ) type SubscriptionHandler interface { + ID() string + Topic() string Close() error } diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index 9d711a76bf8..911683afccf 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -15,6 +15,13 @@ import ( "github.com/onflow/flow-go/engine/access/rest/routes/subscription_handlers" ) +// Constants representing action types. +const ( + SubscribeAction = "subscribe" // Action for subscription message + UnsubscribeAction = "unsubscribe" // Action for unsubscription message + ListSubscriptionsAction = "list_subscriptions" // Action to list active subscriptions +) + // Define a base struct to determine the action type BaseMessage struct { Action string `json:"action"` @@ -26,16 +33,45 @@ type SubscribeMessage struct { Arguments map[string]interface{} `json:"arguments"` } +// SubscribeMessageResponse represents the response to a subscription message. +// It includes the topic and a unique subscription ID. +type SubscribeMessageResponse struct { + BaseMessage + Topic string `json:"topic"` + ID string `json:"id"` +} + type UnsubscribeMessage struct { BaseMessage Topic string `json:"topic"` ID string `json:"id"` } +// UnsubscribeMessageResponse represents the response to an unsubscription message. +// It includes the topic and subscription ID for confirmation. +type UnsubscribeMessageResponse struct { + BaseMessage + Topic string `json:"topic"` + ID string `json:"id"` +} + type ListSubscriptionsMessage struct { BaseMessage } +// SubscriptionEntry represents an active subscription entry with a specific topic and unique identifier. +type SubscriptionEntry struct { + Topic string `json:"topic"` + ID string `json:"id"` +} + +// ListSubscriptionsMessageResponse is the structure used to respond to list_subscriptions requests. +// It contains a list of active subscriptions for the current WebSocket connection. +type ListSubscriptionsMessageResponse struct { + BaseMessage + Subscriptions []SubscriptionEntry `json:"subscriptions"` +} + type LimitsConfiguration struct { maxSubscriptions uint64 activeSubscriptions *atomic.Uint64 @@ -47,11 +83,11 @@ type LimitsConfiguration struct { } type WebSocketBroker struct { - logger zerolog.Logger - - conn *websocket.Conn // WebSocket connection for communication with the client + logger zerolog.Logger + subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory + conn *websocket.Conn // WebSocket connection for communication with the client - subs map[string]map[string]subscription_handlers.SubscriptionHandler // First key is the topic, second key is the subscription ID + subs map[string]subscription_handlers.SubscriptionHandler // First key is the subscription ID, second key is the topic limitsConfiguration LimitsConfiguration // Limits on the maximum number of subscriptions per connection, responses per second, and send message timeout. @@ -59,11 +95,18 @@ type WebSocketBroker struct { broadcastChannel chan interface{} // Channel to read messages from node subscriptions } -func NewWebSocketBroker(logger zerolog.Logger, conn *websocket.Conn, limitsConfiguration LimitsConfiguration) *WebSocketBroker { +func NewWebSocketBroker( + logger zerolog.Logger, + conn *websocket.Conn, + limitsConfiguration LimitsConfiguration, + subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory, +) *WebSocketBroker { websocketBroker := &WebSocketBroker{ - logger: logger, + logger: logger.With().Str("component", "websocket-broker").Logger(), conn: conn, limitsConfiguration: limitsConfiguration, + subHandlerFactory: subHandlerFactory, + subs: make(map[string]subscription_handlers.SubscriptionHandler), } websocketBroker.startResponseLimiter() @@ -139,7 +182,7 @@ func (w *WebSocketBroker) readMessages() { // Process based on the action type switch baseMsg.Action { - case "subscribe": + case SubscribeAction: var subscribeMsg SubscribeMessage if err := json.Unmarshal(message, &subscribeMsg); err != nil { err := fmt.Errorf("error parsing 'subscribe' message: %w", err) @@ -157,8 +200,8 @@ func (w *WebSocketBroker) readMessages() { } w.limitsConfiguration.activeSubscriptions.Add(1) - w.subscribe(subscribeMsg.Topic, subscribeMsg.Arguments) - case "unsubscribe": + w.subscribe(&subscribeMsg) + case UnsubscribeAction: var unsubscribeMsg UnsubscribeMessage if err := json.Unmarshal(message, &unsubscribeMsg); err != nil { err := fmt.Errorf("error parsing 'unsubscribe' message: %w", err) @@ -169,9 +212,9 @@ func (w *WebSocketBroker) readMessages() { w.limitsConfiguration.activeSubscriptions.Add(-1) - w.unsubscribe(unsubscribeMsg.ID) + w.unsubscribe(&unsubscribeMsg) - case "list_subscriptions": + case ListSubscriptionsAction: var listSubscriptionsMsg ListSubscriptionsMessage if err := json.Unmarshal(message, &listSubscriptionsMsg); err != nil { err := fmt.Errorf("error parsing 'unsubsclist_subscriptionsribe' message: %w", err) @@ -280,21 +323,122 @@ func (w *WebSocketBroker) pingPongHandler() { } } -// Triggered by the readMessages method when the action is subscribe. It extracts the topic from the message’s topic -// field, creates the appropriate SubscriptionHandler for the topic using the factory function CreateSubscription, -// and adds an instance of the new handler to the subs map. The client receives a notification confirming the successful subscription along with the specific ID. -func (w *WebSocketBroker) subscribe(topic string, arguments map[string]interface{}) { +// subscribe processes a request to subscribe to a specific topic. It uses the topic field in +// the message to create a SubscriptionHandler, which is then added to the `subs` map to track +// active subscriptions. A confirmation response is sent back to the client with the subscription ID. +// +// This method is triggered by the readMessages method when the action is "subscribe". +// +// Example response sent to client: +// +// { +// "action": "subscribe", +// "topic": "example_topic", +// "id": "sub_id_1" +// } +func (w *WebSocketBroker) subscribe(msg *SubscribeMessage) { + subHandler, err := w.subHandlerFactory.CreateSubscriptionHandler(msg.Topic, msg.Arguments, func(bytes []byte) error { + w.logger.Info().Msg(fmt.Sprintf("message: %s", string(bytes))) + return nil + }) + if err != nil { + // TODO: Handle as client error response + w.logger.Err(err).Msg("Subscription handler creation failed") + return + } + + w.subs[subHandler.ID()] = subHandler + w.broadcastMessage(SubscribeMessageResponse{ + BaseMessage: BaseMessage{ + Action: SubscribeAction, + }, + Topic: subHandler.Topic(), + ID: subHandler.ID(), + }) } -// It is triggered by the readMessages method when the action is unsubscribe. It removes the relevant handler from -// the subs map by calling SubscriptionHandler::CloseSubscription and notifying the client of successful unsubscription. -func (w *WebSocketBroker) unsubscribe(subscriptionID string) { +// unsubscribe processes a request to cancel an active subscription, identified by its ID. +// It removes the relevant SubscriptionHandler from the `subs` map, closes the handler, +// and sends a confirmation response to the client. +// +// This method is triggered by the readMessages method when the action is "unsubscribe". +// +// Example response sent to client: +// +// { +// "action": "unsubscribe", +// "topic": "example_topic", +// "id": "sub_id_1" +// } +func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessage) { + sub, found := w.subs[msg.ID] + if !found { + // TODO: Handle as client error response + w.logger.Info().Msg(fmt.Sprintf("No subscription found for ID %s", msg.ID)) + return + } + + response := UnsubscribeMessageResponse{ + BaseMessage: BaseMessage{ + Action: UnsubscribeAction, + }, + Topic: sub.Topic(), + ID: sub.ID(), + } + + if err := sub.Close(); err != nil { + // TODO: Handle as client error response + w.logger.Err(err).Msgf("Failed to close subscription with ID %s", msg.ID) + return + } + + delete(w.subs, msg.ID) + w.broadcastMessage(response) } -// It is triggered by the readMessages method when the action is list_subscriptions. It gathers all active subscriptions -// for the current connection, formats the response, and sends it back to the client. +// listOfSubscriptions gathers all active subscriptions for the current WebSocket connection, +// formats them into a ListSubscriptionsMessageResponse, and sends the response to the client. +// +// This method is triggered by the readMessages handler when the action "list_subscriptions" is received. +// +// Example message structure sent to the client: +// +// { +// "action": "list_subscriptions", +// "subscriptions": [ +// {"topic": "example_topic_1", "id": "sub_id_1"}, +// {"topic": "example_topic_2", "id": "sub_id_2"} +// ] +// } func (w *WebSocketBroker) listOfSubscriptions() { + response := ListSubscriptionsMessageResponse{ + BaseMessage: BaseMessage{ + Action: ListSubscriptionsAction, + }, + Subscriptions: make([]SubscriptionEntry, 0, len(w.subs)), + } + for id, sub := range w.subs { + response.Subscriptions = append(response.Subscriptions, SubscriptionEntry{ + Topic: sub.Topic(), + ID: id, + }) + } + + w.broadcastMessage(response) +} + +// clearSubscriptions closes each SubscriptionHandler in the subs map and +// removes all entries from the map. +func (w *WebSocketBroker) clearSubscriptions() { + for id, sub := range w.subs { + // Attempt to close the subscription + if err := sub.Close(); err != nil { + w.logger.Err(err).Msgf("Failed to close subscription with ID %s", id) + } + // Remove the subscription from the map + delete(w.subs, id) + } } diff --git a/engine/access/rest/routes/ws_broker_handler.go b/engine/access/rest/routes/ws_broker_handler.go index 4981e4aa7e3..6307728d918 100644 --- a/engine/access/rest/routes/ws_broker_handler.go +++ b/engine/access/rest/routes/ws_broker_handler.go @@ -24,7 +24,7 @@ type WSBrokerHandler struct { var _ http.Handler = (*WSBrokerHandler)(nil) // NewWSBrokerHandler creates a new instance of WSBrokerHandler. -// It initializes the handler with the provided logger, blockchain chain, and a factory for subscription handlers. +// It initializes the handler with the provided logger, blockchain chain, and a subHandlerFactory for subscription handlers. // // Parameters: // - logger: Logger for recording internal events. @@ -77,7 +77,7 @@ func (h *WSBrokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer conn.Close() //TODO: fill LimitsConfiguration - wsBroker := NewWebSocketBroker(logger, conn, LimitsConfiguration{}) + wsBroker := NewWebSocketBroker(logger, conn, LimitsConfiguration{}, h.subHandlerFactory) err = wsBroker.SetConnectionConfig() //TODO : if err != nil { From 258866104905a8f13fbcc43aabc9b4f4a53d361a Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Mon, 28 Oct 2024 17:14:04 +0200 Subject: [PATCH 12/22] Pass broadcast message --- .../routes/subscription_handlers/subscription_handler.go | 2 +- engine/access/rest/routes/websocket_broker.go | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/engine/access/rest/routes/subscription_handlers/subscription_handler.go b/engine/access/rest/routes/subscription_handlers/subscription_handler.go index c49fd01ccb1..1898a58d213 100644 --- a/engine/access/rest/routes/subscription_handlers/subscription_handler.go +++ b/engine/access/rest/routes/subscription_handlers/subscription_handler.go @@ -37,7 +37,7 @@ func NewSubscriptionHandlerFactory(eventFilterConfig state_stream.EventFilterCon } } -func (s *SubscriptionHandlerFactory) CreateSubscriptionHandler(topic string, arguments map[string]interface{}, broadcastMessage func([]byte) error) (SubscriptionHandler, error) { +func (s *SubscriptionHandlerFactory) CreateSubscriptionHandler(topic string, arguments map[string]interface{}, broadcastMessage func(interface{})) (SubscriptionHandler, error) { switch topic { // TODO: Implemented handlers for each topic should be added in respective case case EventsTopic, diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index 911683afccf..640a67adcb0 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -337,10 +337,7 @@ func (w *WebSocketBroker) pingPongHandler() { // "id": "sub_id_1" // } func (w *WebSocketBroker) subscribe(msg *SubscribeMessage) { - subHandler, err := w.subHandlerFactory.CreateSubscriptionHandler(msg.Topic, msg.Arguments, func(bytes []byte) error { - w.logger.Info().Msg(fmt.Sprintf("message: %s", string(bytes))) - return nil - }) + subHandler, err := w.subHandlerFactory.CreateSubscriptionHandler(msg.Topic, msg.Arguments, w.broadcastMessage) if err != nil { // TODO: Handle as client error response w.logger.Err(err).Msg("Subscription handler creation failed") From fb6fceb731e6ef623dfa91604c81cd54bf8830a8 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk Date: Mon, 28 Oct 2024 17:28:12 +0200 Subject: [PATCH 13/22] Added BaseMessageResponse --- engine/access/rest/routes/websocket_broker.go | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index 640a67adcb0..65d2d365fe9 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -27,6 +27,11 @@ type BaseMessage struct { Action string `json:"action"` } +type BaseMessageResponse struct { + Action string `json:"action"` + Success bool `json:"success"` +} + type SubscribeMessage struct { BaseMessage Topic string `json:"topic"` @@ -36,7 +41,7 @@ type SubscribeMessage struct { // SubscribeMessageResponse represents the response to a subscription message. // It includes the topic and a unique subscription ID. type SubscribeMessageResponse struct { - BaseMessage + BaseMessageResponse Topic string `json:"topic"` ID string `json:"id"` } @@ -50,7 +55,7 @@ type UnsubscribeMessage struct { // UnsubscribeMessageResponse represents the response to an unsubscription message. // It includes the topic and subscription ID for confirmation. type UnsubscribeMessageResponse struct { - BaseMessage + BaseMessageResponse Topic string `json:"topic"` ID string `json:"id"` } @@ -68,7 +73,7 @@ type SubscriptionEntry struct { // ListSubscriptionsMessageResponse is the structure used to respond to list_subscriptions requests. // It contains a list of active subscriptions for the current WebSocket connection. type ListSubscriptionsMessageResponse struct { - BaseMessage + BaseMessageResponse Subscriptions []SubscriptionEntry `json:"subscriptions"` } @@ -347,8 +352,9 @@ func (w *WebSocketBroker) subscribe(msg *SubscribeMessage) { w.subs[subHandler.ID()] = subHandler w.broadcastMessage(SubscribeMessageResponse{ - BaseMessage: BaseMessage{ - Action: SubscribeAction, + BaseMessageResponse: BaseMessageResponse{ + Action: SubscribeAction, + Success: true, }, Topic: subHandler.Topic(), ID: subHandler.ID(), @@ -377,8 +383,9 @@ func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessage) { } response := UnsubscribeMessageResponse{ - BaseMessage: BaseMessage{ - Action: UnsubscribeAction, + BaseMessageResponse: BaseMessageResponse{ + Action: UnsubscribeAction, + Success: true, }, Topic: sub.Topic(), ID: sub.ID(), @@ -386,6 +393,7 @@ func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessage) { if err := sub.Close(); err != nil { // TODO: Handle as client error response + response.Success = false w.logger.Err(err).Msgf("Failed to close subscription with ID %s", msg.ID) return } @@ -411,8 +419,9 @@ func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessage) { // } func (w *WebSocketBroker) listOfSubscriptions() { response := ListSubscriptionsMessageResponse{ - BaseMessage: BaseMessage{ - Action: ListSubscriptionsAction, + BaseMessageResponse: BaseMessageResponse{ + Action: ListSubscriptionsAction, + Success: true, }, Subscriptions: make([]SubscriptionEntry, 0, len(w.subs)), } From e878c082ee679f5ab36f12284fdcc108a2b1b8fd Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Tue, 29 Oct 2024 10:26:40 +0200 Subject: [PATCH 14/22] Updated read and write messages, upgraded error handling --- engine/access/rest/routes/websocket_broker.go | 323 ++++++++++-------- .../access/rest/routes/ws_broker_handler.go | 18 +- 2 files changed, 194 insertions(+), 147 deletions(-) diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index 65d2d365fe9..106134707c5 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -23,7 +23,7 @@ const ( ) // Define a base struct to determine the action -type BaseMessage struct { +type BaseMessageRequest struct { Action string `json:"action"` } @@ -32,8 +32,8 @@ type BaseMessageResponse struct { Success bool `json:"success"` } -type SubscribeMessage struct { - BaseMessage +type SubscribeMessageRequest struct { + BaseMessageRequest Topic string `json:"topic"` Arguments map[string]interface{} `json:"arguments"` } @@ -46,8 +46,8 @@ type SubscribeMessageResponse struct { ID string `json:"id"` } -type UnsubscribeMessage struct { - BaseMessage +type UnsubscribeMessageRequest struct { + BaseMessageRequest Topic string `json:"topic"` ID string `json:"id"` } @@ -60,8 +60,8 @@ type UnsubscribeMessageResponse struct { ID string `json:"id"` } -type ListSubscriptionsMessage struct { - BaseMessage +type ListSubscriptionsMessageRequest struct { + BaseMessageRequest } // SubscriptionEntry represents an active subscription entry with a specific topic and unique identifier. @@ -82,7 +82,7 @@ type LimitsConfiguration struct { activeSubscriptions *atomic.Uint64 maxResponsesPerSecond uint64 - activeResponsesPerSecond uint64 + activeResponsesPerSecond *atomic.Uint64 sendMessageTimeout time.Duration } @@ -96,7 +96,7 @@ type WebSocketBroker struct { limitsConfiguration LimitsConfiguration // Limits on the maximum number of subscriptions per connection, responses per second, and send message timeout. - readChannel chan error // Channel to read messages from the client + errChannel chan error // Channel to read messages from the client broadcastChannel chan interface{} // Channel to read messages from node subscriptions } @@ -113,41 +113,85 @@ func NewWebSocketBroker( subHandlerFactory: subHandlerFactory, subs: make(map[string]subscription_handlers.SubscriptionHandler), } - websocketBroker.startResponseLimiter() + go websocketBroker.resetResponseLimit() return websocketBroker } -func (w *WebSocketBroker) startResponseLimiter() { +// resetResponseLimit resets the response limit every second. +func (w *WebSocketBroker) resetResponseLimit() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for range ticker.C { - w.limitsConfiguration.activeResponsesPerSecond = 0 // Reset the response count every second + w.limitsConfiguration.activeResponsesPerSecond.Store(0) // Reset the response count every second } } -func (w *WebSocketBroker) SetConnectionConfig() error { - err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)) // Set the initial write deadline for the first ping message - if err != nil { +func (w *WebSocketBroker) configureConnection() error { + if err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { // Set the initial write deadline for the first ping message return models.NewRestError(http.StatusInternalServerError, "Set the initial write deadline error: ", err) } - err = w.conn.SetReadDeadline(time.Now().Add(pongWait)) // Set the initial read deadline for the first pong message - if err != nil { + if err := w.conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil { // Set the initial read deadline for the first pong message return models.NewRestError(http.StatusInternalServerError, "Set the initial read deadline error: ", err) } // Establish a Pong handler w.conn.SetPongHandler(func(string) error { - err := w.conn.SetReadDeadline(time.Now().Add(pongWait)) - if err != nil { - return err - } - return nil + return w.conn.SetReadDeadline(time.Now().Add(pongWait)) }) return nil } +// resolveWebSocketError handles WebSocket errors. +// +// If the error is an instance of models.StatusError, the function extracts the +// relevant information like status code and user message to construct the WebSocket +// close code and message. If the error is not a models.StatusError, a default +// internal server error close code and the error's message are used. +func (w *WebSocketBroker) resolveWebSocketError(err error) (int, string) { + // rest status type error should be returned with status and user message provided + var statusErr models.StatusError + + if errors.As(err, &statusErr) { + wsMsg := statusErr.UserMessage() + + if statusErr.Status() == http.StatusServiceUnavailable { + return websocket.CloseTryAgainLater, wsMsg + } + if statusErr.Status() == http.StatusRequestTimeout { + return websocket.CloseGoingAway, wsMsg + } + } + + return websocket.CloseInternalServerErr, err.Error() +} + +// handleWSError handles errors that should close the WebSocket connection gracefully. +// It retrieves the WebSocket close code and message, sends a close message to the client, +// closes read and broadcast channels, and clears all active subscriptions. +func (w *WebSocketBroker) handleWSError(err error) { + // Get WebSocket close code and message from the error + wsCode, wsMsg := w.resolveWebSocketError(err) + + // Send the close message to the client + closeMessage := websocket.FormatCloseMessage(wsCode, wsMsg) + err = w.conn.WriteControl(websocket.CloseMessage, closeMessage, time.Now().Add(time.Second)) + if err != nil { + w.logger.Error().Err(err).Msgf("error sending WebSocket CloseMessage error: %v", err) + } + + w.cleanupAfterError() +} + +func (w *WebSocketBroker) sendError(err error) { + // Attempt to send the error message + if err := w.conn.WriteMessage(websocket.TextMessage, []byte(err.Error())); err != nil { + // TODO: check if we need log error of handleWSError here + w.logger.Error().Err(err).Msgf("error sending WebSocket error: %v", err) + } +} + // readMessages runs while the connection is active. It retrieves, validates, and processes client messages. // Actions handled include subscribe, unsubscribe, and list_subscriptions. Additional actions can be added as needed. // It continuously reads messages from the WebSocket connection and closes @@ -156,176 +200,163 @@ func (w *WebSocketBroker) SetConnectionConfig() error { // This method should be called after establishing the WebSocket connection // to handle incoming messages asynchronously. func (w *WebSocketBroker) readMessages() { - // Start a goroutine to handle the WebSocket connection - defer close(w.readChannel) // notify websocket about closed connection - for { // reads messages from the WebSocket connection when // 1) the connection is closed by client // 2) unexpected message is received from the client - // Step 1: Read JSON message into a byte slice _, message, err := w.conn.ReadMessage() if err != nil { - var closeError *websocket.CloseError - w.readChannel <- err - //it means that client connection has been terminated, and we need to stop this goroutine - if errors.As(err, &closeError) { + if websocket.IsCloseError(err) { return } - } - - var baseMsg BaseMessage - if err := json.Unmarshal(message, &baseMsg); err != nil { - //process the case when message does not have "action" - err := fmt.Errorf("invalid message: message does not have 'action' field : %w", err) - - w.readChannel <- err + w.errChannel <- err continue } - // Process based on the action type - switch baseMsg.Action { - case SubscribeAction: - var subscribeMsg SubscribeMessage - if err := json.Unmarshal(message, &subscribeMsg); err != nil { - err := fmt.Errorf("error parsing 'subscribe' message: %w", err) - - w.readChannel <- err - continue - } - - if w.limitsConfiguration.activeSubscriptions.Load() >= w.limitsConfiguration.maxSubscriptions { - err := fmt.Errorf("maximum number of streams reached") - err = models.NewRestError(http.StatusServiceUnavailable, err.Error(), err) - w.readChannel <- err + if err := w.processMessage(message); err != nil { + w.errChannel <- err + } + } +} - continue - } - w.limitsConfiguration.activeSubscriptions.Add(1) +// Process message based on action type +func (w *WebSocketBroker) processMessage(message []byte) error { + var baseMsg BaseMessageRequest + if err := json.Unmarshal(message, &baseMsg); err != nil { + return models.NewRestError(http.StatusBadRequest, "invalid message structure", err) + } + switch baseMsg.Action { + case SubscribeAction: + return w.handleSubscribeRequest(message) + case UnsubscribeAction: + return w.handleUnsubscribeRequest(message) + case ListSubscriptionsAction: + return w.handleListSubscriptionsRequest(message) + default: + err := fmt.Errorf("unknown action type: %s", baseMsg.Action) + return models.NewRestError(http.StatusBadRequest, err.Error(), err) + } +} - w.subscribe(&subscribeMsg) - case UnsubscribeAction: - var unsubscribeMsg UnsubscribeMessage - if err := json.Unmarshal(message, &unsubscribeMsg); err != nil { - err := fmt.Errorf("error parsing 'unsubscribe' message: %w", err) +func (w *WebSocketBroker) handleSubscribeRequest(message []byte) error { + var subscribeMsg SubscribeMessageRequest + if err := json.Unmarshal(message, &subscribeMsg); err != nil { + return models.NewRestError(http.StatusBadRequest, "failed to parse 'subscribe' message", err) + } - w.readChannel <- err - continue - } + if w.limitsConfiguration.activeSubscriptions.Load() >= w.limitsConfiguration.maxSubscriptions { + return models.NewRestError(http.StatusServiceUnavailable, "max subscriptions reached", nil) - w.limitsConfiguration.activeSubscriptions.Add(-1) + } + w.limitsConfiguration.activeSubscriptions.Add(1) + w.subscribe(&subscribeMsg) - w.unsubscribe(&unsubscribeMsg) + return nil +} - case ListSubscriptionsAction: - var listSubscriptionsMsg ListSubscriptionsMessage - if err := json.Unmarshal(message, &listSubscriptionsMsg); err != nil { - err := fmt.Errorf("error parsing 'unsubsclist_subscriptionsribe' message: %w", err) +func (w *WebSocketBroker) handleUnsubscribeRequest(message []byte) error { + var unsubscribeMsg UnsubscribeMessageRequest + if err := json.Unmarshal(message, &unsubscribeMsg); err != nil { + return models.NewRestError(http.StatusBadRequest, "failed to parse 'unsubscribe' message", err) + } - w.readChannel <- err - continue - } - w.listOfSubscriptions() + w.limitsConfiguration.activeSubscriptions.Add(-1) + w.unsubscribe(&unsubscribeMsg) - default: - err := fmt.Errorf("unknown action type: %s", baseMsg.Action) + return nil +} - w.readChannel <- err - continue - } +func (w *WebSocketBroker) handleListSubscriptionsRequest(message []byte) error { + var listSubscriptionsMsg ListSubscriptionsMessageRequest + if err := json.Unmarshal(message, &listSubscriptionsMsg); err != nil { + return models.NewRestError(http.StatusBadRequest, "failed to parse 'list_subscriptions' message", err) } + w.listOfSubscriptions() + + return nil } // writeMessages runs while the connection is active, listening on the broadcast channel. // It retrieves responses and sends them to the client. func (w *WebSocketBroker) writeMessages() { + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + for { select { - case err := <-w.readChannel: - // we use `readChannel` - // 1) as indicator of client's status, when `readChannel` closes it means that client + case err := <-w.errChannel: + // we use errChannel + // 1) as indicator of client's status, when errChannel closes it means that client // connection has been terminated, and we need to stop this goroutine to avoid memory leak. - - var closeError *websocket.CloseError - if errors.As(err, &closeError) { - // TODO: write with "close" - - err = w.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(closeError.Code, closeError.Error()), time.Now().Add(time.Second)) - if err != nil { - w.logger.Error().Err(err).Msg(fmt.Sprintf("error sending WebSocket error: %v", err)) - } - + if websocket.IsCloseError(err) { + w.handleWSError(err) return } // 2) as error receiver for any errors that occur during the reading process - err = w.conn.WriteMessage(websocket.TextMessage, []byte(err.Error())) - if err != nil { - //w.wsErrorHandler(err) - return - } + w.sendError(err) case data, ok := <-w.broadcastChannel: if !ok { - _ = fmt.Errorf("broadcast channel closed, no error occurred") - //w.wsErrorHandler(models.NewRestError(http.StatusRequestTimeout, "broadcast channel closed", err)) + err := fmt.Errorf("broadcast channel closed, no error occurred") + w.handleWSError(models.NewRestError(http.StatusRequestTimeout, "broadcast channel closed", err)) return } - err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err != nil { - //w.wsErrorHandler(models.NewRestError(http.StatusInternalServerError, "failed to set the initial write deadline: ", err)) + + if err := w.sendData(data); err != nil { return } - - // Write the response to the WebSocket connection - err = w.conn.WriteJSON(data) - if err != nil { - //w.wsErrorHandler(err) + case <-ticker.C: + if err := w.sendPing(); err != nil { return } } } } +// sendData sends a JSON message to the WebSocket client, setting the write deadline. +// Returns an error if the write fails, causing the connection to close. +func (w *WebSocketBroker) sendData(data interface{}) error { + if err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { + w.handleWSError(models.NewRestError(http.StatusInternalServerError, "failed to set write deadline", err)) + return err + } + + if err := w.conn.WriteJSON(data); err != nil { + w.handleWSError(err) + return err + } + + return nil +} + +// sendPing sends a periodic ping message to the WebSocket client to keep the connection alive. +func (w *WebSocketBroker) sendPing() error { + if err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { + w.handleWSError(models.NewRestError(http.StatusInternalServerError, "failed to set the initial write deadline for ping", err)) + return err + } + + if err := w.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + w.handleWSError(err) + return err + } + + return nil +} + // broadcastMessage is called by each SubscriptionHandler, // receiving formatted subscription messages and writing them to the broadcast channel. func (w *WebSocketBroker) broadcastMessage(data interface{}) { - if w.limitsConfiguration.activeResponsesPerSecond >= w.limitsConfiguration.maxResponsesPerSecond { + if w.limitsConfiguration.activeResponsesPerSecond.Load() >= w.limitsConfiguration.maxResponsesPerSecond { + // TODO: recheck edge cases time.Sleep(w.limitsConfiguration.sendMessageTimeout) // Adjust the sleep duration as needed } // Send the message to the broadcast channel w.broadcastChannel <- data - w.limitsConfiguration.activeResponsesPerSecond++ -} - -// pingPongHandler periodically checks the connection's availability using ping/pong messages and -// terminates the connection if the client becomes unresponsive. -func (w *WebSocketBroker) pingPongHandler() { - ticker := time.NewTicker(pingPeriod) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err != nil { - //w.wsErrorHandler(models.NewRestError(http.StatusInternalServerError, "failed to set the initial write deadline: ", err)) - return - } - if err := w.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - //w.wsErrorHandler(err) - return - } - case err := <-w.readChannel: - //it means that client connection has been terminated, and we need to stop this goroutine - var closeError *websocket.CloseError - if errors.As(err, &closeError) { - return - } - } - } + w.limitsConfiguration.activeResponsesPerSecond.Add(1) } // subscribe processes a request to subscribe to a specific topic. It uses the topic field in @@ -341,11 +372,11 @@ func (w *WebSocketBroker) pingPongHandler() { // "topic": "example_topic", // "id": "sub_id_1" // } -func (w *WebSocketBroker) subscribe(msg *SubscribeMessage) { +func (w *WebSocketBroker) subscribe(msg *SubscribeMessageRequest) { subHandler, err := w.subHandlerFactory.CreateSubscriptionHandler(msg.Topic, msg.Arguments, w.broadcastMessage) if err != nil { - // TODO: Handle as client error response w.logger.Err(err).Msg("Subscription handler creation failed") + w.sendError(err) return } @@ -374,11 +405,12 @@ func (w *WebSocketBroker) subscribe(msg *SubscribeMessage) { // "topic": "example_topic", // "id": "sub_id_1" // } -func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessage) { +func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessageRequest) { sub, found := w.subs[msg.ID] if !found { - // TODO: Handle as client error response w.logger.Info().Msg(fmt.Sprintf("No subscription found for ID %s", msg.ID)) + // TODO: update error + w.sendError(fmt.Errorf("no subscription found for ID: %s", msg.ID)) return } @@ -392,9 +424,9 @@ func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessage) { } if err := sub.Close(); err != nil { - // TODO: Handle as client error response response.Success = false w.logger.Err(err).Msgf("Failed to close subscription with ID %s", msg.ID) + w.sendError(err) return } @@ -436,6 +468,13 @@ func (w *WebSocketBroker) listOfSubscriptions() { w.broadcastMessage(response) } +// Close channels and clear subscriptions on error +func (w *WebSocketBroker) cleanupAfterError() { + close(w.errChannel) + close(w.broadcastChannel) + w.clearSubscriptions() +} + // clearSubscriptions closes each SubscriptionHandler in the subs map and // removes all entries from the map. func (w *WebSocketBroker) clearSubscriptions() { diff --git a/engine/access/rest/routes/ws_broker_handler.go b/engine/access/rest/routes/ws_broker_handler.go index 6307728d918..c26fb032c74 100644 --- a/engine/access/rest/routes/ws_broker_handler.go +++ b/engine/access/rest/routes/ws_broker_handler.go @@ -5,6 +5,7 @@ import ( "github.com/gorilla/websocket" "github.com/rs/zerolog" + "go.uber.org/atomic" "github.com/onflow/flow-go/engine/access/rest/models" "github.com/onflow/flow-go/engine/access/rest/routes/subscription_handlers" @@ -76,12 +77,19 @@ func (h *WSBrokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } defer conn.Close() - //TODO: fill LimitsConfiguration - wsBroker := NewWebSocketBroker(logger, conn, LimitsConfiguration{}, h.subHandlerFactory) - err = wsBroker.SetConnectionConfig() - //TODO : + wsBroker := NewWebSocketBroker( + logger, + conn, + //TODO: fill all limits + LimitsConfiguration{ + activeResponsesPerSecond: atomic.NewUint64(0), + activeSubscriptions: atomic.NewUint64(0), + }, + h.subHandlerFactory, + ) + err = wsBroker.configureConnection() if err != nil { - // TODO: handle error + wsBroker.handleWSError(err) return } From b2781d211400a511a93f49f4e1cc2ae2038ad30d Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Tue, 29 Oct 2024 10:40:53 +0200 Subject: [PATCH 15/22] Updated error message and convertion to ws errors --- engine/access/rest/routes/websocket_broker.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index 106134707c5..c5b93b8f81b 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -156,6 +156,9 @@ func (w *WebSocketBroker) resolveWebSocketError(err error) (int, string) { if errors.As(err, &statusErr) { wsMsg := statusErr.UserMessage() + if statusErr.Status() == http.StatusBadRequest { + return websocket.CloseUnsupportedData, wsMsg + } if statusErr.Status() == http.StatusServiceUnavailable { return websocket.CloseTryAgainLater, wsMsg } @@ -225,7 +228,7 @@ func (w *WebSocketBroker) readMessages() { func (w *WebSocketBroker) processMessage(message []byte) error { var baseMsg BaseMessageRequest if err := json.Unmarshal(message, &baseMsg); err != nil { - return models.NewRestError(http.StatusBadRequest, "invalid message structure", err) + return models.NewRestError(http.StatusBadRequest, "invalid message structure: 'action' is required", err) } switch baseMsg.Action { case SubscribeAction: From 855146b2344da7d25615ba17cce7e86e04ebc298 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Tue, 29 Oct 2024 11:24:06 +0200 Subject: [PATCH 16/22] Updated ws handler --- engine/access/rest/routes/ws_broker_handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/engine/access/rest/routes/ws_broker_handler.go b/engine/access/rest/routes/ws_broker_handler.go index c26fb032c74..bc618a4dc7e 100644 --- a/engine/access/rest/routes/ws_broker_handler.go +++ b/engine/access/rest/routes/ws_broker_handler.go @@ -94,6 +94,5 @@ func (h *WSBrokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } go wsBroker.readMessages() - go wsBroker.pingPongHandler() wsBroker.writeMessages() } From 218c6ed7c15e229452221b407aded8840398e057 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Tue, 29 Oct 2024 15:30:27 +0200 Subject: [PATCH 17/22] Updated error handling --- engine/access/rest/routes/websocket_broker.go | 85 ++++++++++--------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index c5b93b8f81b..b98fd5bd6c9 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -28,8 +28,9 @@ type BaseMessageRequest struct { } type BaseMessageResponse struct { - Action string `json:"action"` - Success bool `json:"success"` + Action string `json:"action,omitempty"` + Success bool `json:"success"` + ErrorMessage string `json:"error_message,omitempty"` } type SubscribeMessageRequest struct { @@ -66,15 +67,15 @@ type ListSubscriptionsMessageRequest struct { // SubscriptionEntry represents an active subscription entry with a specific topic and unique identifier. type SubscriptionEntry struct { - Topic string `json:"topic"` - ID string `json:"id"` + Topic string `json:"topic,omitempty"` + ID string `json:"id,omitempty"` } // ListSubscriptionsMessageResponse is the structure used to respond to list_subscriptions requests. // It contains a list of active subscriptions for the current WebSocket connection. type ListSubscriptionsMessageResponse struct { BaseMessageResponse - Subscriptions []SubscriptionEntry `json:"subscriptions"` + Subscriptions []*SubscriptionEntry `json:"subscriptions,omitempty"` } type LimitsConfiguration struct { @@ -155,13 +156,6 @@ func (w *WebSocketBroker) resolveWebSocketError(err error) (int, string) { if errors.As(err, &statusErr) { wsMsg := statusErr.UserMessage() - - if statusErr.Status() == http.StatusBadRequest { - return websocket.CloseUnsupportedData, wsMsg - } - if statusErr.Status() == http.StatusServiceUnavailable { - return websocket.CloseTryAgainLater, wsMsg - } if statusErr.Status() == http.StatusRequestTimeout { return websocket.CloseGoingAway, wsMsg } @@ -187,14 +181,6 @@ func (w *WebSocketBroker) handleWSError(err error) { w.cleanupAfterError() } -func (w *WebSocketBroker) sendError(err error) { - // Attempt to send the error message - if err := w.conn.WriteMessage(websocket.TextMessage, []byte(err.Error())); err != nil { - // TODO: check if we need log error of handleWSError here - w.logger.Error().Err(err).Msgf("error sending WebSocket error: %v", err) - } -} - // readMessages runs while the connection is active. It retrieves, validates, and processes client messages. // Actions handled include subscribe, unsubscribe, and list_subscriptions. Additional actions can be added as needed. // It continuously reads messages from the WebSocket connection and closes @@ -210,11 +196,12 @@ func (w *WebSocketBroker) readMessages() { _, message, err := w.conn.ReadMessage() if err != nil { + w.errChannel <- err + //it means that client connection has been terminated, and we need to stop this goroutine if websocket.IsCloseError(err) { return } - w.errChannel <- err continue } @@ -228,7 +215,7 @@ func (w *WebSocketBroker) readMessages() { func (w *WebSocketBroker) processMessage(message []byte) error { var baseMsg BaseMessageRequest if err := json.Unmarshal(message, &baseMsg); err != nil { - return models.NewRestError(http.StatusBadRequest, "invalid message structure: 'action' is required", err) + return fmt.Errorf("invalid message structure: 'action' is required: %w", err) } switch baseMsg.Action { case SubscribeAction: @@ -238,20 +225,18 @@ func (w *WebSocketBroker) processMessage(message []byte) error { case ListSubscriptionsAction: return w.handleListSubscriptionsRequest(message) default: - err := fmt.Errorf("unknown action type: %s", baseMsg.Action) - return models.NewRestError(http.StatusBadRequest, err.Error(), err) + return fmt.Errorf("unknown action type: %s", baseMsg.Action) } } func (w *WebSocketBroker) handleSubscribeRequest(message []byte) error { var subscribeMsg SubscribeMessageRequest if err := json.Unmarshal(message, &subscribeMsg); err != nil { - return models.NewRestError(http.StatusBadRequest, "failed to parse 'subscribe' message", err) + return fmt.Errorf("failed to parse 'subscribe' message: %w", err) } if w.limitsConfiguration.activeSubscriptions.Load() >= w.limitsConfiguration.maxSubscriptions { - return models.NewRestError(http.StatusServiceUnavailable, "max subscriptions reached", nil) - + return fmt.Errorf("max subscriptions reached, max subscriptions count: %d", w.limitsConfiguration.maxSubscriptions) } w.limitsConfiguration.activeSubscriptions.Add(1) w.subscribe(&subscribeMsg) @@ -262,7 +247,7 @@ func (w *WebSocketBroker) handleSubscribeRequest(message []byte) error { func (w *WebSocketBroker) handleUnsubscribeRequest(message []byte) error { var unsubscribeMsg UnsubscribeMessageRequest if err := json.Unmarshal(message, &unsubscribeMsg); err != nil { - return models.NewRestError(http.StatusBadRequest, "failed to parse 'unsubscribe' message", err) + return fmt.Errorf("failed to parse 'unsubscribe' message: %w", err) } w.limitsConfiguration.activeSubscriptions.Add(-1) @@ -274,8 +259,9 @@ func (w *WebSocketBroker) handleUnsubscribeRequest(message []byte) error { func (w *WebSocketBroker) handleListSubscriptionsRequest(message []byte) error { var listSubscriptionsMsg ListSubscriptionsMessageRequest if err := json.Unmarshal(message, &listSubscriptionsMsg); err != nil { - return models.NewRestError(http.StatusBadRequest, "failed to parse 'list_subscriptions' message", err) + return fmt.Errorf("failed to parse 'list_subscriptions' message: %w", err) } + w.listOfSubscriptions() return nil @@ -284,8 +270,8 @@ func (w *WebSocketBroker) handleListSubscriptionsRequest(message []byte) error { // writeMessages runs while the connection is active, listening on the broadcast channel. // It retrieves responses and sends them to the client. func (w *WebSocketBroker) writeMessages() { - ticker := time.NewTicker(pingPeriod) - defer ticker.Stop() + pingTicker := time.NewTicker(pingPeriod) + defer pingTicker.Stop() for { select { @@ -299,7 +285,10 @@ func (w *WebSocketBroker) writeMessages() { } // 2) as error receiver for any errors that occur during the reading process - w.sendError(err) + w.sendData(BaseMessageResponse{ + Success: false, + ErrorMessage: err.Error(), + }) case data, ok := <-w.broadcastChannel: if !ok { err := fmt.Errorf("broadcast channel closed, no error occurred") @@ -310,7 +299,7 @@ func (w *WebSocketBroker) writeMessages() { if err := w.sendData(data); err != nil { return } - case <-ticker.C: + case <-pingTicker.C: if err := w.sendPing(); err != nil { return } @@ -379,7 +368,13 @@ func (w *WebSocketBroker) subscribe(msg *SubscribeMessageRequest) { subHandler, err := w.subHandlerFactory.CreateSubscriptionHandler(msg.Topic, msg.Arguments, w.broadcastMessage) if err != nil { w.logger.Err(err).Msg("Subscription handler creation failed") - w.sendError(err) + + err = fmt.Errorf("subscription handler creation failed: %w", err) + w.sendData(BaseMessageResponse{ + Action: SubscribeAction, + Success: false, + ErrorMessage: err.Error(), + }) return } @@ -411,9 +406,13 @@ func (w *WebSocketBroker) subscribe(msg *SubscribeMessageRequest) { func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessageRequest) { sub, found := w.subs[msg.ID] if !found { - w.logger.Info().Msg(fmt.Sprintf("No subscription found for ID %s", msg.ID)) - // TODO: update error - w.sendError(fmt.Errorf("no subscription found for ID: %s", msg.ID)) + errMsg := fmt.Sprintf("No subscription found for ID %s", msg.ID) + w.logger.Info().Msg(errMsg) + w.sendData(BaseMessageResponse{ + Action: UnsubscribeAction, + Success: false, + ErrorMessage: errMsg, + }) return } @@ -427,9 +426,13 @@ func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessageRequest) { } if err := sub.Close(); err != nil { - response.Success = false w.logger.Err(err).Msgf("Failed to close subscription with ID %s", msg.ID) - w.sendError(err) + err := fmt.Errorf("failed to close subscription with ID %s: %w", msg.ID, err) + w.sendData(BaseMessageResponse{ + Action: UnsubscribeAction, + Success: false, + ErrorMessage: err.Error(), + }) return } @@ -458,11 +461,11 @@ func (w *WebSocketBroker) listOfSubscriptions() { Action: ListSubscriptionsAction, Success: true, }, - Subscriptions: make([]SubscriptionEntry, 0, len(w.subs)), + Subscriptions: make([]*SubscriptionEntry, 0, len(w.subs)), } for id, sub := range w.subs { - response.Subscriptions = append(response.Subscriptions, SubscriptionEntry{ + response.Subscriptions = append(response.Subscriptions, &SubscriptionEntry{ Topic: sub.Topic(), ID: id, }) From 8562e893d94a1aa518c82db1c85caf2434ba3764 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Tue, 29 Oct 2024 19:40:53 +0200 Subject: [PATCH 18/22] Added WebsocketConfig and new flags to configure websocket subscriptions --- .../node_builder/access_node_builder.go | 26 +++- cmd/observer/node_builder/observer_builder.go | 26 +++- cmd/util/cmd/run-script/cmd.go | 2 + .../access/handle_irrecoverable_state_test.go | 1 + .../integration_unsecure_grpc_server_test.go | 2 + engine/access/rest/routes/router.go | 3 +- engine/access/rest/routes/websocket_broker.go | 131 ++++++++---------- .../access/rest/routes/ws_broker_handler.go | 10 +- engine/access/rest/server.go | 3 +- engine/access/rest_api_test.go | 1 + engine/access/rpc/engine.go | 16 ++- engine/access/rpc/rate_limit_test.go | 2 + engine/access/secure_grpcr_test.go | 2 + 13 files changed, 143 insertions(+), 82 deletions(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index c5c415f1357..67b20df22b9 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -146,6 +146,7 @@ type AccessNodeConfig struct { apiBurstlimits map[string]int rpcConf rpc.Config stateStreamConf statestreambackend.Config + wsConfig routes.WebsocketConfig stateStreamFilterConf map[string]int ExecutionNodeAddress string // deprecated HistoricalAccessRPCs []access.AccessAPIClient @@ -238,7 +239,12 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { ResponseLimit: subscription.DefaultResponseLimit, HeartbeatInterval: subscription.DefaultHeartbeatInterval, }, - stateStreamFilterConf: nil, + stateStreamFilterConf: nil, + wsConfig: routes.WebsocketConfig{ + MaxSubscriptionsPerConnection: routes.DefaultMaxSubscriptionsPerConnection, + MaxResponsesPerSecond: routes.DefaultMaxResponsesPerSecond, + SendMessageTimeout: routes.DefaultSendMessageTimeout, + }, ExecutionNodeAddress: "localhost:9000", logTxTimeToFinalized: false, logTxTimeToExecuted: false, @@ -1436,6 +1442,23 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { "registerdb-pruning-threshold", defaultConfig.registerDBPruneThreshold, fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold)) + + // Websocket subscriptions + flags.Uint64Var(&builder.wsConfig.MaxSubscriptionsPerConnection, + "websocket-max-subscriptions-per-connection", + defaultConfig.wsConfig.MaxSubscriptionsPerConnection, + fmt.Sprintf("maximum number of subscriptions per connection for websocket subscriptions. Default: %d", builder.wsConfig.MaxSubscriptionsPerConnection)) + + flags.Uint64Var(&builder.wsConfig.MaxResponsesPerSecond, + "websocket-max-responses-per-second", + defaultConfig.wsConfig.MaxResponsesPerSecond, + fmt.Sprintf("maximum number of responses per second for websocket subscriptions. Default: %d", builder.wsConfig.MaxResponsesPerSecond)) + + flags.DurationVar(&builder.wsConfig.SendMessageTimeout, + "websocket-send-message-timeout", + defaultConfig.wsConfig.SendMessageTimeout, + fmt.Sprintf("timeout value for send messages for websocket subscriptions. Default: %v", defaultConfig.wsConfig.SendMessageTimeout)) + }).ValidateFlags(func() error { if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") { return errors.New("public-network-address must be set if supports-observer is true") @@ -2015,6 +2038,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.unsecureGrpcServer, builder.stateStreamBackend, builder.stateStreamConf, + builder.wsConfig, indexReporter, ) if err != nil { diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 17d724758cf..b65efae7b55 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -139,6 +139,7 @@ type ObserverServiceConfig struct { checkpointFile string apiTimeout time.Duration stateStreamConf statestreambackend.Config + wsConfig routes.WebsocketConfig stateStreamFilterConf map[string]int upstreamNodeAddresses []string upstreamNodePublicKeys []string @@ -208,7 +209,12 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig { HeartbeatInterval: subscription.DefaultHeartbeatInterval, RegisterIDsRequestLimit: state_stream.DefaultRegisterIDsRequestLimit, }, - stateStreamFilterConf: nil, + stateStreamFilterConf: nil, + wsConfig: routes.WebsocketConfig{ + MaxSubscriptionsPerConnection: routes.DefaultMaxSubscriptionsPerConnection, + MaxResponsesPerSecond: routes.DefaultMaxResponsesPerSecond, + SendMessageTimeout: routes.DefaultSendMessageTimeout, + }, rpcMetricsEnabled: false, apiRatelimits: nil, apiBurstlimits: nil, @@ -798,6 +804,23 @@ func (builder *ObserverServiceBuilder) extraFlags() { "registerdb-pruning-threshold", defaultConfig.registerDBPruneThreshold, fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold)) + + // Websocket subscriptions + flags.Uint64Var(&builder.wsConfig.MaxSubscriptionsPerConnection, + "websocket-max-subscriptions-per-connection", + defaultConfig.wsConfig.MaxSubscriptionsPerConnection, + fmt.Sprintf("maximum number of subscriptions per connection for websocket subscriptions. Default: %d", builder.wsConfig.MaxSubscriptionsPerConnection)) + + flags.Uint64Var(&builder.wsConfig.MaxResponsesPerSecond, + "websocket-max-responses-per-second", + defaultConfig.wsConfig.MaxResponsesPerSecond, + fmt.Sprintf("maximum number of responses per second for websocket subscriptions. Default: %d", builder.wsConfig.MaxResponsesPerSecond)) + + flags.DurationVar(&builder.wsConfig.SendMessageTimeout, + "websocket-send-message-timeout", + defaultConfig.wsConfig.SendMessageTimeout, + fmt.Sprintf("timeout value for send messages for websocket subscriptions. Default: %v", defaultConfig.wsConfig.SendMessageTimeout)) + }).ValidateFlags(func() error { if builder.executionDataSyncEnabled { if builder.executionDataConfig.FetchTimeout <= 0 { @@ -1931,6 +1954,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.unsecureGrpcServer, builder.stateStreamBackend, builder.stateStreamConf, + builder.wsConfig, indexReporter, ) if err != nil { diff --git a/cmd/util/cmd/run-script/cmd.go b/cmd/util/cmd/run-script/cmd.go index 1f24d2599c2..23d20fe7878 100644 --- a/cmd/util/cmd/run-script/cmd.go +++ b/cmd/util/cmd/run-script/cmd.go @@ -16,6 +16,7 @@ import ( "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/cmd/util/ledger/util/registers" "github.com/onflow/flow-go/engine/access/rest" + "github.com/onflow/flow-go/engine/access/rest/routes" "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/engine/execution/computation" @@ -169,6 +170,7 @@ func run(*cobra.Command, []string) { metrics.NewNoopCollector(), nil, backend.Config{}, + routes.WebsocketConfig{}, ) if err != nil { log.Fatal().Err(err).Msg("failed to create server") diff --git a/engine/access/handle_irrecoverable_state_test.go b/engine/access/handle_irrecoverable_state_test.go index 911ba5c2a53..934a4305b70 100644 --- a/engine/access/handle_irrecoverable_state_test.go +++ b/engine/access/handle_irrecoverable_state_test.go @@ -171,6 +171,7 @@ func (suite *IrrecoverableStateTestSuite) SetupTest() { suite.unsecureGrpcServer, nil, stateStreamConfig, + routes.WebsocketConfig{}, nil, ) assert.NoError(suite.T(), err) diff --git a/engine/access/integration_unsecure_grpc_server_test.go b/engine/access/integration_unsecure_grpc_server_test.go index f99805687ba..fe490b62194 100644 --- a/engine/access/integration_unsecure_grpc_server_test.go +++ b/engine/access/integration_unsecure_grpc_server_test.go @@ -21,6 +21,7 @@ import ( "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/access/index" accessmock "github.com/onflow/flow-go/engine/access/mock" + "github.com/onflow/flow-go/engine/access/rest/routes" "github.com/onflow/flow-go/engine/access/rpc" "github.com/onflow/flow-go/engine/access/rpc/backend" "github.com/onflow/flow-go/engine/access/state_stream" @@ -215,6 +216,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() { suite.unsecureGrpcServer, nil, stateStreamConfig, + routes.WebsocketConfig{}, nil, ) assert.NoError(suite.T(), err) diff --git a/engine/access/rest/routes/router.go b/engine/access/rest/routes/router.go index 8cf9097f914..b9f0db14a19 100644 --- a/engine/access/rest/routes/router.go +++ b/engine/access/rest/routes/router.go @@ -82,13 +82,14 @@ func (b *RouterBuilder) AddWsRoutes( // AddPubSubRoute adds WebSocket route for the pub/sub mechanism to the router. func (b *RouterBuilder) AddPubSubRoute( chain flow.Chain, + wsConfig WebsocketConfig, subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory, ) *RouterBuilder { b.v1SubRouter. Methods(http.MethodGet). Path("/ws"). Name("ws"). - Handler(NewWSBrokerHandler(b.logger, chain, subHandlerFactory)) + Handler(NewWSBrokerHandler(b.logger, wsConfig, chain, subHandlerFactory)) return b } diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index b98fd5bd6c9..16cc3afc4d3 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -22,6 +22,15 @@ const ( ListSubscriptionsAction = "list_subscriptions" // Action to list active subscriptions ) +const ( + // DefaultMaxSubscriptionsPerConnection defines the default max number of subscriptions that can be open at the same time. + DefaultMaxSubscriptionsPerConnection = 1000 + + DefaultMaxResponsesPerSecond = 100 + + DefaultSendMessageTimeout = 10 * time.Second +) + // Define a base struct to determine the action type BaseMessageRequest struct { Action string `json:"action"` @@ -78,14 +87,11 @@ type ListSubscriptionsMessageResponse struct { Subscriptions []*SubscriptionEntry `json:"subscriptions,omitempty"` } -type LimitsConfiguration struct { - maxSubscriptions uint64 - activeSubscriptions *atomic.Uint64 +type WebsocketConfig struct { + MaxSubscriptionsPerConnection uint64 + MaxResponsesPerSecond uint64 - maxResponsesPerSecond uint64 - activeResponsesPerSecond *atomic.Uint64 - - sendMessageTimeout time.Duration + SendMessageTimeout time.Duration } type WebSocketBroker struct { @@ -95,24 +101,29 @@ type WebSocketBroker struct { subs map[string]subscription_handlers.SubscriptionHandler // First key is the subscription ID, second key is the topic - limitsConfiguration LimitsConfiguration // Limits on the maximum number of subscriptions per connection, responses per second, and send message timeout. + config WebsocketConfig // Limits on the maximum number of subscriptions per connection, responses per second, and send message timeout. errChannel chan error // Channel to read messages from the client broadcastChannel chan interface{} // Channel to read messages from node subscriptions + + activeSubscriptions *atomic.Uint64 + activeResponsesPerSecond *atomic.Uint64 } func NewWebSocketBroker( logger zerolog.Logger, + config WebsocketConfig, conn *websocket.Conn, - limitsConfiguration LimitsConfiguration, subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory, ) *WebSocketBroker { websocketBroker := &WebSocketBroker{ - logger: logger.With().Str("component", "websocket-broker").Logger(), - conn: conn, - limitsConfiguration: limitsConfiguration, - subHandlerFactory: subHandlerFactory, - subs: make(map[string]subscription_handlers.SubscriptionHandler), + logger: logger.With().Str("component", "websocket-broker").Logger(), + conn: conn, + config: config, + subHandlerFactory: subHandlerFactory, + subs: make(map[string]subscription_handlers.SubscriptionHandler), + activeResponsesPerSecond: atomic.NewUint64(0), + activeSubscriptions: atomic.NewUint64(0), } go websocketBroker.resetResponseLimit() @@ -125,7 +136,7 @@ func (w *WebSocketBroker) resetResponseLimit() { defer ticker.Stop() for range ticker.C { - w.limitsConfiguration.activeResponsesPerSecond.Store(0) // Reset the response count every second + w.activeResponsesPerSecond.Store(0) // Reset the response count every second } } @@ -235,13 +246,12 @@ func (w *WebSocketBroker) handleSubscribeRequest(message []byte) error { return fmt.Errorf("failed to parse 'subscribe' message: %w", err) } - if w.limitsConfiguration.activeSubscriptions.Load() >= w.limitsConfiguration.maxSubscriptions { - return fmt.Errorf("max subscriptions reached, max subscriptions count: %d", w.limitsConfiguration.maxSubscriptions) + if w.activeSubscriptions.Load() >= w.config.MaxSubscriptionsPerConnection { + return fmt.Errorf("max subscriptions reached, max subscriptions per connection count: %d", w.config.MaxSubscriptionsPerConnection) } - w.limitsConfiguration.activeSubscriptions.Add(1) - w.subscribe(&subscribeMsg) + w.activeSubscriptions.Add(1) - return nil + return w.subscribe(&subscribeMsg) } func (w *WebSocketBroker) handleUnsubscribeRequest(message []byte) error { @@ -250,10 +260,8 @@ func (w *WebSocketBroker) handleUnsubscribeRequest(message []byte) error { return fmt.Errorf("failed to parse 'unsubscribe' message: %w", err) } - w.limitsConfiguration.activeSubscriptions.Add(-1) - w.unsubscribe(&unsubscribeMsg) - - return nil + w.activeSubscriptions.Sub(1) + return w.unsubscribe(&unsubscribeMsg) } func (w *WebSocketBroker) handleListSubscriptionsRequest(message []byte) error { @@ -285,10 +293,14 @@ func (w *WebSocketBroker) writeMessages() { } // 2) as error receiver for any errors that occur during the reading process - w.sendData(BaseMessageResponse{ + err = w.sendResponse(BaseMessageResponse{ Success: false, ErrorMessage: err.Error(), + //TODO: add action }) + if err != nil { + return + } case data, ok := <-w.broadcastChannel: if !ok { err := fmt.Errorf("broadcast channel closed, no error occurred") @@ -296,7 +308,8 @@ func (w *WebSocketBroker) writeMessages() { return } - if err := w.sendData(data); err != nil { + if err := w.sendResponse(data); err != nil { + w.handleWSError(err) return } case <-pingTicker.C: @@ -307,20 +320,15 @@ func (w *WebSocketBroker) writeMessages() { } } -// sendData sends a JSON message to the WebSocket client, setting the write deadline. +// sendResponse sends a JSON message to the WebSocket client, setting the write deadline. // Returns an error if the write fails, causing the connection to close. -func (w *WebSocketBroker) sendData(data interface{}) error { +func (w *WebSocketBroker) sendResponse(data interface{}) error { if err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { w.handleWSError(models.NewRestError(http.StatusInternalServerError, "failed to set write deadline", err)) return err } - if err := w.conn.WriteJSON(data); err != nil { - w.handleWSError(err) - return err - } - - return nil + return w.conn.WriteJSON(data) } // sendPing sends a periodic ping message to the WebSocket client to keep the connection alive. @@ -341,14 +349,14 @@ func (w *WebSocketBroker) sendPing() error { // broadcastMessage is called by each SubscriptionHandler, // receiving formatted subscription messages and writing them to the broadcast channel. func (w *WebSocketBroker) broadcastMessage(data interface{}) { - if w.limitsConfiguration.activeResponsesPerSecond.Load() >= w.limitsConfiguration.maxResponsesPerSecond { + if w.activeResponsesPerSecond.Load() >= w.config.MaxResponsesPerSecond { // TODO: recheck edge cases - time.Sleep(w.limitsConfiguration.sendMessageTimeout) // Adjust the sleep duration as needed + time.Sleep(w.config.SendMessageTimeout) // Adjust the sleep duration as needed } // Send the message to the broadcast channel w.broadcastChannel <- data - w.limitsConfiguration.activeResponsesPerSecond.Add(1) + w.activeResponsesPerSecond.Add(1) } // subscribe processes a request to subscribe to a specific topic. It uses the topic field in @@ -364,18 +372,11 @@ func (w *WebSocketBroker) broadcastMessage(data interface{}) { // "topic": "example_topic", // "id": "sub_id_1" // } -func (w *WebSocketBroker) subscribe(msg *SubscribeMessageRequest) { +func (w *WebSocketBroker) subscribe(msg *SubscribeMessageRequest) error { subHandler, err := w.subHandlerFactory.CreateSubscriptionHandler(msg.Topic, msg.Arguments, w.broadcastMessage) if err != nil { w.logger.Err(err).Msg("Subscription handler creation failed") - - err = fmt.Errorf("subscription handler creation failed: %w", err) - w.sendData(BaseMessageResponse{ - Action: SubscribeAction, - Success: false, - ErrorMessage: err.Error(), - }) - return + return fmt.Errorf("subscription handler creation failed: %w", err) } w.subs[subHandler.ID()] = subHandler @@ -388,6 +389,8 @@ func (w *WebSocketBroker) subscribe(msg *SubscribeMessageRequest) { Topic: subHandler.Topic(), ID: subHandler.ID(), }) + + return nil } // unsubscribe processes a request to cancel an active subscription, identified by its ID. @@ -403,42 +406,30 @@ func (w *WebSocketBroker) subscribe(msg *SubscribeMessageRequest) { // "topic": "example_topic", // "id": "sub_id_1" // } -func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessageRequest) { +func (w *WebSocketBroker) unsubscribe(msg *UnsubscribeMessageRequest) error { sub, found := w.subs[msg.ID] if !found { - errMsg := fmt.Sprintf("No subscription found for ID %s", msg.ID) + errMsg := fmt.Sprintf("no subscription found for ID %s", msg.ID) w.logger.Info().Msg(errMsg) - w.sendData(BaseMessageResponse{ - Action: UnsubscribeAction, - Success: false, - ErrorMessage: errMsg, - }) - return + return fmt.Errorf(errMsg) + } + + if err := sub.Close(); err != nil { + w.logger.Err(err).Msgf("Failed to close subscription with ID %s", msg.ID) + return fmt.Errorf("failed to close subscription with ID %s: %w", msg.ID, err) } - response := UnsubscribeMessageResponse{ + delete(w.subs, msg.ID) + w.broadcastMessage(UnsubscribeMessageResponse{ BaseMessageResponse: BaseMessageResponse{ Action: UnsubscribeAction, Success: true, }, Topic: sub.Topic(), ID: sub.ID(), - } - - if err := sub.Close(); err != nil { - w.logger.Err(err).Msgf("Failed to close subscription with ID %s", msg.ID) - err := fmt.Errorf("failed to close subscription with ID %s: %w", msg.ID, err) - w.sendData(BaseMessageResponse{ - Action: UnsubscribeAction, - Success: false, - ErrorMessage: err.Error(), - }) - return - } - - delete(w.subs, msg.ID) + }) - w.broadcastMessage(response) + return nil } // listOfSubscriptions gathers all active subscriptions for the current WebSocket connection, diff --git a/engine/access/rest/routes/ws_broker_handler.go b/engine/access/rest/routes/ws_broker_handler.go index bc618a4dc7e..46f1608fdc7 100644 --- a/engine/access/rest/routes/ws_broker_handler.go +++ b/engine/access/rest/routes/ws_broker_handler.go @@ -5,7 +5,6 @@ import ( "github.com/gorilla/websocket" "github.com/rs/zerolog" - "go.uber.org/atomic" "github.com/onflow/flow-go/engine/access/rest/models" "github.com/onflow/flow-go/engine/access/rest/routes/subscription_handlers" @@ -19,6 +18,7 @@ type WSBrokerHandler struct { *HttpHandler logger zerolog.Logger + config WebsocketConfig subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory } @@ -33,11 +33,13 @@ var _ http.Handler = (*WSBrokerHandler)(nil) // - subHandlerFactory: Factory for creating handlers that manage specific pub-sub subscriptions. func NewWSBrokerHandler( logger zerolog.Logger, + config WebsocketConfig, chain flow.Chain, subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory, ) *WSBrokerHandler { return &WSBrokerHandler{ logger: logger, + config: config, subHandlerFactory: subHandlerFactory, HttpHandler: NewHttpHandler(logger, chain), } @@ -79,12 +81,8 @@ func (h *WSBrokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { wsBroker := NewWebSocketBroker( logger, + h.config, conn, - //TODO: fill all limits - LimitsConfiguration{ - activeResponsesPerSecond: atomic.NewUint64(0), - activeSubscriptions: atomic.NewUint64(0), - }, h.subHandlerFactory, ) err = wsBroker.configureConnection() diff --git a/engine/access/rest/server.go b/engine/access/rest/server.go index 6b2acc131c1..07ceaf2ce2a 100644 --- a/engine/access/rest/server.go +++ b/engine/access/rest/server.go @@ -42,6 +42,7 @@ func NewServer(serverAPI access.API, restCollector module.RestMetrics, stateStreamApi state_stream.API, stateStreamConfig backend.Config, + wsConfig routes.WebsocketConfig, ) (*http.Server, error) { builder := routes.NewRouterBuilder(logger, restCollector).AddRestRoutes(serverAPI, chain) if stateStreamApi != nil { @@ -53,7 +54,7 @@ func NewServer(serverAPI access.API, stateStreamApi, serverAPI, ) - builder.AddPubSubRoute(chain, subscriptionHandlerFactory) + builder.AddPubSubRoute(chain, wsConfig, subscriptionHandlerFactory) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, diff --git a/engine/access/rest_api_test.go b/engine/access/rest_api_test.go index 96c6aadf150..722f831766b 100644 --- a/engine/access/rest_api_test.go +++ b/engine/access/rest_api_test.go @@ -193,6 +193,7 @@ func (suite *RestAPITestSuite) SetupTest() { suite.unsecureGrpcServer, nil, stateStreamConfig, + routes.WebsocketConfig{}, nil, ) assert.NoError(suite.T(), err) diff --git a/engine/access/rpc/engine.go b/engine/access/rpc/engine.go index 145e3d62143..264e14c4c55 100644 --- a/engine/access/rpc/engine.go +++ b/engine/access/rpc/engine.go @@ -14,6 +14,7 @@ import ( "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/consensus/hotstuff/model" "github.com/onflow/flow-go/engine/access/rest" + "github.com/onflow/flow-go/engine/access/rest/routes" "github.com/onflow/flow-go/engine/access/rpc/backend" "github.com/onflow/flow-go/engine/access/state_stream" statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend" @@ -71,6 +72,7 @@ type Engine struct { stateStreamBackend state_stream.API stateStreamConfig statestreambackend.Config + wsConfig routes.WebsocketConfig } type Option func(*RPCEngineBuilder) @@ -88,6 +90,7 @@ func NewBuilder(log zerolog.Logger, unsecureGrpcServer *grpcserver.GrpcServer, stateStreamBackend state_stream.API, stateStreamConfig statestreambackend.Config, + wsConfig routes.WebsocketConfig, indexReporter state_synchronization.IndexReporter, ) (*RPCEngineBuilder, error) { log = log.With().Str("engine", "rpc").Logger() @@ -114,6 +117,7 @@ func NewBuilder(log zerolog.Logger, restHandler: restHandler, stateStreamBackend: stateStreamBackend, stateStreamConfig: stateStreamConfig, + wsConfig: wsConfig, } backendNotifierActor, backendNotifierWorker := events.NewFinalizationActor(eng.processOnFinalizedBlock) eng.backendNotifierActor = backendNotifierActor @@ -240,8 +244,16 @@ func (e *Engine) serveREST(ctx irrecoverable.SignalerContext, ready component.Re e.log.Info().Str("rest_api_address", e.config.RestConfig.ListenAddress).Msg("starting REST server on address") - r, err := rest.NewServer(e.restHandler, e.config.RestConfig, e.log, e.chain, e.restCollector, e.stateStreamBackend, - e.stateStreamConfig) + r, err := rest.NewServer( + e.restHandler, + e.config.RestConfig, + e.log, + e.chain, + e.restCollector, + e.stateStreamBackend, + e.stateStreamConfig, + e.wsConfig, + ) if err != nil { e.log.Err(err).Msg("failed to initialize the REST server") ctx.Throw(err) diff --git a/engine/access/rpc/rate_limit_test.go b/engine/access/rpc/rate_limit_test.go index 622b06e3f54..9a0dba5f407 100644 --- a/engine/access/rpc/rate_limit_test.go +++ b/engine/access/rpc/rate_limit_test.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc/status" accessmock "github.com/onflow/flow-go/engine/access/mock" + "github.com/onflow/flow-go/engine/access/rest/routes" "github.com/onflow/flow-go/engine/access/rpc/backend" statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/model/flow" @@ -187,6 +188,7 @@ func (suite *RateLimitTestSuite) SetupTest() { suite.unsecureGrpcServer, nil, stateStreamConfig, + routes.WebsocketConfig{}, nil, ) require.NoError(suite.T(), err) diff --git a/engine/access/secure_grpcr_test.go b/engine/access/secure_grpcr_test.go index cc1d1a75cc8..93bba91e931 100644 --- a/engine/access/secure_grpcr_test.go +++ b/engine/access/secure_grpcr_test.go @@ -19,6 +19,7 @@ import ( "github.com/onflow/crypto" accessmock "github.com/onflow/flow-go/engine/access/mock" + "github.com/onflow/flow-go/engine/access/rest/routes" "github.com/onflow/flow-go/engine/access/rpc" "github.com/onflow/flow-go/engine/access/rpc/backend" statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend" @@ -171,6 +172,7 @@ func (suite *SecureGRPCTestSuite) SetupTest() { suite.unsecureGrpcServer, nil, stateStreamConfig, + routes.WebsocketConfig{}, nil, ) assert.NoError(suite.T(), err) From d111330f2364dd3753d72359ccc6f62dda282701 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Wed, 30 Oct 2024 13:05:31 +0200 Subject: [PATCH 19/22] Updated error handling, added comments --- engine/access/rest/routes/websocket_broker.go | 132 +++++++++--------- 1 file changed, 67 insertions(+), 65 deletions(-) diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index 16cc3afc4d3..d5253f3d5c6 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -17,67 +17,68 @@ import ( // Constants representing action types. const ( + UnknownAction = "unknown" SubscribeAction = "subscribe" // Action for subscription message UnsubscribeAction = "unsubscribe" // Action for unsubscription message ListSubscriptionsAction = "list_subscriptions" // Action to list active subscriptions ) const ( - // DefaultMaxSubscriptionsPerConnection defines the default max number of subscriptions that can be open at the same time. - DefaultMaxSubscriptionsPerConnection = 1000 - - DefaultMaxResponsesPerSecond = 100 - - DefaultSendMessageTimeout = 10 * time.Second + DefaultMaxSubscriptionsPerConnection = 1000 // Default maximum subscriptions per connection + DefaultMaxResponsesPerSecond = 100 // Default maximum responses per second + DefaultSendMessageTimeout = 10 * time.Second // Default timeout for sending messages ) -// Define a base struct to determine the action +// BaseMessageRequest represents a base structure for incoming messages. type BaseMessageRequest struct { - Action string `json:"action"` + Action string `json:"action"` // Action type of the request } +// BaseMessageResponse represents a base structure for outgoing messages. type BaseMessageResponse struct { - Action string `json:"action,omitempty"` - Success bool `json:"success"` - ErrorMessage string `json:"error_message,omitempty"` + Action string `json:"action,omitempty"` // Action type of the response + Success bool `json:"success"` // Indicates success or failure + ErrorMessage string `json:"error_message,omitempty"` // Error message, if any + } +// SubscribeMessageRequest represents a request to subscribe to a topic. type SubscribeMessageRequest struct { BaseMessageRequest - Topic string `json:"topic"` - Arguments map[string]interface{} `json:"arguments"` + Topic string `json:"topic"` // Topic to subscribe to + Arguments map[string]interface{} `json:"arguments"` // Additional arguments for subscription } -// SubscribeMessageResponse represents the response to a subscription message. -// It includes the topic and a unique subscription ID. +// SubscribeMessageResponse represents the response to a subscription request. type SubscribeMessageResponse struct { BaseMessageResponse - Topic string `json:"topic"` - ID string `json:"id"` + Topic string `json:"topic"` // Topic of the subscription + ID string `json:"id"` // Unique subscription ID } +// UnsubscribeMessageRequest represents a request to unsubscribe from a topic. type UnsubscribeMessageRequest struct { BaseMessageRequest - Topic string `json:"topic"` - ID string `json:"id"` + Topic string `json:"topic"` // Topic to unsubscribe from + ID string `json:"id"` // Unique subscription ID } -// UnsubscribeMessageResponse represents the response to an unsubscription message. -// It includes the topic and subscription ID for confirmation. +// UnsubscribeMessageResponse represents the response to an unsubscription request. type UnsubscribeMessageResponse struct { BaseMessageResponse - Topic string `json:"topic"` - ID string `json:"id"` + Topic string `json:"topic"` // Topic of the unsubscription + ID string `json:"id"` // Unique subscription ID } +// ListSubscriptionsMessageRequest represents a request to list active subscriptions. type ListSubscriptionsMessageRequest struct { BaseMessageRequest } -// SubscriptionEntry represents an active subscription entry with a specific topic and unique identifier. +// SubscriptionEntry represents an active subscription entry. type SubscriptionEntry struct { - Topic string `json:"topic,omitempty"` - ID string `json:"id,omitempty"` + Topic string `json:"topic,omitempty"` // Topic of the subscription + ID string `json:"id,omitempty"` // Unique subscription ID } // ListSubscriptionsMessageResponse is the structure used to respond to list_subscriptions requests. @@ -87,6 +88,7 @@ type ListSubscriptionsMessageResponse struct { Subscriptions []*SubscriptionEntry `json:"subscriptions,omitempty"` } +// WebsocketConfig holds configuration for the WebSocketBroker connection. type WebsocketConfig struct { MaxSubscriptionsPerConnection uint64 MaxResponsesPerSecond uint64 @@ -101,15 +103,16 @@ type WebSocketBroker struct { subs map[string]subscription_handlers.SubscriptionHandler // First key is the subscription ID, second key is the topic - config WebsocketConfig // Limits on the maximum number of subscriptions per connection, responses per second, and send message timeout. + config WebsocketConfig // Configuration for the WebSocket broker - errChannel chan error // Channel to read messages from the client - broadcastChannel chan interface{} // Channel to read messages from node subscriptions + errChannel chan error // Channel for error messages + broadcastChannel chan interface{} // Channel for broadcast messages - activeSubscriptions *atomic.Uint64 - activeResponsesPerSecond *atomic.Uint64 + activeSubscriptions *atomic.Uint64 // Count of active subscriptions + activeResponsesPerSecond *atomic.Uint64 // Count of responses per second } +// NewWebSocketBroker initializes a new WebSocketBroker instance. func NewWebSocketBroker( logger zerolog.Logger, config WebsocketConfig, @@ -183,8 +186,7 @@ func (w *WebSocketBroker) handleWSError(err error) { wsCode, wsMsg := w.resolveWebSocketError(err) // Send the close message to the client - closeMessage := websocket.FormatCloseMessage(wsCode, wsMsg) - err = w.conn.WriteControl(websocket.CloseMessage, closeMessage, time.Now().Add(time.Second)) + err = w.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(wsCode, wsMsg), time.Now().Add(time.Second)) if err != nil { w.logger.Error().Err(err).Msgf("error sending WebSocket CloseMessage error: %v", err) } @@ -201,43 +203,47 @@ func (w *WebSocketBroker) handleWSError(err error) { // to handle incoming messages asynchronously. func (w *WebSocketBroker) readMessages() { for { - // reads messages from the WebSocket connection when - // 1) the connection is closed by client - // 2) unexpected message is received from the client - _, message, err := w.conn.ReadMessage() if err != nil { - w.errChannel <- err - - //it means that client connection has been terminated, and we need to stop this goroutine if websocket.IsCloseError(err) { - return + w.logger.Info().Msg("connection closed by client") } - continue + // Send the error to the error channel for handling in writeMessages + w.errChannel <- err + return } + // Process the incoming message if err := w.processMessage(message); err != nil { - w.errChannel <- err + // Send structured error response on failure + w.logger.Err(err).Msg("failed to send error message response") } } } -// Process message based on action type +// processMessage processes incoming WebSocket messages based on their action type. func (w *WebSocketBroker) processMessage(message []byte) error { var baseMsg BaseMessageRequest if err := json.Unmarshal(message, &baseMsg); err != nil { - return fmt.Errorf("invalid message structure: 'action' is required: %w", err) + return w.sendErrorResponse(UnknownAction, fmt.Sprintf("invalid message structure: 'action' is required: %v", err)) } + + var err error switch baseMsg.Action { case SubscribeAction: - return w.handleSubscribeRequest(message) + err = w.handleSubscribeRequest(message) case UnsubscribeAction: - return w.handleUnsubscribeRequest(message) + err = w.handleUnsubscribeRequest(message) case ListSubscriptionsAction: - return w.handleListSubscriptionsRequest(message) + err = w.handleListSubscriptionsRequest(message) default: - return fmt.Errorf("unknown action type: %s", baseMsg.Action) + err = fmt.Errorf("unsupported action type: %s", baseMsg.Action) + } + if err != nil { + return w.sendErrorResponse(baseMsg.Action, err.Error()) } + + return nil } func (w *WebSocketBroker) handleSubscribeRequest(message []byte) error { @@ -284,23 +290,10 @@ func (w *WebSocketBroker) writeMessages() { for { select { case err := <-w.errChannel: - // we use errChannel - // 1) as indicator of client's status, when errChannel closes it means that client + // we use errChannel as indicator of client's status, when errChannel closes it means that client // connection has been terminated, and we need to stop this goroutine to avoid memory leak. - if websocket.IsCloseError(err) { - w.handleWSError(err) - return - } - - // 2) as error receiver for any errors that occur during the reading process - err = w.sendResponse(BaseMessageResponse{ - Success: false, - ErrorMessage: err.Error(), - //TODO: add action - }) - if err != nil { - return - } + w.handleWSError(err) + return case data, ok := <-w.broadcastChannel: if !ok { err := fmt.Errorf("broadcast channel closed, no error occurred") @@ -331,6 +324,15 @@ func (w *WebSocketBroker) sendResponse(data interface{}) error { return w.conn.WriteJSON(data) } +// Helper to send structured error responses +func (w *WebSocketBroker) sendErrorResponse(action, errMsg string) error { + return w.sendResponse(BaseMessageResponse{ + Action: action, + Success: false, + ErrorMessage: errMsg, + }) +} + // sendPing sends a periodic ping message to the WebSocket client to keep the connection alive. func (w *WebSocketBroker) sendPing() error { if err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { From 51babfbcdecbbd25edabc9f3c96f6ba3743c2426 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Wed, 30 Oct 2024 13:08:07 +0200 Subject: [PATCH 20/22] Updated comment --- engine/access/rest/routes/websocket_broker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index d5253f3d5c6..7ccad4ed63d 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -215,7 +215,7 @@ func (w *WebSocketBroker) readMessages() { // Process the incoming message if err := w.processMessage(message); err != nil { - // Send structured error response on failure + // Log error for sending structured error response on failure w.logger.Err(err).Msg("failed to send error message response") } } From 3bec34426850622e5f592b43e39f6617cdae710c Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Wed, 30 Oct 2024 18:37:26 +0200 Subject: [PATCH 21/22] Separated websocket models --- engine/access/rest/routes/websocket_broker.go | 59 ------------------- engine/access/rest/routes/websocket_models.go | 59 +++++++++++++++++++ 2 files changed, 59 insertions(+), 59 deletions(-) create mode 100644 engine/access/rest/routes/websocket_models.go diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index 7ccad4ed63d..22c09e06236 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -29,65 +29,6 @@ const ( DefaultSendMessageTimeout = 10 * time.Second // Default timeout for sending messages ) -// BaseMessageRequest represents a base structure for incoming messages. -type BaseMessageRequest struct { - Action string `json:"action"` // Action type of the request -} - -// BaseMessageResponse represents a base structure for outgoing messages. -type BaseMessageResponse struct { - Action string `json:"action,omitempty"` // Action type of the response - Success bool `json:"success"` // Indicates success or failure - ErrorMessage string `json:"error_message,omitempty"` // Error message, if any - -} - -// SubscribeMessageRequest represents a request to subscribe to a topic. -type SubscribeMessageRequest struct { - BaseMessageRequest - Topic string `json:"topic"` // Topic to subscribe to - Arguments map[string]interface{} `json:"arguments"` // Additional arguments for subscription -} - -// SubscribeMessageResponse represents the response to a subscription request. -type SubscribeMessageResponse struct { - BaseMessageResponse - Topic string `json:"topic"` // Topic of the subscription - ID string `json:"id"` // Unique subscription ID -} - -// UnsubscribeMessageRequest represents a request to unsubscribe from a topic. -type UnsubscribeMessageRequest struct { - BaseMessageRequest - Topic string `json:"topic"` // Topic to unsubscribe from - ID string `json:"id"` // Unique subscription ID -} - -// UnsubscribeMessageResponse represents the response to an unsubscription request. -type UnsubscribeMessageResponse struct { - BaseMessageResponse - Topic string `json:"topic"` // Topic of the unsubscription - ID string `json:"id"` // Unique subscription ID -} - -// ListSubscriptionsMessageRequest represents a request to list active subscriptions. -type ListSubscriptionsMessageRequest struct { - BaseMessageRequest -} - -// SubscriptionEntry represents an active subscription entry. -type SubscriptionEntry struct { - Topic string `json:"topic,omitempty"` // Topic of the subscription - ID string `json:"id,omitempty"` // Unique subscription ID -} - -// ListSubscriptionsMessageResponse is the structure used to respond to list_subscriptions requests. -// It contains a list of active subscriptions for the current WebSocket connection. -type ListSubscriptionsMessageResponse struct { - BaseMessageResponse - Subscriptions []*SubscriptionEntry `json:"subscriptions,omitempty"` -} - // WebsocketConfig holds configuration for the WebSocketBroker connection. type WebsocketConfig struct { MaxSubscriptionsPerConnection uint64 diff --git a/engine/access/rest/routes/websocket_models.go b/engine/access/rest/routes/websocket_models.go new file mode 100644 index 00000000000..3193d95c470 --- /dev/null +++ b/engine/access/rest/routes/websocket_models.go @@ -0,0 +1,59 @@ +package routes + +// BaseMessageRequest represents a base structure for incoming messages. +type BaseMessageRequest struct { + Action string `json:"action"` // Action type of the request +} + +// BaseMessageResponse represents a base structure for outgoing messages. +type BaseMessageResponse struct { + Action string `json:"action,omitempty"` // Action type of the response + Success bool `json:"success"` // Indicates success or failure + ErrorMessage string `json:"error_message,omitempty"` // Error message, if any +} + +// SubscribeMessageRequest represents a request to subscribe to a topic. +type SubscribeMessageRequest struct { + BaseMessageRequest + Topic string `json:"topic"` // Topic to subscribe to + Arguments map[string]interface{} `json:"arguments"` // Additional arguments for subscription +} + +// SubscribeMessageResponse represents the response to a subscription request. +type SubscribeMessageResponse struct { + BaseMessageResponse + Topic string `json:"topic"` // Topic of the subscription + ID string `json:"id"` // Unique subscription ID +} + +// UnsubscribeMessageRequest represents a request to unsubscribe from a topic. +type UnsubscribeMessageRequest struct { + BaseMessageRequest + Topic string `json:"topic"` // Topic to unsubscribe from + ID string `json:"id"` // Unique subscription ID +} + +// UnsubscribeMessageResponse represents the response to an unsubscription request. +type UnsubscribeMessageResponse struct { + BaseMessageResponse + Topic string `json:"topic"` // Topic of the unsubscription + ID string `json:"id"` // Unique subscription ID +} + +// ListSubscriptionsMessageRequest represents a request to list active subscriptions. +type ListSubscriptionsMessageRequest struct { + BaseMessageRequest +} + +// SubscriptionEntry represents an active subscription entry. +type SubscriptionEntry struct { + Topic string `json:"topic,omitempty"` // Topic of the subscription + ID string `json:"id,omitempty"` // Unique subscription ID +} + +// ListSubscriptionsMessageResponse is the structure used to respond to list_subscriptions requests. +// It contains a list of active subscriptions for the current WebSocket connection. +type ListSubscriptionsMessageResponse struct { + BaseMessageResponse + Subscriptions []*SubscriptionEntry `json:"subscriptions,omitempty"` +} From 173f14150bc827cf9dcea598f9ae7b5269c09eb2 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Fri, 1 Nov 2024 11:13:50 +0200 Subject: [PATCH 22/22] Removed basic counter for responses per second --- engine/access/rest/routes/websocket_broker.go | 33 +++++-------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/engine/access/rest/routes/websocket_broker.go b/engine/access/rest/routes/websocket_broker.go index 22c09e06236..deabbe0c387 100644 --- a/engine/access/rest/routes/websocket_broker.go +++ b/engine/access/rest/routes/websocket_broker.go @@ -49,8 +49,7 @@ type WebSocketBroker struct { errChannel chan error // Channel for error messages broadcastChannel chan interface{} // Channel for broadcast messages - activeSubscriptions *atomic.Uint64 // Count of active subscriptions - activeResponsesPerSecond *atomic.Uint64 // Count of responses per second + activeSubscriptions *atomic.Uint64 // Count of active subscriptions } // NewWebSocketBroker initializes a new WebSocketBroker instance. @@ -61,29 +60,17 @@ func NewWebSocketBroker( subHandlerFactory *subscription_handlers.SubscriptionHandlerFactory, ) *WebSocketBroker { websocketBroker := &WebSocketBroker{ - logger: logger.With().Str("component", "websocket-broker").Logger(), - conn: conn, - config: config, - subHandlerFactory: subHandlerFactory, - subs: make(map[string]subscription_handlers.SubscriptionHandler), - activeResponsesPerSecond: atomic.NewUint64(0), - activeSubscriptions: atomic.NewUint64(0), + logger: logger.With().Str("component", "websocket-broker").Logger(), + conn: conn, + config: config, + subHandlerFactory: subHandlerFactory, + subs: make(map[string]subscription_handlers.SubscriptionHandler), + activeSubscriptions: atomic.NewUint64(0), } - go websocketBroker.resetResponseLimit() return websocketBroker } -// resetResponseLimit resets the response limit every second. -func (w *WebSocketBroker) resetResponseLimit() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for range ticker.C { - w.activeResponsesPerSecond.Store(0) // Reset the response count every second - } -} - func (w *WebSocketBroker) configureConnection() error { if err := w.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { // Set the initial write deadline for the first ping message return models.NewRestError(http.StatusInternalServerError, "Set the initial write deadline error: ", err) @@ -292,14 +279,10 @@ func (w *WebSocketBroker) sendPing() error { // broadcastMessage is called by each SubscriptionHandler, // receiving formatted subscription messages and writing them to the broadcast channel. func (w *WebSocketBroker) broadcastMessage(data interface{}) { - if w.activeResponsesPerSecond.Load() >= w.config.MaxResponsesPerSecond { - // TODO: recheck edge cases - time.Sleep(w.config.SendMessageTimeout) // Adjust the sleep duration as needed - } + // TODO: add limitation for responses per second // Send the message to the broadcast channel w.broadcastChannel <- data - w.activeResponsesPerSecond.Add(1) } // subscribe processes a request to subscribe to a specific topic. It uses the topic field in