From 5e48e592851e383c53037c91641127acaa75b290 Mon Sep 17 00:00:00 2001 From: Shubham Date: Wed, 10 Apr 2024 14:25:18 +0530 Subject: [PATCH] Handle unsubscribe of streaming APIs (#188) --- plugin/evm/orderbook/service.go | 6 ++++++ plugin/evm/orderbook/trading_apis.go | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/plugin/evm/orderbook/service.go b/plugin/evm/orderbook/service.go index 4da6c95a4b..3cbb5f3162 100644 --- a/plugin/evm/orderbook/service.go +++ b/plugin/evm/orderbook/service.go @@ -266,6 +266,9 @@ func (api *OrderBookAPI) StreamDepthUpdateForMarket(ctx context.Context, market depthUpdate := getUpdateInDepth(newMarketDepth, oldMarketDepth) notifier.Notify(rpcSub.ID, depthUpdate) oldMarketDepth = newMarketDepth + case <-rpcSub.Err(): + ticker.Stop() + return case <-notifier.Closed(): ticker.Stop() return @@ -301,6 +304,9 @@ func (api *OrderBookAPI) StreamDepthUpdateForMarketAndFreq(ctx context.Context, depthUpdate := getUpdateInDepth(newMarketDepth, oldMarketDepth) notifier.Notify(rpcSub.ID, depthUpdate) oldMarketDepth = newMarketDepth + case <-rpcSub.Err(): + ticker.Stop() + return case <-notifier.Closed(): ticker.Stop() return diff --git a/plugin/evm/orderbook/trading_apis.go b/plugin/evm/orderbook/trading_apis.go index 080c9fbcb8..b5e906a4f3 100644 --- a/plugin/evm/orderbook/trading_apis.go +++ b/plugin/evm/orderbook/trading_apis.go @@ -251,6 +251,9 @@ func (api *TradingAPI) StreamDepthUpdateForMarket(ctx context.Context, market in } notifier.Notify(rpcSub.ID, response) oldMarketDepth = newMarketDepth + case <-rpcSub.Err(): + ticker.Stop() + return case <-notifier.Closed(): ticker.Stop() return @@ -300,6 +303,8 @@ func (api *TradingAPI) StreamTraderUpdates(ctx context.Context, trader string, b if strings.EqualFold(event.Trader.String(), trader) && event.BlockStatus == confirmationLevel { notifier.Notify(rpcSub.ID, event) } + case <-rpcSub.Err(): + return case <-notifier.Closed(): return } @@ -325,6 +330,8 @@ func (api *TradingAPI) StreamMarketTrades(ctx context.Context, market Market, bl if event.Market == market && event.BlockStatus == confirmationLevel { notifier.Notify(rpcSub.ID, event) } + case <-rpcSub.Err(): + return case <-notifier.Closed(): return }