Add multi-dimension plots to analysis.outliers module. (#805)
* Add multi-dimension subplots and a two-feature scatter plot chosen from many features

---------

Co-authored-by: Ian Hellen <[email protected]>
Tatsuya-hasegawa and ianhelle authored Nov 23, 2024
1 parent eacda5c commit 992c53c
Showing 2 changed files with 134 additions and 47 deletions.
.pylintrc: 2 additions & 0 deletions
@@ -69,6 +69,8 @@ disable=raw-checker-failed,
         deprecated-pragma,
         use-symbolic-message-instead,
         too-many-positional-arguments,
+        too-many-arguments,
+        too-many-statements,
 
 # Enable the message, report, category or checker with the given id(s). You can
 # either give multiple identifier separated by comma (,) or put this option
msticpy/analysis/outliers.py: 132 additions & 47 deletions
@@ -14,7 +14,7 @@
 """
 
 import math
-from typing import List, Tuple
+from typing import List, Optional, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -33,12 +33,15 @@
 ) from imp_err
 
 __version__ = VERSION
-__author__ = "Ian Hellen"
+__author__ = "Ian Hellen, Tatsuya Hasegawa"
 
 
 # pylint: disable=invalid-name
 def identify_outliers(
-    x: np.ndarray, x_predict: np.ndarray, contamination: float = 0.05
+    x: np.ndarray,
+    x_predict: np.ndarray,
+    contamination: float = 0.05,
+    max_features: Optional[Union[int, float]] = None,
 ) -> Tuple[IsolationForest, np.ndarray, np.ndarray]:
     """
     Identify outlier items using SkLearn IsolationForest.
@@ -51,6 +54,10 @@ def identify_outliers(
         Model
     contamination : float
         Percentage contamination (default: {0.05})
+    max_features : int or float, optional
+        Specifies the max number (int) or max ratio (float) of features
+        to be randomly selected when building each tree.
+        default: None => {math.floor(math.sqrt(cols))}
 
     Returns
     -------
@@ -64,8 +71,10 @@ def identify_outliers(
 
     # fit the model
     rows, cols = x.shape
-    max_samples = min(100, cols)
-    max_features = math.floor(math.sqrt(rows))
+    max_samples = min(100, rows)
+    if not max_features:
+        max_features = math.floor(math.sqrt(cols))
+
     clf = IsolationForest(
         max_samples=max_samples,
         max_features=max_features,
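
For orientation, here is a minimal usage sketch of the updated identify_outliers signature. The synthetic data, seed, and variable names are illustrative assumptions and not part of the commit; the parameter names and return tuple follow the diff above:

    import numpy as np

    from msticpy.analysis.outliers import identify_outliers

    rng = np.random.default_rng(42)
    train = rng.normal(size=(200, 9))  # 200 rows, 9 feature columns
    test = rng.normal(size=(50, 9))

    # With max_features=None, the corrected default is now
    # math.floor(math.sqrt(cols)): floor(sqrt(9)) == 3 features per tree.
    clf, x_outliers, y_pred = identify_outliers(
        x=train,
        x_predict=test,
        contamination=0.05,
        max_features=4,  # int = absolute count; a float such as 0.5 means a ratio
    )
    print(type(clf).__name__, x_outliers.shape)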
@@ -109,52 +118,128 @@ def plot_outlier_results(
         list of feature columns to display
     plt_title : str
         Plot title
     """
-    # plot the line, the samples, and the nearest vectors to the plane
-    x_max_x = x[:, 0].max() + (x[:, 0].max() / 10)
-    x_min_x = -x[:, 0].max() / 10
-    x_max_y = x[:, 1].max() + (x[:, 1].max() / 10)
-    x_min_y = -x[:, 1].max() / 10
-    xx, yy = np.meshgrid(
-        np.linspace(x_min_x, x_max_x, 100), np.linspace(x_min_y, x_max_y, 100)
-    )
-    z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
-    z = z.reshape(xx.shape)
+    if len(feature_columns) == 2:
+        # two-dimension plot: mostly retains the original code from msticpy 2.14.0
+        # plot the line, the samples, and the nearest vectors to the plane
+        x_max_x = x[:, 0].max() + (x[:, 0].max() / 10)
+        x_min_x = -x[:, 0].max() / 10
+        x_max_y = x[:, 1].max() + (x[:, 1].max() / 10)
+        x_min_y = -x[:, 1].max() / 10
+        xx, yy = np.meshgrid(
+            np.linspace(x_min_x, x_max_x, 100), np.linspace(x_min_y, x_max_y, 100)
+        )
+        z = clf.decision_function(
+            np.c_[
+                xx.ravel(),
+                yy.ravel(),
+                np.zeros(
+                    (xx.ravel().shape[0], clf.n_features_in_ - len(feature_columns))
+                ),
+            ]
+        )
+        z = z.reshape(xx.shape)
 
-    plt.rcParams["figure.figsize"] = (20, 10)
+        plt.rcParams["figure.figsize"] = (20, 10)
 
-    plt.title(plt_title)
-    # pylint: disable=no-member
-    plt.contourf(xx, yy, z, cmap=plt.cm.Blues_r)  # type: ignore
+        plt.title(plt_title)
+        # pylint: disable=no-member
+        plt.contourf(xx, yy, z, cmap=plt.cm.Blues_r)  # type: ignore
 
-    b1 = plt.scatter(x[:, 0], x[:, 1], c="white", s=20, edgecolor="k")
-    b2 = plt.scatter(x_predict[:, 0], x_predict[:, 1], c="green", s=40, edgecolor="k")
-    c = plt.scatter(
-        x_outliers[:, 0], x_outliers[:, 1], c="red", marker="x", s=200, edgecolor="k"
-    )
-    plt.axis("tight")
-
-    xp_max_x = x_predict[:, 0].max() + (x_predict[:, 0].max() / 10)
-    xp_min_x = -x_predict[:, 0].max() / 10
-    xp_max_y = x_predict[:, 1].max() + (x_predict[:, 1].max() / 10)
-    xp_min_y = -x_predict[:, 1].max() / 10
-
-    plt.xlim((xp_min_x, xp_max_x))
-    plt.ylim((xp_min_y, xp_max_y))
-    plt.xlabel(feature_columns[0])  # type: ignore
-    plt.ylabel(feature_columns[1])  # type: ignore
-
-    plt.legend(
-        [b1, b2, c],
-        [
-            "training observations",
-            "new regular observations",
-            "new abnormal observations",
-        ],
-        loc="upper right",
-    )
-    plt.show()
+        b1 = plt.scatter(x[:, 0], x[:, 1], c="white", s=20, edgecolor="k")
+        b2 = plt.scatter(
+            x_predict[:, 0], x_predict[:, 1], c="green", s=40, edgecolor="k"
+        )
+        c = plt.scatter(x_outliers[:, 0], x_outliers[:, 1], c="red", marker="x", s=200)
+        plt.axis("tight")
+
+        xp_max_x = x_predict[:, 0].max() + (x_predict[:, 0].max() / 10)
+        xp_min_x = -x_predict[:, 0].max() / 10
+        xp_max_y = x_predict[:, 1].max() + (x_predict[:, 1].max() / 10)
+        xp_min_y = -x_predict[:, 1].max() / 10
+
+        plt.xlim((xp_min_x, xp_max_x))
+        plt.ylim((xp_min_y, xp_max_y))
+        plt.xlabel(feature_columns[0])  # type: ignore
+        plt.ylabel(feature_columns[1])  # type: ignore
+
+        plt.legend(
+            [b1, b2, c],
+            [
+                "training observations",
+                "new regular observations",
+                "new abnormal observations",
+            ],
+            loc="upper right",
+        )
+        plt.show()
+
+    elif len(feature_columns) > 2:  # multi dimension subplots
+        dimension_num = x.shape[1]
+        fig, axes = plt.subplots(
+            dimension_num, dimension_num, figsize=(20, 20), constrained_layout=True
+        )
+        for i in range(dimension_num):
+            for j in range(dimension_num):
+                if i != j:
+                    # plot the line, the samples, and the nearest vectors to the plane
+                    x_max_x = x[:, j].max() + (x[:, j].max() / 10)
+                    x_min_x = -x[:, j].max() / 10
+                    x_max_y = x[:, i].max() + (x[:, i].max() / 10)
+                    x_min_y = -x[:, i].max() / 10
+                    xx, yy = np.meshgrid(
+                        np.linspace(x_min_x, x_max_x, 100),
+                        np.linspace(x_min_y, x_max_y, 100),
+                    )
+                    z = clf.decision_function(
+                        np.c_[
+                            xx.ravel(),
+                            yy.ravel(),
+                            np.zeros((xx.ravel().shape[0], len(feature_columns) - 2)),
+                        ]
+                    )
+                    z = z.reshape(xx.shape)
+
+                    # pylint: disable=no-member
+                    axes[i, j].contourf(xx, yy, z, cmap=plt.cm.Blues_r)  # type: ignore
+
+                    b1 = axes[i, j].scatter(x[:, j], x[:, i], c="white", edgecolor="k")
+                    b2 = axes[i, j].scatter(
+                        x_predict[:, j], x_predict[:, i], c="green", edgecolor="k"
+                    )
+                    c = axes[i, j].scatter(
+                        x_outliers[:, j], x_outliers[:, i], c="red", marker="x"
+                    )
+
+                    xp_max_x = x_predict[:, j].max() + (x_predict[:, j].max() / 10)
+                    xp_min_x = -x_predict[:, j].max() / 10
+                    xp_max_y = x_predict[:, i].max() + (x_predict[:, i].max() / 10)
+                    xp_min_y = -x_predict[:, i].max() / 10
+
+                    axes[i, j].axis(xmin=xp_min_x, xmax=xp_max_x)
+                    axes[i, j].axis(ymin=xp_min_y, ymax=xp_max_y)
+                    axes[i, j].set_xlabel(f"{feature_columns[j]}")
+                    axes[i, j].set_ylabel(f"{feature_columns[i]}")
+
+                else:
+                    # do not plot a feature against itself
+                    axes[i, j].axis("off")
+
+        fig.suptitle(plt_title)
+        plt.legend(
+            [b1, b2, c],
+            [
+                "training observations",
+                "new regular observations",
+                "new abnormal observations",
+            ],
+            facecolor="#0072BD",
+            framealpha=0.3,
+        )
+        plt.show()
+
+    else:
+        raise ValueError("plot_outlier_results function needs at least two features.")
 
 
 def remove_common_items(data: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
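
The detail worth noting in both plotting branches is the zero padding passed to clf.decision_function: the IsolationForest was fitted on all of the model's input columns, but each contour grid varies only the two plotted features, so the remaining columns are filled with zeros to match the width the model expects. This also means the decision surface is evaluated at zero for every non-plotted feature. A minimal standalone sketch of the shape arithmetic, with illustrative sizes:

    import numpy as np

    xx, yy = np.meshgrid(np.linspace(0, 1, 100), np.linspace(0, 1, 100))
    n_model_features = 5  # stands in for clf.n_features_in_
    grid = np.c_[
        xx.ravel(),  # the feature on the x axis of this subplot
        yy.ravel(),  # the feature on the y axis of this subplot
        np.zeros((xx.ravel().shape[0], n_model_features - 2)),  # zero padding
    ]
    print(grid.shape)  # (10000, 5): one row per grid point, one column per feature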

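Putting the two functions together, an end-to-end sketch of the new multi-dimension path; the column names and synthetic data are assumptions for illustration, and the parameter order follows the docstring fragments shown in the diff:

    import numpy as np

    from msticpy.analysis.outliers import identify_outliers, plot_outlier_results

    rng = np.random.default_rng(0)
    feature_columns = ["logons", "failed_logons", "proc_count", "rare_procs"]
    train = rng.normal(size=(300, len(feature_columns)))
    # Append a few far-away rows so some predictions land in the outlier class.
    test = np.vstack([rng.normal(size=(40, 4)), rng.normal(loc=6.0, size=(3, 4))])

    clf, x_outliers, _ = identify_outliers(x=train, x_predict=test)

    # Four feature columns take the new len(feature_columns) > 2 branch:
    # a 4 x 4 grid of pairwise contour/scatter subplots with the diagonal off.
    plot_outlier_results(
        clf,
        train,
        test,
        x_outliers,
        feature_columns=feature_columns,
        plt_title="Outlier detection over four features",
    )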