Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use generic checks for missing columns🙂 #71

Merged
merged 2 commits into from
Jan 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 42 additions & 60 deletions src/tclf/classical_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import annotations

import re
from typing import Literal, get_args

import numpy as np
Expand Down Expand Up @@ -55,6 +56,8 @@ class ClassicalClassifier(ClassifierMixin, BaseEstimator):
base estimator (BaseEstimator): base estimator for basic functionality, such as `transform()`
"""

X_ = pd.DataFrame

def __init__(
self,
layers: list[
Expand Down Expand Up @@ -395,59 +398,32 @@ def _nan(self, subset: str) -> npt.NDArray:
"""
return np.full(shape=(self.X_.shape[0],), fill_value=np.nan)

def _validate_columns(self, found_cols: list[str]) -> None:
def _validate_columns(self, missing_columns: list) -> None:
"""Validate if all required columns are present.

Args:
found_cols (list[str]): columns present in dataframe.
"""

def lookup_columns(func_str: str, sub: str) -> list[str]:
LR_LIKE = [
"trade_price",
f"price_{sub}_lag",
f"ask_{sub}",
f"bid_{sub}",
]
REV_LR_LIKE = [
"trade_price",
f"price_{sub}_lead",
f"ask_{sub}",
f"bid_{sub}",
]
missing_columns (list): list of missing columns.

LUT_REQUIRED_COLUMNS: dict[str, list[str]] = {
"nan": [],
"clnv": LR_LIKE,
"depth": [
"trade_price",
f"ask_{sub}",
f"bid_{sub}",
f"ask_size_{sub}",
f"bid_size_{sub}",
],
"emo": LR_LIKE,
"lr": LR_LIKE,
"quote": ["trade_price", f"ask_{sub}", f"bid_{sub}"],
"rev_clnv": REV_LR_LIKE,
"rev_emo": REV_LR_LIKE,
"rev_lr": REV_LR_LIKE,
"rev_tick": ["trade_price", f"price_{sub}_lead"],
"tick": ["trade_price", f"price_{sub}_lag"],
"trade_size": ["trade_size", f"ask_size_{sub}", f"bid_size_{sub}"],
}
return LUT_REQUIRED_COLUMNS[func_str]

required_cols_set = set()
for func_str, sub in self._layers:
func_col = lookup_columns(func_str, sub)
required_cols_set.update(func_col)

missing_cols = sorted(required_cols_set - set(found_cols))
if missing_cols:
Raises:
ValueError: columns missing in dataframe.
"""
columns = self.columns_ + missing_columns if self.columns_ else missing_columns
self.X_ = pd.DataFrame(np.zeros(shape=(1, len(columns))), columns=columns)
try:
self._predict()
except KeyError as e:
result = re.search(r"'([^']+)'", str(e))
if result:
add_missing = result.group(1)
if add_missing:
missing_columns.append(add_missing)
return self._validate_columns(missing_columns)
if missing_columns:
raise ValueError(
f"Expected to find columns: {missing_cols}. Check naming/presenence of columns. See: https://karelze.github.io/tclf/naming_conventions/"
f"Expected to find columns: {sorted(missing_columns)}. Check naming/presenence of columns. See: https://karelze.github.io/tclf/naming_conventions/"
)
del self.X_
return None

def fit(
self,
Expand Down Expand Up @@ -505,7 +481,7 @@ def fit(
self.classes_ = np.array([-1, 1])

# if no features are provided or inferred, use default
if not self.columns_:
if self.columns_ is None:
self.columns_ = [str(i) for i in range(X.shape[1])]

if len(self.columns_) > 0 and X.shape[1] != len(self.columns_):
Expand All @@ -521,9 +497,7 @@ def fit(
f"expected one of {ALLOWED_FUNC_STR}."
)

columns = self.columns_
self._validate_columns(columns)

self._validate_columns([])
return self

def predict(self, X: MatrixLike) -> npt.NDArray:
Expand All @@ -546,15 +520,7 @@ def predict(self, X: MatrixLike) -> npt.NDArray:
rs = check_random_state(self.random_state)

self.X_ = pd.DataFrame(data=X, columns=self.columns_)
pred = np.full(shape=(X.shape[0],), fill_value=np.nan)

for func_str, subset in self._layers:
func = self.func_mapping_[func_str]
pred = np.where(
np.isnan(pred),
func(subset=subset),
pred,
)
pred = self._predict()

# fill NaNs randomly with -1 and 1 or with constant zero
mask = np.isnan(pred)
Expand All @@ -567,6 +533,22 @@ def predict(self, X: MatrixLike) -> npt.NDArray:
del self.X_
return pred

def _predict(self) -> npt.NDArray:
"""Predict with rule stack.

Returns:
npt.NDArray: prediction
"""
pred = np.full(shape=(self.X_.shape[0],), fill_value=np.nan)
for func_str, subset in self._layers:
func = self.func_mapping_[func_str]
pred = np.where(
np.isnan(pred),
func(subset=subset),
pred,
)
return pred

def predict_proba(self, X: MatrixLike) -> npt.NDArray:
"""Predict class probabilities for X.

Expand Down
Loading