This repository has been archived by the owner on Nov 3, 2024. It is now read-only.
Forked from korney3/BI_ML21_HW
DecisionTree.py
135 lines (104 loc) · 5.23 KB
import numpy as np


class DecisionTreeLeaf:
    """Leaf node: stores the majority class and the class probabilities for a binary (0/1) target."""

    def __init__(self, sample_Y):
        n = len(sample_Y)
        n0 = np.sum(sample_Y == 0)
        n1 = np.sum(sample_Y == 1)
        self.y = 1 if n1 > n0 else 0
        self.prob = {0: n0 / n, 1: n1 / n}
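

# The original file uses gini, entropy and gain without defining or importing them,
# so they presumably come from elsewhere in the assignment. The sketches below show
# what they are assumed to compute; in particular, gain taking the impurity function
# as a third argument is an assumption, not a confirmed signature.
def gini(y):
    """Gini impurity of a label vector."""
    _, counts = np.unique(y, return_counts=True)
    p = counts / counts.sum()
    return 1.0 - np.sum(p ** 2)


def entropy(y):
    """Shannon entropy (in bits) of a label vector."""
    _, counts = np.unique(y, return_counts=True)
    p = counts / counts.sum()
    return -np.sum(p * np.log2(p))


def gain(left_y, right_y, criterion=gini):
    """Impurity decrease of a split: parent impurity minus the weighted child impurities."""
    n_left, n_right = len(left_y), len(right_y)
    n = n_left + n_right
    parent = criterion(np.concatenate([left_y, right_y]))
    return parent - (n_left / n) * criterion(left_y) - (n_right / n) * criterion(right_y)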


class DecisionTreeNode:
    """Internal node: splits the samples on the best (feature, threshold) pair and grows children recursively."""

    def __init__(self, sample_X, sample_Y, criterion='gini',
                 current_depth=0, max_depth=None, min_samples_leaf=1):
        split_index, split_value = self.find_best_split(sample_X, sample_Y, criterion)

        # Partition the samples by the chosen threshold.
        left_index = sample_X[:, split_index] < split_value
        right_index = sample_X[:, split_index] >= split_value
        left_X = sample_X[left_index, :]
        left_Y = sample_Y[left_index]
        right_X = sample_X[right_index, :]
        right_Y = sample_Y[right_index]

        # A branch becomes a leaf when it is pure, too small, or the maximum depth is reached.
        depth_reached = max_depth is not None and current_depth >= max_depth
        if len(np.unique(left_Y)) == 1 or len(left_Y) <= min_samples_leaf or depth_reached:
            self.left = DecisionTreeLeaf(left_Y)
        else:
            self.left = DecisionTreeNode(left_X, left_Y,
                                         criterion=criterion,
                                         current_depth=current_depth + 1,
                                         max_depth=max_depth,
                                         min_samples_leaf=min_samples_leaf)
        if len(np.unique(right_Y)) == 1 or len(right_Y) <= min_samples_leaf or depth_reached:
            self.right = DecisionTreeLeaf(right_Y)
        else:
            self.right = DecisionTreeNode(right_X, right_Y,
                                          criterion=criterion,
                                          current_depth=current_depth + 1,
                                          max_depth=max_depth,
                                          min_samples_leaf=min_samples_leaf)

        self.split_dim = split_index
        self.split_value = split_value
        self.depth = current_depth
    def find_best_split(self, sample_X, sample_Y, criterion='gini'):
        """
        Iterates over (roughly) all candidate splits and keeps the one with the largest gain.
        Returns the corresponding column index and the threshold value of that split.
        """
        # Resolve the criterion name to an impurity function; gain is assumed to
        # accept the impurity function as its third argument.
        if criterion == 'gini':
            criterion = gini
        elif criterion == 'entropy':
            criterion = entropy

        best_gain = -np.inf
        best_feature_index = 0
        best_split_value = 0
        for i, feature in enumerate(sample_X.T):
            min_value = np.min(feature)
            max_value = np.max(feature)
            # Evaluate at most ~10 evenly spaced thresholds per feature rather than every unique value.
            n_splitters = min(10, len(np.unique(feature)))
            split_values = np.linspace(min_value, max_value, n_splitters + 2)
            for sv in split_values:
                # Thresholds equal to the minimum or maximum would leave one side empty.
                if sv in (min_value, max_value):
                    continue
                left_y = sample_Y[feature < sv]
                right_y = sample_Y[feature >= sv]
                sv_gain = gain(left_y, right_y, criterion)
                if sv_gain > best_gain:
                    best_gain = sv_gain
                    best_feature_index = i
                    best_split_value = sv
        return best_feature_index, best_split_value
    def predict_proba(self, X):
        """Routes each row of X down the tree; returns an object array of per-sample class-probability dicts."""
        left_index = X[:, self.split_dim] < self.split_value
        right_index = X[:, self.split_dim] >= self.split_value
        pred = np.zeros(len(X), dtype='object')
        if isinstance(self.left, DecisionTreeLeaf):
            pred[left_index] = self.left.prob
        else:
            assert isinstance(self.left, DecisionTreeNode)
            left_X = X[left_index, :]
            pred[left_index] = self.left.predict_proba(left_X)
        if isinstance(self.right, DecisionTreeLeaf):
            pred[right_index] = self.right.prob
        else:
            assert isinstance(self.right, DecisionTreeNode)
            right_X = X[right_index, :]
            pred[right_index] = self.right.predict_proba(right_X)
        return pred


class DecisionTreeClassifier:
    """A binary decision tree classifier built from DecisionTreeNode and DecisionTreeLeaf."""

    def __init__(self, criterion="gini", max_depth=None, min_samples_leaf=1):
        self.root = None
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf

    def fit(self, X, y):
        # A single-class target needs no splits at all.
        if len(np.unique(y)) == 1:
            self.root = DecisionTreeLeaf(y)
        else:
            self.root = DecisionTreeNode(X, y,
                                         criterion=self.criterion,
                                         current_depth=0,
                                         max_depth=self.max_depth,
                                         min_samples_leaf=self.min_samples_leaf)

    def predict_proba(self, X):
        if isinstance(self.root, DecisionTreeLeaf):
            return [self.root.prob] * len(X)
        else:
            return self.root.predict_proba(X)

    def predict(self, X):
        # Pick, for every sample, the class with the highest predicted probability.
        proba = self.predict_proba(X)
        return [max(p.keys(), key=lambda k: p[k]) for p in proba]
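

# Illustrative usage only (not part of the original file): the data, sizes and
# hyperparameters below are made up to show how the classifier is meant to be called.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X = rng.normal(size=(200, 3))
    y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)

    clf = DecisionTreeClassifier(criterion="gini", max_depth=4, min_samples_leaf=5)
    clf.fit(X, y)
    preds = clf.predict(X)
    print("train accuracy:", np.mean(np.array(preds) == y))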