forked from awerries/online-svr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
online_svr.py
562 lines (524 loc) · 22.7 KB
/
online_svr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
#!/usr/bin/env python3
"""Implementation of Online Support Vector Regression (OSVR) as library for a class project in 16-831
Statistical Techniques in Robotics.
Requires Python 3.5
Author: Adam Werries, [email protected], 12/2015.
Adapted from MATLAB code available at http://onlinesvr.altervista.org/
Parameters defined in main() below. C is the regularization parameter, essentially defining the limit on how close the learner must adhere to the dataset (smoothness). Epsilon is the acceptable error, and defines the width of what is sometimes called the "SVR tube". The kernel parameter is the scaling factor for comparing feature distance (this implementation uses a Radial Basis Function).
"""
import sys
import numpy as np
def sign(x):
""" Returns sign. Numpys sign function returns 0 instead of 1 for zero values. :( """
if x >= 0:
return 1
else:
return -1
class OnlineSVR:
def __init__(self, numFeatures, C, eps, kernelParam, bias = 0, debug = False):
# Configurable Parameters
self.numFeatures = numFeatures
self.C = C
self.eps = eps
self.kernelParam = kernelParam
self.bias = bias
self.debug = debug
print('SELF',self.C,self.eps,self.kernelParam)
# Algorithm initialization
self.numSamplesTrained = 0
self.weights = np.array([])
# Samples X (features) and Y (truths)
self.X = list()
self.Y = list()
# Working sets, contains indices pertaining to X and Y
self.supportSetIndices = list()
self.errorSetIndices = list()
self.remainderSetIndices = list()
self.R = np.matrix([])
def findMinVariation(self, H, beta, gamma, i):
""" Finds the variations of each sample to the new set.
Lc1: distance of the new sample to the SupportSet
Lc2: distance of the new sample to the ErrorSet
Ls(i): distance of the support samples to the ErrorSet/RemainingSet
Le(i): distance of the error samples to the SupportSet
Lr(i): distance of the remaining samples to the SupportSet
"""
# Find direction q of the new sample
q = -sign(H[i])
# Compute variations
Lc1 = self.findVarLc1(H, gamma, q, i)
q = sign(Lc1)
Lc2 = self.findVarLc2(H, q, i)
Ls = self.findVarLs(H, beta, q)
Le = self.findVarLe(H, gamma, q)
Lr = self.findVarLr(H, gamma, q)
# Check for duplicate minimum values, grab one with max gamma/beta, set others to inf
# Support set
if Ls.size > 1:
minS = np.abs(Ls).min()
results = np.array([k for k,val in enumerate(Ls) if np.abs(val)==minS])
if len(results) > 1:
betaIndex = beta[results+1].argmax()
Ls[results] = q*np.inf
Ls[results[betaIndex]] = q*minS
# Error set
if Le.size > 1:
minE = np.abs(Le).min()
results = np.array([k for k,val in enumerate(Le) if np.abs(val)==minE])
if len(results) > 1:
errorGamma = gamma[self.errorSetIndices]
gammaIndex = errorGamma[results].argmax()
Le[results] = q*np.inf
Le[results[gammaIndex]] = q*minE
# Remainder Set
if Lr.size > 1:
minR = np.abs(Lr).min()
results = np.array([k for k,val in enumerate(Lr) if np.abs(val)==minR])
if len(results) > 1:
remGamma = gamma[self.remainderSetIndices]
gammaIndex = remGamma[results].argmax()
Lr[results] = q*np.inf
Lr[results[gammaIndex]] = q*minR
# Find minimum absolute variation of all, retain signs. Flag determines set-switching cases.
minLsIndex = np.abs(Ls).argmin()
minLeIndex = np.abs(Le).argmin()
minLrIndex = np.abs(Lr).argmin()
minIndices = [None, None, minLsIndex, minLeIndex, minLrIndex]
minValues = np.array([Lc1, Lc2, Ls[minLsIndex], Le[minLeIndex], Lr[minLrIndex]])
if np.abs(minValues).min() == np.inf:
print('No weights to modify! Something is wrong.')
sys.exit()
flag = np.abs(minValues).argmin()
if self.debug:
print('MinValues',minValues)
return minValues[flag], flag, minIndices[flag]
def findVarLc1(self, H, gamma, q, i):
# weird hacks below
Lc1 = np.nan
if gamma.size < 2:
g = gamma
else:
g = gamma.item(i)
# weird hacks above
if g <= 0:
Lc1 = np.array(q*np.inf)
elif H[i] > self.eps and -self.C < self.weights[i] and self.weights[i] <= 0:
Lc1 = (-H[i] + self.eps) / g
elif H[i] < -self.eps and 0 <= self.weights[i] and self.weights[i] <= self.C:
Lc1 = (-H[i] - self.eps) / g
else:
print('Something is weird.')
print('i',i)
print('q',q)
print('gamma',gamma)
print('g',g)
print('H[i]',H[i])
print('weights[i]',self.weights[i])
if np.isnan(Lc1):
Lc1 = np.array(q*np.inf)
return Lc1.item()
def findVarLc2(self, H, q, i):
if len(self.supportSetIndices) > 0:
if q > 0:
Lc2 = -self.weights[i] + self.C
else:
Lc2 = -self.weights[i] - self.C
else:
Lc2 = np.array(q*np.inf)
if np.isnan(Lc2):
Lc2 = np.array(q*np.inf)
return Lc2
def findVarLs(self, H, beta, q):
if len(self.supportSetIndices) > 0 and len(beta) > 0:
Ls = np.zeros([len(self.supportSetIndices),1])
supportWeights = self.weights[self.supportSetIndices]
supportH = H[self.supportSetIndices]
for k in range(len(self.supportSetIndices)):
if q*beta[k+1] == 0:
Ls[k] = q*np.inf
elif q*beta[k+1] > 0:
if supportH[k] > 0:
if supportWeights[k] < -self.C:
Ls[k] = (-supportWeights[k] - self.C) / beta[k+1]
elif supportWeights[k] <= 0:
Ls[k] = -supportWeights[k] / beta[k+1]
else:
Ls[k] = q*np.inf
else:
if supportWeights[k] < 0:
Ls[k] = -supportWeights[k] / beta[k+1]
elif supportWeights[k] <= self.C:
Ls[k] = (-supportWeights[k] + self.C) / beta[k+1]
else:
Ls[k] = q*np.inf
else:
if supportH[k] > 0:
if supportWeights[k] > 0:
Ls[k] = -supportWeights[k] / beta[k+1]
elif supportWeights[k] >= -self.C:
Ls[k] = (-supportWeights[k] - self.C) / beta[k+1]
else:
Ls[k] = q*np.inf
else:
if supportWeights[k] > self.C:
Ls[k] = (-supportWeights[k] + self.C) / beta[k+1]
elif supportWeights[k] >= self.C:
Ls[k] = -supportWeights[k] / beta[k+1]
else:
Ls[k] = q*np.inf
else:
Ls = np.array([q*np.inf])
# Correct for NaN
Ls[np.isnan(Ls)] = q*np.inf
if Ls.size > 1:
Ls.shape = (len(Ls),1)
# Check for broken signs
for val in Ls:
if sign(val) == -sign(q) and val != 0:
print('Sign mismatch error in Ls! Exiting.')
sys.exit()
# print('findVarLs',Ls)
return Ls
def findVarLe(self, H, gamma, q):
if len(self.errorSetIndices) > 0:
Le = np.zeros([len(self.errorSetIndices),1])
errorGamma = gamma[self.errorSetIndices]
errorWeights = self.weights[self.errorSetIndices]
errorH = H[self.errorSetIndices]
for k in range(len(self.errorSetIndices)):
if q*errorGamma[k] == 0:
Le[k] = q*np.inf
elif q*errorGamma[k] > 0:
if errorWeights[k] > 0:
if errorH[k] < -self.eps:
Le[k] = (-errorH[k] - self.eps) / errorGamma[k]
else:
Le[k] = q*np.inf
else:
if errorH[k] < self.eps:
Le[k] = (-errorH[k] + self.eps) / errorGamma[k]
else:
Le[k] = q*np.inf
else:
if errorWeights[k] > 0:
if errorH[k] > -self.eps:
Le[k] = (-errorH[k] - self.eps) / errorGamma[k]
else:
Le[k] = q*np.inf
else:
if errorH[k] > self.eps:
Le[k] = (-errorH[k] + self.eps) / errorGamma[k]
else:
Le[k] = q*np.inf
else:
Le = np.array([q*np.inf])
# Correct for NaN
Le[np.isnan(Le)] = q*np.inf
if Le.size > 1:
Le.shape = (len(Le),1)
# Check for broken signs
for val in Le:
if sign(val) == -sign(q) and val != 0:
print('Sign mismatch error in Le! Exiting.')
sys.exit()
# print('findVarLe',Le)
return Le
def findVarLr(self, H, gamma, q):
if len(self.remainderSetIndices) > 0:
Lr = np.zeros([len(self.remainderSetIndices),1])
remGamma = gamma[self.remainderSetIndices]
remH = H[self.remainderSetIndices]
for k in range(len(self.remainderSetIndices)):
if q*remGamma[k] == 0:
Lr[k] = q*np.inf
elif q*remGamma[k] > 0:
if remH[k] < -self.eps:
Lr[k] = (-remH[k] - self.eps) / remGamma[k]
elif remH[k] < self.eps:
Lr[k] = (-remH[k] + self.eps) / remGamma[k]
else:
Lr[k] = q*np.inf
else:
if remH[k] > self.eps:
Lr[k] = (-remH[k] + self.eps) / remGamma[k]
elif remH[k] > -self.eps:
Lr[k] = (-remH[k] - self.eps) / remGamma[k]
else:
Lr[k] = q*np.inf
else:
Lr = np.array([q*np.inf])
# Correct for NaN
Lr[np.isnan(Lr)] = q*np.inf
if Lr.size > 1:
Lr.shape = (len(Lr),1)
# Check for broken signs
for val in Lr:
if sign(val) == -sign(q) and val != 0:
print('Sign mismatch error in Lr! Exiting.')
sys.exit()
# print('findVarLr',Lr)
return Lr
def computeKernelOutput(self, set1, set2):
"""Compute kernel output. Uses a radial basis function kernel."""
X1 = np.matrix(set1)
X2 = np.matrix(set2).T
# Euclidean distance calculation done properly
[S,R] = X1.shape
[R2,Q] = X2.shape
X = np.zeros([S,Q])
if Q < S:
copies = np.zeros(S,dtype=int)
for q in range(Q):
if self.debug:
print('X1',X1)
print('X2copies',X2.T[q+copies,:])
print('power',np.power(X1-X2.T[q+copies,:],2))
xsum = np.sum(np.power(X1-X2.T[q+copies,:],2),axis=1)
xsum.shape = (xsum.size,)
X[:,q] = xsum
else:
copies = np.zeros(Q,dtype=int)
for i in range(S):
X[i,:] = np.sum(np.power(X1.T[:,i+copies]-X2,2),axis=0)
X = np.sqrt(X)
y = np.matrix(np.exp(-self.kernelParam*X**2))
if self.debug:
print('distance',X)
print('kernelOutput',y)
return y
def predict(self, newSampleX):
X = np.array(self.X)
newX = np.array(newSampleX)
weights = np.array(self.weights)
weights.shape = (weights.size,1)
if self.numSamplesTrained > 0:
y = self.computeKernelOutput(X, newX)
return (weights.T @ y).T + self.bias
else:
return np.zeros_like(newX) + self.bias
def computeMargin(self, newSampleX, newSampleY):
fx = self.predict(newSampleX)
newSampleY = np.array(newSampleY)
newSampleY.shape = (newSampleY.size, 1)
if self.debug:
print('fx',fx)
print('newSampleY',newSampleY)
print('hx',fx-newSampleY)
return fx-newSampleY
def computeBetaGamma(self,i):
"""Returns beta and gamma arrays."""
# Compute beta vector
X = np.array(self.X)
Qsi = self.computeQ(X[self.supportSetIndices,:], X[i,:])
if len(self.supportSetIndices) == 0 or self.R.size == 0:
beta = np.array([])
else:
beta = -self.R @ np.append(np.matrix([1]),Qsi,axis=0)
# Compute gamma vector
Qxi = self.computeQ(X, X[i,:])
Qxs = self.computeQ(X, X[self.supportSetIndices,:])
if len(self.supportSetIndices) == 0 or Qxi.size == 0 or Qxs.size == 0 or beta.size == 0:
gamma = np.array(np.ones_like(Qxi))
else:
gamma = Qxi + np.append(np.ones([self.numSamplesTrained,1]), Qxs, 1) @ beta
# Correct for NaN
beta[np.isnan(beta)] = 0
gamma[np.isnan(gamma)] = 0
if self.debug:
print('R',self.R)
print('beta',beta)
print('gamma',gamma)
return beta, gamma
def computeQ(self, set1, set2):
set1 = np.matrix(set1)
set2 = np.matrix(set2)
Q = np.matrix(np.zeros([set1.shape[0],set2.shape[0]]))
for i in range(set1.shape[0]):
for j in range(set2.shape[0]):
Q[i,j] = self.computeKernelOutput(set1[i,:],set2[j,:])
return np.matrix(Q)
def adjustSets(self, H, beta, gamma, i, flag, minIndex):
print('Entered adjustSet logic with flag {0} and minIndex {1}.'.format(flag,minIndex))
if flag not in range(5):
print('Received unexpected flag {0}, exiting.'.format(flag))
sys.exit()
# add new sample to Support set
if flag == 0:
print('Adding new sample {0} to support set.'.format(i))
H[i] = np.sign(H[i])*self.eps
self.supportSetIndices.append(i)
self.R = self.addSampleToR(i,'SupportSet',beta,gamma)
return H,True
# add new sample to Error set
elif flag == 1:
print('Adding new sample {0} to error set.'.format(i))
self.weights[i] = np.sign(self.weights[i])*self.C
self.errorSetIndices.append(i)
return H,True
# move sample from Support set to Error or Remainder set
elif flag == 2:
index = self.supportSetIndices[minIndex]
weightsValue = self.weights[index]
if np.abs(weightsValue) < np.abs(self.C - abs(weightsValue)):
self.weights[index] = 0
weightsValue = 0
else:
self.weights[index] = np.sign(weightsValue)*self.C
weightsValue = self.weights[index]
# Move from support to remainder set
if weightsValue == 0:
print('Moving sample {0} from support to remainder set.'.format(index))
self.remainderSetIndices.append(index)
self.R = self.removeSampleFromR(minIndex)
self.supportSetIndices.pop(minIndex)
# move from support to error set
elif np.abs(weightsValue) == self.C:
print('Moving sample {0} from support to error set.'.format(index))
self.errorSetIndices.append(index)
self.R = self.removeSampleFromR(minIndex)
self.supportSetIndices.pop(minIndex)
else:
print('Issue with set swapping, flag 2.','weightsValue:',weightsValue)
sys.exit()
# move sample from Error set to Support set
elif flag == 3:
index = self.errorSetIndices[minIndex]
print('Moving sample {0} from error to support set.'.format(index))
H[index] = np.sign(H[index])*self.eps
self.supportSetIndices.append(index)
self.errorSetIndices.pop(minIndex)
self.R = self.addSampleToR(index, 'ErrorSet', beta, gamma)
# move sample from Remainder set to Support set
elif flag == 4:
index = self.remainderSetIndices[minIndex]
print('Moving sample {0} from remainder to support set.'.format(index))
H[index] = np.sign(H[index])*self.eps
self.supportSetIndices.append(index)
self.remainderSetIndices.pop(minIndex)
self.R = self.addSampleToR(index, 'RemainingSet', beta, gamma)
return H, False
def addSampleToR(self, sampleIndex, sampleOldSet, beta, gamma):
print('Adding sample {0} to R matrix.'.format(sampleIndex))
X = np.array(self.X)
sampleX = X[sampleIndex,:]
sampleX.shape = (sampleX.size/self.numFeatures,self.numFeatures)
# Add first element
if self.R.shape[0] <= 1:
Rnew = np.ones([2,2])
Rnew[0,0] = -self.computeKernelOutput(sampleX,sampleX)
Rnew[1,1] = 0
# Other elements
else:
# recompute beta/gamma if from error/remaining set
if sampleOldSet == 'ErrorSet' or sampleOldSet == 'RemainingSet':
# beta, gamma = self.computeBetaGamma(sampleIndex)
Qii = self.computeKernelOutput(sampleX, sampleX)
Qsi = self.computeKernelOutput(X[self.supportSetIndices[0:-1],:], sampleX)
beta = -self.R @ np.append(np.matrix([1]),Qsi,axis=0)
beta[np.isnan(beta)] = 0
beta.shape = (len(beta),1)
gamma[sampleIndex] = Qii + np.append(1,Qsi.T)@beta
gamma[np.isnan(gamma)] = 0
gamma.shape = (len(gamma),1)
# add a column and row of zeros onto right/bottom of R
r,c = self.R.shape
Rnew = np.append(self.R, np.zeros([r,1]), axis=1)
Rnew = np.append(Rnew, np.zeros([1,c+1]), axis=0)
# update R
if gamma[sampleIndex] != 0:
# Numpy so wonky! SO WONKY.
beta1 = np.append(beta, [[1]], axis=0)
Rnew = Rnew + 1/gamma[sampleIndex].item()*[email protected]
if np.any(np.isnan(Rnew)):
print('R has become inconsistent. Training failed at sampleIndex {0}'.format(sampleIndex))
sys.exit()
return Rnew
def removeSampleFromR(self, sampleIndex):
print('Removing sample {0} from R matrix.'.format(sampleIndex))
sampleIndex += 1
I = list(range(sampleIndex))
I.extend(range(sampleIndex+1,self.R.shape[0]))
I = np.array(I)
I.shape = (1,I.size)
if self.debug:
print('I',I)
print('RII',self.R[I.T,I])
# Adjust R
if self.R[sampleIndex,sampleIndex] != 0:
Rnew = self.R[I.T,I] - (self.R[I.T,sampleIndex]*self.R[sampleIndex,I]) / self.R[sampleIndex,sampleIndex].item()
else:
Rnew = np.copy(self.R[I.T,I])
# Check for bad things
if np.any(np.isnan(Rnew)):
print('R has become inconsistent. Training failed removing sampleIndex {0}'.format(sampleIndex))
sys.exit()
if Rnew.size == 1:
print('Time to annhilate R? R:',Rnew)
Rnew = np.matrix([])
return Rnew
def learn(self, newSampleX, newSampleY):
self.numSamplesTrained += 1
self.X.append(newSampleX)
self.Y.append(newSampleY)
self.weights = np.append(self.weights,0)
i = self.numSamplesTrained - 1 # stupid off-by-one errors
H = self.computeMargin(self.X, self.Y)
# correctly classified sample, skip the rest of the algorithm!
if (abs(H[i]) <= self.eps):
print('Adding new sample {0} to remainder set, within eps.'.format(i))
if self.debug:
print('weights',self.weights)
self.remainderSetIndices.append(i)
return
newSampleAdded = False
iterations = 0
while not newSampleAdded:
# Ensure we're not looping infinitely
iterations += 1
if iterations > self.numSamplesTrained*100:
print('Warning: we appear to be in an infinite loop.')
sys.exit()
iterations = 0
# Compute beta/gamma for constraint optimization
beta, gamma = self.computeBetaGamma(i)
# Find minimum variation and determine how we should shift samples between sets
deltaC, flag, minIndex = self.findMinVariation(H, beta, gamma, i)
# Update weights and bias based on variation
if len(self.supportSetIndices) > 0 and len(beta)>0:
self.weights[i] += deltaC
delta = beta*deltaC
self.bias += delta.item(0)
# numpy is wonky...
weightDelta = np.array(delta[1:])
weightDelta.shape = (len(weightDelta),)
self.weights[self.supportSetIndices] += weightDelta
H += gamma*deltaC
else:
self.bias += deltaC
H += deltaC
# Adjust sets, moving samples between them according to flag
H,newSampleAdded = self.adjustSets(H, beta, gamma, i, flag, minIndex)
if self.debug:
print('weights',self.weights)
def main(argv):
# Test of Online SVR algorithm
debug = True if len(argv)>1 and argv[1] == 'debug' else False
#testSetX = np.array([[0.1],[0.2],[0.3],[0.4],[0.5]])
testSetX = np.random.rand(10,2)
testSetY = np.sin(2*np.pi*testSetX[:,0] + testSetX[:,1])
OSVR = OnlineSVR(numFeatures = testSetX.shape[1], C = 10, eps = 0.1, kernelParam = 30, bias = 0, debug = debug)
for i in range(testSetX.shape[0]):
print('%%%%%%%%%%%%%%% Data point {0} %%%%%%%%%%%%%%%'.format(i))
OSVR.learn(testSetX[i,:], testSetY[i])
# Predict stuff as quick test
testX = np.array([[0.15,0.1],[0.25,0.2]])
testY = np.sin(2*np.pi*testX)
print('testX:',testX)
print('testY:',testY)
PredictedY = OSVR.predict(np.array([testX[0,:]]))
Error = OSVR.computeMargin(testX[0],testY[0])
print('PredictedY:',PredictedY)
print('Error:',Error)
return OSVR
if __name__ == '__main__':
main(sys.argv)