From fbe9a5b957e30352b30d67481689de2074ec3fb9 Mon Sep 17 00:00:00 2001 From: qmatias Date: Tue, 10 May 2022 18:16:20 -0500 Subject: [PATCH] fix bar events --- blankly/frameworks/strategy/strategy.py | 31 ++++++++++++++++--------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/blankly/frameworks/strategy/strategy.py b/blankly/frameworks/strategy/strategy.py index 6ae4b53b..9b8268f6 100644 --- a/blankly/frameworks/strategy/strategy.py +++ b/blankly/frameworks/strategy/strategy.py @@ -53,20 +53,20 @@ def construct_strategy(self, schedulers, orderbook_websockets, self.orderbook_manager = orderbook_manager self.ticker_manager = ticker_manager - def rest_event(self, **kwargs): - callback = kwargs['callback'] # type: callable - symbol = kwargs['symbol'] # type: str - resolution = kwargs['resolution'] # type: int - variables = kwargs['variables'] # type: dict - type_ = kwargs['type'] # type: EventType - state = kwargs['state'] # type: StrategyState + def rest_event(self, event): + callback = event['callback'] # type: callable + symbol = event['symbol'] # type: str + resolution = event['resolution'] # type: int + variables = event['variables'] # type: dict + type_ = event['type'] # type: EventType + state = event['state'] # type: StrategyState state.variables = variables state.resolution = resolution if type_ == EventType.bar_event: if not self.is_backtesting: - bar_time = kwargs['bar_time'] + bar_time = event['bar_time'] while True: # Sometimes coinbase doesn't download recent data correctly try: @@ -86,9 +86,17 @@ def rest_event(self, **kwargs): except IndexError: warnings.warn("No bar found for this time range") return - threshold = 10 - if data['time'] + threshold < self.time: + + # don't send duplicate events + if data['time'] == event.get('prev_ev_time', None): + return + event['prev_ev_time'] = data['time'] + + # delay event once to sync up with historical data + was_delayed = event.get('was_delayed', False) + if not was_delayed and data['time'] < self.time: return data['time'] + resolution + args = [data, symbol, state] elif type_ == EventType.price_event: data = self.interface.get_price(symbol) @@ -140,11 +148,12 @@ def run_price_events(self, events: list): self.sleep(event['next_run'] - self.time) # Run the event - next_run = self.rest_event(**event) + next_run = self.rest_event(event) if next_run: # if rest_event returns something, run this event again at that time # this implies the event did *not* run event['next_run'] = next_run + event['was_delayed'] = True else: # otherwise, the event ran. we can revalue account and re-run normally @ `resolution` intervals self.backtester.value_account()