Skip to content

Commit

Permalink
Stylized facts plot update. (#21)
Browse files Browse the repository at this point in the history
* Stylized facts plot update.

Following changes:

  1. The asset-return stylized-facts interface is streamlined: the interface for
preprocessed WRDS data is removed and the ABIDES interface is simplified.
  2. Order flow stylized facts processing bugfixes.
  3. Import code for sim data preprocessing refactored.
  4. Efficiency improvement via fewer disk reads from pandas. 

Co-authored-by: Danial Dervovic <[email protected]>
  • Loading branch information
ddervs and ddervs authored Jul 9, 2020
1 parent 7d56876 commit b7c72b5
Show file tree
Hide file tree
Showing 16 changed files with 341 additions and 113 deletions.
101 changes: 44 additions & 57 deletions realism/asset_returns_stylized_facts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
from glob import glob
from pathlib import Path
import argparse
sys.path.append('..')
from order_flow_stylized_facts import get_plot_colors
from tqdm import tqdm

p = str(Path(__file__).resolve().parents[1]) # directory one level up from this file
sys.path.append(p)
from realism_utils import get_plot_colors
from util.formatting.convert_order_stream import dir_path

# Create cache folder if it does not exist
Expand All @@ -29,101 +32,85 @@
all_metrics = [MinutelyReturns, AggregationNormality, Autocorrelation, VolatilityClustering, Kurtosis, VolumeVolatilityCorrelation, ReturnsVolatilityCorrelation]


def get_sims(sim_dir, my_metric, ohlcv_dict):
    """Compute a stylized-fact metric over every exchange log found under `sim_dir`.

    :param sim_dir: simulation output directory, searched recursively for .bz2
        exchange logs (filename must contain 'exchange', any case).
    :param my_metric: instantiated metric object exposing `compute(ohlcv) -> list`.
    :param ohlcv_dict: mapping exchange-log Path -> OHLCV frame (or None when the
        log produced no trades), as built by `get_ohlcvs`.
    :return: shuffled list of metric samples pooled across all exchange logs.
    """
    # NOTE: the diffed original still carried dead lines referencing `n`/`mult`
    # from the removed historical-data resampling; they are gone here.
    sims = []
    exchanges = [a for a in Path(sim_dir).rglob('*.bz2') if "exchange" in str(a).lower()]

    for exchange in exchanges:
        ohlcv = ohlcv_dict[exchange]
        if ohlcv is not None:  # logs with no trades contribute nothing
            sims += my_metric.compute(ohlcv)

    random.shuffle(sims)

    return sims


def get_ohlcvs(sim_dirs, recompute):
    """Load (and cache on disk) OHLCV trade data for every exchange log under `sim_dirs`.

    :param sim_dirs: list of simulation output directories, searched recursively
        for .bz2 logs whose filename contains 'exchange' (any case).
    :param recompute: when True, ignore any cached pickle and re-read the logs.
    :return: dict mapping exchange-log Path -> OHLCV frame from `get_trades`.
    """
    print("Loading simulation data...")
    exchanges = []
    for sim_dir in sim_dirs:
        exchanges += [a for a in Path(sim_dir).rglob('*.bz2') if "exchange" in str(a).lower()]

    # One cache file per combination of simulation directories.
    pickled_ohclv = "cache/{}_ohclv.pickle".format("_".join(sim_dirs).replace("/", ""))
    if (not os.path.exists(pickled_ohclv)) or recompute:  # cache miss (or forced recompute)
        ohclv_dict = dict()
        for exchange in tqdm(exchanges, desc="Files loaded"):
            ohclv_dict.update(
                {exchange: get_trades(exchange)}
            )
        # Context manager instead of pickle.dump(..., open(...)): the original
        # leaked the file handle.
        with open(pickled_ohclv, "wb") as f:
            pickle.dump(ohclv_dict, f)
    else:  # pickled OHLCV found in cache
        with open(pickled_ohclv, "rb") as f:
            ohclv_dict = pickle.load(f)

    return ohclv_dict


def plot_metrics(sim_dirs, sim_colors, output_dir, ohclv_dict, recompute):
    """Compute every stylized-fact metric for each simulation and save one figure per metric.

    :param sim_dirs: list of simulation output directories (one plotted series each).
    :param sim_colors: plot colour per simulation directory, aligned with `sim_dirs`.
    :param output_dir: directory the .png figures are written into (created if absent).
    :param ohclv_dict: mapping exchange-log Path -> OHLCV frame, from `get_ohlcvs`.
    :param recompute: when True, bypass the per-metric pickle cache.
    """
    # NOTE: the diffed original still interleaved removed historical-data lines
    # (real_dir / reals / n); this is the coherent simulated-only version.
    for my_metric in all_metrics:
        print(my_metric)
        my_metric = my_metric()  # instantiate the metric class

        result = dict()
        for i, sim_dir in enumerate(sim_dirs):
            # Per-metric, per-simulation cache of computed samples.
            pickled_sim = "cache/{}_{}.pickle".format(my_metric.__class__.__name__, sim_dir.replace("/", ""))
            if (not os.path.exists(pickled_sim)) or recompute:  # cache miss (or forced recompute)
                sims = get_sims(sim_dir, my_metric, ohclv_dict)
                with open(pickled_sim, "wb") as f:  # context manager: original leaked the handle
                    pickle.dump(sims, f)
            else:  # cache hit
                with open(pickled_sim, "rb") as f:
                    sims = pickle.load(f)

            sim_name = sim_dir.rstrip('/').split("/")[-1]
            result.update({(sim_name, sim_colors[i]): sims})

        # Create plot for each config and metric
        my_metric.visualize(result)

        plt.title(plt.gca().title.get_text())
        # Explicit idempotent mkdir instead of the original bare `except: pass`,
        # which also swallowed permission errors.
        os.makedirs(output_dir, exist_ok=True)
        plt.savefig("{}/{}.png".format(output_dir, my_metric.__class__.__name__), bbox_inches='tight')
        plt.clf()


if __name__ == "__main__":

    parser = argparse.ArgumentParser(description='Processes historical data and simulated stream files and produce plots'
                                                 ' of stylized fact metrics for asset return distributions.')
    parser.add_argument('-s', '--simulated-data-dir', type=dir_path, action='append', required=True,
                        help="Directory containing .bz2 output log files from ABIDES Exchange Agent. Note that the "
                             "filenames MUST contain the word 'Exchange' in any case. One can add many simulated data "
                             "directories")
    parser.add_argument('-z', '--recompute', action="store_true", help="Rerun computations without caching.")
    parser.add_argument('-o', '--output-dir', default='visualizations', help='Path to output directory', type=dir_path)

    args, remaining_args = parser.parse_known_args()

    # Sort directories so colour assignment and cache filenames are stable
    # across runs regardless of -s argument order.
    sim_dirs = args.simulated_data_dir
    sim_dirs.sort()
    sim_colors = get_plot_colors(sim_dirs)
    ohclv_dict = get_ohlcvs(sim_dirs, args.recompute)

    plot_metrics(sim_dirs, sim_colors, args.output_dir, ohclv_dict, args.recompute)
110 changes: 110 additions & 0 deletions realism/market_impact/abm_market_impact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import argparse
import sys
from pathlib import Path  # was missing: Path is used below for the sys.path setup

import numpy as np
import pandas as pd

p = str(Path(__file__).resolve().parents[2]) # directory two levels up from this file
sys.path.append(p)

from realism.realism_utils import make_orderbook_for_analysis


def create_orderbooks(exchange_path, ob_path, mid_price_cutoff=10000):
    """Build the processed/cleaned order book and the transacted-order stream for one run.

    :param exchange_path: path to the ABIDES EXCHANGE_AGENT .bz2 log.
    :param ob_path: path to the corresponding ORDERBOOK_*.bz2 log.
    :param mid_price_cutoff: rows with |MID_PRICE| >= this bound are dropped
        (was a hard-coded constant; now a backward-compatible parameter).
    :return: (processed_orderbook, transacted_orders, cleaned_orderbook)
    """
    processed_orderbook = make_orderbook_for_analysis(exchange_path, ob_path, num_levels=1,
                                                      hide_liquidity_collapse=False)
    cleaned_orderbook = processed_orderbook[(processed_orderbook['MID_PRICE'] > - mid_price_cutoff) &
                                            (processed_orderbook['MID_PRICE'] < mid_price_cutoff)]
    transacted_orders = cleaned_orderbook.loc[cleaned_orderbook.TYPE == "ORDER_EXECUTED"]

    # Each execution appears to be logged twice (once per side of the match);
    # keeping every second row after the stable sort de-duplicates them.
    # TODO(review): confirm the double-logging assumption against the exchange log.
    transacted_orders = transacted_orders.reset_index()
    transacted_orders = transacted_orders.sort_values(by=['index', 'ORDER_ID']).iloc[1::2]
    transacted_orders.set_index('index', inplace=True)
    return processed_orderbook, transacted_orders, cleaned_orderbook


def calculate_market_impact(orders_df, ob_df, start_time, end_time, tao):
    """Relate per-interval trade imbalance to mid-price moves (market-impact curve).

    :param orders_df: transacted orders indexed by timestamp, with SIZE and
        BUY_SELL_FLAG columns.
    :param ob_df: cleaned order book with a MID_PRICE column, timestamp-indexed.
    :param start_time: start of the analysis window (pd.Timestamp).
    :param end_time: end of the analysis window (pd.Timestamp).
    :param tao: bin width in seconds.
    :return: DataFrame indexed by participation-of-volume buckets, with the mean
        mid move 'mi' (basis points) and mean participation 'pov' per bucket.
    """

    def create_bins(tao, start_time, end_time, orders_df, is_buy):
        # Total executed SIZE per tao-second interval for one side of the book.
        bins = pd.interval_range(start=start_time, end=end_time, freq=pd.DateOffset(seconds=tao))
        binned = pd.cut(orders_df.loc[orders_df.BUY_SELL_FLAG == is_buy].index, bins=bins)
        binned_volume = orders_df.loc[orders_df.BUY_SELL_FLAG == is_buy].groupby(binned).SIZE.agg(np.sum)
        return binned_volume

    def calculate_mid_move(row):
        # Mid-price move over this row's interval, in basis points; the sign is
        # flipped for net-sell bins (row.ti < 0) so impact is measured in the
        # direction of the net trade flow.
        try:
            t_start = row.name.left
            t_end = row.name.right
            mid_t_start = mid_resampled.loc[mid_resampled.index == t_start].item()
            mid_t_end = mid_resampled.loc[mid_resampled.index == t_end].item()
            if row.ti < 0:
                row.mi = -1 * ((mid_t_end - mid_t_start) / mid_t_start) * 10000 # bps
            else:
                row.mi = (mid_t_end - mid_t_start) / mid_t_start * 10000 # bps
            return row.mi
        except:
            # NOTE(review): bare except yields None for any bin whose mid lookup
            # fails — presumably deliberate best-effort, but it also hides real
            # errors; consider narrowing to the expected lookup exceptions.
            pass

    # Keep only the last book snapshot per timestamp before resampling the mid.
    ob_df = ob_df.reset_index().drop_duplicates(subset='index', keep='last').set_index('index')

    mid = ob_df.MID_PRICE
    mid_resampled = mid.resample(f'{tao}s').ffill()

    binned_buy_volume = create_bins(tao=int(tao), start_time=start_time, end_time=end_time, orders_df=orders_df,
                                    is_buy=True).fillna(0)
    binned_sell_volume = create_bins(tao=int(tao), start_time=start_time, end_time=end_time, orders_df=orders_df,
                                     is_buy=False).fillna(0)

    midf = pd.DataFrame()
    midf['buy_vol'] = binned_buy_volume
    midf['sell_vol'] = binned_sell_volume
    midf['ti'] = midf['buy_vol'] - midf['sell_vol'] # Trade Imbalance
    midf['pov'] = abs(midf['ti']) / (midf['buy_vol'] + midf['sell_vol']) # Participation of Volume in tao
    midf['mi'] = None
    midf.index = pd.interval_range(start=start_time, end=end_time, freq=pd.DateOffset(seconds=int(tao)))

    midf.mi = midf.apply(calculate_mid_move, axis=1)

    # Bucket participation-of-volume into 1000 equal-width bins on [0, 1).
    pov_bins = np.linspace(start=0, stop=1, num=1000, endpoint=False)
    pov_binned = pd.cut(x=midf['pov'], bins=pov_bins)

    midf['pov_bins'] = pov_binned

    midf_gpd = midf.sort_values(by='pov_bins')
    midf_gpd.index = midf_gpd.pov_bins
    del midf_gpd['pov_bins']

    # Average mid move and participation within each pov bucket.
    df = pd.DataFrame(index=midf_gpd.index)
    df['mi'] = midf_gpd['mi']
    df['pov'] = midf_gpd['pov']
    df = df.groupby(df.index).mean()

    return df


if __name__ == "__main__":

    parser = argparse.ArgumentParser(description='Market Impact Curve as described in AlmgrenChriss 05 paper')

    parser.add_argument('--stock', default=None, required=True, help='stock (ABM)')
    parser.add_argument('--date', default=None, required=True, help='date (20200101)')
    parser.add_argument('--log', type=str, default=None, required=True, help='log folder')
    parser.add_argument('--tao', type=int, required=True, help='Number of seconds in each bin')

    args, remaining_args = parser.parse_known_args()

    # Unpack the CLI arguments and derive the regular-trading-hours window.
    stock, date = args.stock, args.date
    abides_log_folder = args.log
    start_time = pd.Timestamp(date) + pd.to_timedelta('09:30:00')
    end_time = pd.Timestamp(date) + pd.to_timedelta('16:00:00')

    print(f'Processing market impact data for {abides_log_folder}')

    # Build the cleaned book and executed-order stream from the run's logs.
    exchange_log = abides_log_folder + '/EXCHANGE_AGENT.bz2'
    orderbook_log = abides_log_folder + '/ORDERBOOK_{}_FULL.bz2'.format(stock)
    processed_orderbook, transacted_orders, cleaned_orderbook = create_orderbooks(
        exchange_path=exchange_log, ob_path=orderbook_log)

    df = calculate_market_impact(transacted_orders, cleaned_orderbook, start_time, end_time, tao=args.tao)
    df.to_pickle(abides_log_folder + f'/market_impact_df_tao_{args.tao}.bz2')

    print(f'Processed market impact data for {abides_log_folder}')
128 changes: 128 additions & 0 deletions realism/market_impact/marketreplay_market_impact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import argparse
import pandas as pd
import numpy as np

num_levels = 50

# Flattened column labels for the LOBSTER-style snapshot CSV: for each book
# level 1..num_levels, ask price/size followed by bid price/size.
columns = []
for level in range(1, num_levels + 1):
    columns.extend([f'ask_price_{level}', f'ask_size_{level}',
                    f'bid_price_{level}', f'bid_size_{level}'])


def process_data(abides_log_folder, stock, date):
    """Load LOBSTER-ized order-book snapshots and the transacted-order stream for one replay run.

    :param abides_log_folder: folder holding the run's ABIDES logs (trailing slash expected).
    :param stock: ticker symbol.
    :param date: trading date as 'YYYY-MM-DD'.
    :return: (ob_df, transacted_orders_df, start_time, end_time)
    """
    csv_orderbooks_parent_folder = '/efs/data/get_real_data/lobsterized/orderbook/'

    # Orderbook snapshots: the CSV provides the level data, the pickled ABIDES
    # book supplies the timestamps (first row of that index is skipped).
    ob_df = pd.read_csv(csv_orderbooks_parent_folder + f'orderbook_{stock}_{date}.csv')
    ob_df.columns = columns
    ob_df.index = pd.read_pickle(abides_log_folder + f'ORDERBOOK_{stock}_FREQ_ALL_{date.replace("-", "")}.bz2').index[1:]

    # Restrict to regular trading hours.
    start_time = pd.Timestamp(date) + pd.to_timedelta('09:30:00')
    end_time = pd.Timestamp(date) + pd.to_timedelta('16:00:00')
    ob_df = ob_df.loc[(ob_df.index >= start_time) & (ob_df.index <= end_time)]

    # Transacted orders: one row per ORDER_EXECUTED event.
    ea_df = pd.read_pickle(abides_log_folder + 'EXCHANGE_AGENT.bz2')
    ea_df = ea_df.loc[ea_df.EventType == 'ORDER_EXECUTED']

    # Build the frame in one shot: the original appended row-by-row with
    # DataFrame.append (quadratic, and removed in pandas >= 2.0) and kept an
    # unused `i` counter.
    records = [{'TIMESTAMP': index,
                'ORDER_ID': row.Event['order_id'],
                'PRICE': row.Event['fill_price'],
                'SIZE': row.Event['quantity'],
                'BUY_SELL_FLAG': row.Event['is_buy_order']}
               for index, row in ea_df.iterrows()]
    transacted_orders_df = pd.DataFrame(records, columns=['TIMESTAMP', 'ORDER_ID', 'PRICE', 'SIZE', 'BUY_SELL_FLAG'])

    transacted_orders_df.set_index('TIMESTAMP', inplace=True)

    # Executions appear to be logged twice (both sides of each match); keeping
    # every second row after the stable sort de-duplicates. TODO(review): confirm.
    transacted_orders_df = transacted_orders_df.sort_values(by=['TIMESTAMP', 'ORDER_ID']).iloc[1::2]

    return ob_df, transacted_orders_df, start_time, end_time


def calculate_market_impact(orders_df, ob_df, start_time, end_time, tao):
    """Relate per-interval trade imbalance to mid-price moves (market-impact curve).

    Market-replay variant: the mid price is derived from the top-of-book
    ask/bid columns rather than a precomputed MID_PRICE column.

    :param orders_df: transacted orders indexed by timestamp, with SIZE and
        BUY_SELL_FLAG columns.
    :param ob_df: order-book snapshots with ask_price_1 / bid_price_1 columns,
        timestamp-indexed.
    :param start_time: start of the analysis window (pd.Timestamp).
    :param end_time: end of the analysis window (pd.Timestamp).
    :param tao: bin width in seconds.
    :return: DataFrame indexed by participation-of-volume buckets, with the mean
        mid move 'mi' (basis points) and mean participation 'pov' per bucket.
    """

    def create_bins(tao, start_time, end_time, orders_df, is_buy):
        # Total executed SIZE per tao-second interval for one side of the book.
        bins = pd.interval_range(start=start_time, end=end_time, freq=pd.DateOffset(seconds=tao))
        binned = pd.cut(orders_df.loc[orders_df.BUY_SELL_FLAG == is_buy].index, bins=bins)
        binned_volume = orders_df.loc[orders_df.BUY_SELL_FLAG == is_buy].groupby(binned).SIZE.agg(np.sum)
        return binned_volume

    def calculate_mid_move(row):
        # Mid-price move over this row's interval, in basis points; the sign is
        # flipped for net-sell bins (row.ti < 0) so impact is measured in the
        # direction of the net trade flow.
        try:
            t_start = row.name.left
            t_end = row.name.right
            mid_t_start = mid_resampled.loc[mid_resampled.index == t_start].item()
            mid_t_end = mid_resampled.loc[mid_resampled.index == t_end].item()
            if row.ti < 0:
                row.mi = -1 * ((mid_t_end - mid_t_start) / mid_t_start) * 10000 # bps
            else:
                row.mi = (mid_t_end - mid_t_start) / mid_t_start * 10000 # bps
            return row.mi
        except:
            # NOTE(review): bare except yields None for any bin whose mid lookup
            # fails — presumably deliberate best-effort, but it also hides real
            # errors; consider narrowing to the expected lookup exceptions.
            pass

    # Mid price from top-of-book quotes, forward-filled onto a tao-second grid.
    mid = (ob_df.ask_price_1 + ob_df.bid_price_1) / 2
    mid_resampled = mid.resample(f'{tao}s').ffill()

    binned_buy_volume = create_bins(tao=int(tao), start_time=start_time, end_time=end_time, orders_df=orders_df,
                                    is_buy=True).fillna(0)
    binned_sell_volume = create_bins(tao=int(tao), start_time=start_time, end_time=end_time, orders_df=orders_df,
                                     is_buy=False).fillna(0)

    midf = pd.DataFrame()
    midf['buy_vol'] = binned_buy_volume
    midf['sell_vol'] = binned_sell_volume
    midf['ti'] = midf['buy_vol'] - midf['sell_vol'] # Trade Imbalance
    midf['pov'] = abs(midf['ti']) / (midf['buy_vol'] + midf['sell_vol']) # Participation of Volume in tao
    midf['mi'] = None
    midf.index = pd.interval_range(start=start_time, end=end_time, freq=pd.DateOffset(seconds=int(tao)))

    midf.mi = midf.apply(calculate_mid_move, axis=1)

    # Bucket participation-of-volume into 1000 equal-width bins on [0, 1).
    pov_bins = np.linspace(start=0, stop=1, num=1000, endpoint=False)
    pov_binned = pd.cut(x=midf['pov'], bins=pov_bins)

    midf['pov_bins'] = pov_binned

    midf_gpd = midf.sort_values(by='pov_bins')
    midf_gpd.index = midf_gpd.pov_bins
    del midf_gpd['pov_bins']

    # Average mid move and participation within each pov bucket.
    df = pd.DataFrame(index=midf_gpd.index)
    df['mi'] = midf_gpd['mi']
    df['pov'] = midf_gpd['pov']
    df = df.groupby(df.index).mean()

    return df


if __name__ == "__main__":

    parser = argparse.ArgumentParser(description='Market Impact Curve as described in AlmgrenChriss 05 paper')

    # type=int added for consistency with the companion ABM script; it also
    # validates the value before the downstream int(tao)/f'{tao}s' conversions.
    parser.add_argument('--tao', type=int, required=True, help='Number of seconds in each bin')
    parser.add_argument('--ticker', required=True, help='Name of the stock/symbol')
    parser.add_argument('--date', required=True, help='Historical date')

    args, remaining_args = parser.parse_known_args()

    stock = args.ticker
    date = args.date

    # Market-replay logs live under a fixed EFS location, one folder per (stock, date).
    abides_logs_parent_folder = '/efs/data/get_real_data/marketreplay-logs/log/'
    abides_log_folder = abides_logs_parent_folder + f'marketreplay_{stock}_{date.replace("-", "")}/'

    ob_df, orders_df, start_time, end_time = process_data(abides_log_folder=abides_log_folder,
                                                          stock=stock,
                                                          date=date)
    print(f'Processed order book data for {stock} {date}, calculating market impact ...')

    df = calculate_market_impact(orders_df, ob_df, start_time, end_time, tao=args.tao)
    df.to_pickle(abides_log_folder + f'market_impact_df_tao_{args.tao}.bz2')

    print(f'Processed market impact data for {stock} {date}')
Loading

0 comments on commit b7c72b5

Please sign in to comment.