Skip to content

Commit

Permalink
Handle unsubscribe of streaming APIs (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
lumos42 authored Apr 10, 2024
1 parent 77be032 commit 5e48e59
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
6 changes: 6 additions & 0 deletions plugin/evm/orderbook/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ func (api *OrderBookAPI) StreamDepthUpdateForMarket(ctx context.Context, market
depthUpdate := getUpdateInDepth(newMarketDepth, oldMarketDepth)
notifier.Notify(rpcSub.ID, depthUpdate)
oldMarketDepth = newMarketDepth
case <-rpcSub.Err():
ticker.Stop()
return
case <-notifier.Closed():
ticker.Stop()
return
Expand Down Expand Up @@ -301,6 +304,9 @@ func (api *OrderBookAPI) StreamDepthUpdateForMarketAndFreq(ctx context.Context,
depthUpdate := getUpdateInDepth(newMarketDepth, oldMarketDepth)
notifier.Notify(rpcSub.ID, depthUpdate)
oldMarketDepth = newMarketDepth
case <-rpcSub.Err():
ticker.Stop()
return
case <-notifier.Closed():
ticker.Stop()
return
Expand Down
7 changes: 7 additions & 0 deletions plugin/evm/orderbook/trading_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ func (api *TradingAPI) StreamDepthUpdateForMarket(ctx context.Context, market in
}
notifier.Notify(rpcSub.ID, response)
oldMarketDepth = newMarketDepth
case <-rpcSub.Err():
ticker.Stop()
return
case <-notifier.Closed():
ticker.Stop()
return
Expand Down Expand Up @@ -300,6 +303,8 @@ func (api *TradingAPI) StreamTraderUpdates(ctx context.Context, trader string, b
if strings.EqualFold(event.Trader.String(), trader) && event.BlockStatus == confirmationLevel {
notifier.Notify(rpcSub.ID, event)
}
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
Expand All @@ -325,6 +330,8 @@ func (api *TradingAPI) StreamMarketTrades(ctx context.Context, market Market, bl
if event.Market == market && event.BlockStatus == confirmationLevel {
notifier.Notify(rpcSub.ID, event)
}
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
Expand Down

0 comments on commit 5e48e59

Please sign in to comment.