moves pricelog reader into Process/Q
reads a bunch of gzipped lines, splits them into symbol, date, and price,
discarding what we don't care about, and batch-places those values into a
shared Queue.
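
A minimal sketch of the reader-process pattern this commit adds, for anyone
skimming the diff below. The gzip reader, field order, and parser here are
stand-ins: the real code uses isal's igzip, lz4, and the bot's split_logline()
against its PAIRING config.

# producer sketch: read gzipped lines, parse, batch-place into a shared queue
from itertools import islice
from multiprocessing import Process

import gzip  # stdlib gzip for the sketch; the bot uses igzip/lz4 for speed

from faster_fifo import Queue  # pylint: disable=no-name-in-module

BATCH_SIZE = 131070  # ~128K lines per chunk, see the numbers below


def reader(price_logs, pairing, q):
    """reads price.log files and batch-places (symbol, date, price) into q"""
    for logfile in price_logs:
        with gzip.open(logfile, "rt") as f:
            while True:
                chunk = list(islice(f, BATCH_SIZE))
                if not chunk:
                    break
                payload = []
                for line in chunk:
                    if pairing not in line:  # discard pairs we don't trade
                        continue
                    parts = line.split()  # stand-in for split_logline()
                    if len(parts) < 3:  # malformed line, skip it
                        continue
                    symbol, date, price = parts[:3]
                    payload.append((symbol, date, price))
                q.put(payload)  # one pickle per batch, not one per line
    q.put([("QUIT", "QUIT", "QUIT")])  # sentinel so the consumer can stop


# usage: size the queue buffer for large pickled batches
q = Queue(max_size_bytes=100 * 1000 * 1000)
Process(target=reader, args=(["2023-01-01.log.gz"], "USDT", q)).start()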

This shaves a couple of seconds off a backtesting run, but also shrinks
RAM usage from about 700MB down to about 70MB.
There is still a significant pause in processing lines, due to the batch
size in the reader process, the time it takes to split each line, and the
time to pickle items into the Queue.
Trial and error led me to 128K lines per batch as a good compromise between
speed and CPU usage (~3%).
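
On the consuming side, the main loop blocks on q.get() while the reader is
still slicing, splitting, and pickling the next 128K-line batch, which is
where that pause shows up. A sketch of the drain loop, with process_line
standing in for the bot's per-kline handler:

def consume(q, process_line):
    """drains batches from the shared queue until the QUIT sentinel arrives"""
    finished = False
    while not finished:
        payload = q.get()  # blocks while the reader builds the next batch
        for symbol, date, price in payload:
            if symbol == "QUIT":
                finished = True
                break
            process_line(symbol, date, price)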
There is room for optimization: maybe look into ZeroMQ or async gzip
readers, or Queue read pooling if that's an option. This needs profiling;
most likely split_logline is still the cause of the processing pauses.
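
A quick way to test that hypothesis is to profile just the parse path, with
no queue and no strategy code in the way. A sketch using stdlib cProfile
(igzip from isal, as imported in lib/bot.py):

import cProfile
import pstats

from isal import igzip


def profile_split(bot, logfile):
    """profiles only the split/parse path for one price.log file"""
    with igzip.open(logfile, "rt") as f:
        lines = f.readlines()
    profiler = cProfile.Profile()
    profiler.enable()
    for line in lines:
        bot.split_logline(str(line))
    profiler.disable()
    pstats.Stats(profiler).sort_stats("cumulative").print_stats(10)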
Azulinho committed Jan 2, 2023
1 parent 2c3e741 commit 304c648
Showing 3 changed files with 52 additions and 42 deletions.
2 changes: 2 additions & 0 deletions .mypy.ini
@@ -14,3 +14,5 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-yaml.*]
 ignore_missing_imports = True
+[mypy-faster_fifo.*]
+ignore_missing_imports = True
90 changes: 48 additions & 42 deletions lib/bot.py
@@ -24,6 +24,8 @@
 from os.path import basename, exists
 from time import sleep
 from typing import Any, Dict, List, Tuple
+from multiprocessing import Process
+from faster_fifo import Queue  # pylint: disable=no-name-in-module


 from lib.helpers import (
@@ -1419,21 +1421,12 @@ def check_for_delisted_coin(self, symbol: str) -> bool:
             return True
         return False

-    def process_line(self, line: str) -> None:
+    def process_line(self, symbol, date, market_price) -> None:
         """processes a backlog line"""

         if self.quit:  # when told to quit, just go nicely
             return

-        # skip processing the line if it doesn't not match our PAIRING settings
-        if self.pairing not in line:
-            return
-
-        symbol, date, market_price = self.split_logline(line)
-        # symbol will be False if we fail to process the line fields
-        if not symbol:
-            return
-
         # TODO: rework this, generate a binance_data blob to pass to
         # init_or_update_coin()
         if symbol not in self.coins:
@@ -1471,44 +1464,34 @@ def process_line(self, line: str) -> None:
         # and finally run through the strategy for our coin.
         self.run_strategy(self.coins[symbol])

-    def backtest_logfile(self, price_log: str) -> None:
-        """processes one price.log file for backtesting"""
-
-        # when told to quit, do it nicely
-        if self.quit:
-            return
-
-        logging.info(f"backtesting: {price_log}")
-        logging.info(f"wallet: {self.wallet}")
-        logging.info(f"exposure: {self.calculates_exposure()}")
-        try:
+    def place_klines_into_q(self, cfg: Dict, q_klines: Queue):
+        for logfile in cfg["PRICE_LOGS"]:
+            logging.info(f"backtesting: {logfile}")
+            logging.info(f"wallet: {self.wallet}")
+            logging.info(f"exposure: {self.calculates_exposure()}")
             # we support .lz4 and .gz for our price.log files.
             # gzip -3 files provide the fastest decompression times I was able
             # to measure.
-            if price_log.endswith(".lz4"):
-                f = lz4open(price_log, mode="rt")
+            if logfile.endswith(".lz4"):
+                f = lz4open(logfile, mode="rt")
             else:
-                f = igzip.open(price_log, "rt")
+                f = igzip.open(logfile, "rt")
             while True:
                 # reading a chunk of lines like this speeds up backtesting
                 # by a large amount.
                 if self.quit:
                     break
-                next_n_lines = list(islice(f, 4 * 1024 * 1024))
+                next_n_lines = list(islice(f, 131070))
                 if not next_n_lines:
                     break

                 # now process each of the lines from our chunk
+                payload = []
                 for line in next_n_lines:
-                    self.process_line(str(line))
-            f.close()
-        except Exception as error_msg:  # pylint: disable=broad-except
-            logging.error("Exception:")
-            logging.error(traceback.format_exc())
-            # look into better ways to trapping a KeyboardInterrupt
-            # and then maybe setting self.quit = True
-            if error_msg == "KeyboardInterrupt":
-                sys.exit(1)
+                    if cfg["PAIRING"] not in line:
+                        continue
+                    symbol, date, market_price = self.split_logline(str(line))
+                    # symbol will be False if we fail to process the line fields
+                    if not symbol:
+                        continue
+                    payload.append((symbol, date, market_price))
+                q_klines.put(payload)
+
+        q_klines.put([("QUIT", "QUIT", "QUIT")])

     def backtesting(self) -> None:
         """the bot Backtesting main loop"""
@@ -1520,11 +1503,34 @@ def backtesting(self) -> None:
         if not self.cfg["TICKERS"]:
             logging.warning("no tickers to backtest")
         else:
-            for price_log in self.price_logs:
-                self.backtest_logfile(price_log)
+            q_klines = Queue()
+            Process(
+                target=self.place_klines_into_q,
+                args=(
+                    self.cfg,
+                    q_klines,
+                ),
+            ).start()
+
+            finished = False
+            while True:
+                self.process_control_flags()
+                if self.quit:
+                    break
+                if finished:
+                    break
+                payload = q_klines.get()
+                for item in payload:
+                    try:
+                        symbol, date, market_price = item
+                    except Exception as e:
+                        print(e)
+                        print(item)
+                    if symbol == "QUIT":
+                        finished = True
+                        break
+                    self.process_line(symbol, date, market_price)

         # now that we are done, lets record our results
         with open(
2 changes: 2 additions & 0 deletions requirements.txt
@@ -53,3 +53,5 @@ websockets==9.1
 Werkzeug==2.2.2
 yarl==1.8.2
 zipp==3.8.1
+faster-fifo==1.4.2
+Cython==0.29.32
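
One note on the "Queue read pooling" idea from the commit message: the
faster-fifo library pinned above also exposes get_many()/put_many(), which
move several messages per wakeup. A hedged sketch of a pooled drain loop;
treat the exact keyword arguments as an assumption to verify against 1.4.2:

from queue import Empty


def drain_pooled(q, handle):
    """pulls several queued batches per wakeup instead of one get() each"""
    finished = False
    while not finished:
        try:
            # each message is itself a list of (symbol, date, price) tuples
            batches = q.get_many(timeout=1.0, max_messages_to_get=8)
        except Empty:
            continue
        for payload in batches:
            for symbol, date, price in payload:
                if symbol == "QUIT":
                    finished = True
                    break
                handle(symbol, date, price)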
