-
Notifications
You must be signed in to change notification settings - Fork 0
/
least_squares_risk_parity.py
134 lines (94 loc) · 3.76 KB
/
least_squares_risk_parity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import pandas as pd
import numpy as np
import math
from scipy.optimize import minimize
#-----------------------------------------------------------------------------------------------------------------------------------------------------------#
#-----------#
# Functions #
#-----------#
def RP_X_sum(params_rp, Q, X_sum):
'''
Sum(weights) constraint
'''
# Selecting portfolio weights
X = params_rp[:Q.shape[0]]
sum_X = sum(X) - X_sum
return sum_X
def F_RP_X(params_rp, Q, rho):
'''
Objective Function F, min X
'''
# Slicing array
X_n = Q.shape[0]
X, theta = params_rp[:X_n], params_rp[-1]
least_squares = [math.pow(X[N]*Q[N]@X - theta, 2) for N, i in enumerate(X)]
F = sum(least_squares) + rho*X.T@Q@X
return F
def RP_opt(covariance_matrix, LB_UB_x, X_sum, rho):
'''
Risk Parity optimization
'''
# Loading variables
LB_x, UB_x, Q = LB_UB_x[0], LB_UB_x[0], covariance_matrix
LB_theta, UB_theta = -50, 50
init_guess_X = list(np.random.uniform(low=LB_x , high=UB_x, size=len(Q)))
init_guess_theta = list(np.random.uniform(low=LB_theta, high=UB_theta, size=1))
init_guess_X_theta = np.array(init_guess_X + init_guess_theta)
bounds_X_theta = np.array([LB_UB_x for _ in Q] + [(LB_theta, UB_theta)])
opt_dict = { 'fun': F_RP_X,
'x0': init_guess_X_theta,
'args': tuple((Q, rho)),
'bounds': bounds_X_theta,
'constraints': {'type': 'eq', 'fun': RP_X_sum, 'args': tuple((Q, X_sum))},
'tol': math.pow(10, -14),
'options': {'maxiter': math.pow(10, 6)}}
opt_report = minimize(**opt_dict)
return opt_report
def sequential_mv_rp(covariance_matrix, LB_x, UB_x, X_sum):
'''
Minimum variance with risk parity
Sequential min-variance risk parity algorithm
'''
Q = covariance_matrix
rho_L = []
rho_start = 4
for k in list(range(0, 12)):
rho_i = math.pow(10, rho_start)
rho_start -= 1
rho_L.append(rho_i)
RP_solutions = []
n_RP_trials = 50
for rho_i in rho_L:
print('\n====== Rho i: {} '.format(rho_i))
for trial in range(n_RP_trials):
rp_opt_dict = { 'covariance_matrix': Q,
'LB_UB_x': tuple((LB_x, UB_x)),
'X_sum': X_sum,
'rho': rho_i}
result = RP_opt(**rp_opt_dict)
RP_X = result.x[:len(Q)]
# Check RP solution
RP_variance = RP_X@Q@RP_X
RP_vol = math.pow(RP_variance, 0.5)
RC_target = (RP_variance)/len(RP_X)
RC_L = ([RP_X[N]*Q[N]@RP_X for N, i in enumerate(RP_X)])
RC_tolerance = math.pow(10, -5)
RC_test = [abs(RC_i - RC_target) for RC_i in RC_L]
if ((all(i <= RC_tolerance for i in RC_test)) == True):
RP_solutions.append(tuple((RP_X, RP_vol)))
RP_X_sorted = sorted(RP_solutions, key= lambda x: x[1])
RP_X = RP_X_sorted[0]
return RP_X_sorted, RP_X
#-----------------------------------------------------------------------------------------------------------------------------------------------------------#
#------#
# Main #
#------#
# Variance-Covarariance Matrix Q
Q = np.array([[1, -0.9, 0.6],
[-0.9, 1, -0.2],
[0.6, -0.2, 4]])
RP_param_dict = {'covariance_matrix': Q, 'LB_x': -1, 'UB_x': 1, 'X_sum': 1}
RP_report = sequential_mv_rp(**RP_param_dict)
RP_max_vol, RP_min_vol = RP_report[0][-1], RP_report[0][0]
print(RP_max_vol)
print(RP_min_vol)