-
Notifications
You must be signed in to change notification settings - Fork 0
/
lincircorrel.py
212 lines (164 loc) · 7.68 KB
/
lincircorrel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#!/usr/bin/env python
'''
This script plots and calculates circular-linear correlations between
all possible combinations of circular variables x and linear variables y
By Luis Sosa
'''
from mpmath import *
import random
import sys
from pandas import Series
from scipy.stats import pearsonr
import matplotlib.pyplot as plt
from scipy import mean
from statistics import median
from copy import copy
import csv
def calculate_correlation_coef(cicrcular, linear, parametric=True):
''' Calculates correlation coefficient '''
# Get dataset length
assert (len(linear) == len(circular)), "The length of the circular and linear variable arrays must match"
n = len(linear)
# Parametric test
if parametric:
c = [float(cos(entry)) for entry in circular]
s = [float(sin(entry)) for entry in circular]
rxc = pearsonr(linear, c)[0]
rxs = pearsonr(linear, s)[0]
rcs = pearsonr(c, s)[0]
r2 = (rxc**2 + rxs**2 - 2*rxc*rxs*rcs)/(1-rcs**2)
return sqrt(r2)
# Non-parametric test
else:
# Ranked data
cir_ranks = Series(circular).rank()
lin_ranks = Series(linear).rank()
# Interim calculations
beta = [2*pi*entry/n for entry in cir_ranks]
C = sum([x_i*cos(beta_i) for beta_i,x_i in zip(beta, lin_ranks)])
S = sum([x_i*sin(beta_i) for beta_i,x_i in zip(beta, lin_ranks)])
# Denormalized correlation coefficient
# U = 24*(C**2 + S**2)/(n**2 * (n+1))
# Normalization coefficient
alpha = 2*(sin(pi/n))**4 / (1 + (cos(pi/n)))**3 if n%2 \
else 1/(1 + 5*(cot(pi/n))**2 + 4*(cot(pi/n))**4)
# Normalized correlation coefficient
D = alpha*(C**2 + S**2)
return D
def calculate_correlation(circular, linear, parametric=True, iterations=100):
''' Calculates circular statistics and performs permutation test to determine p-value '''
actual_correl = calculate_correlation_coef(circular, linear, parametric=parametric)
stronger_correls = 0
rand_linear = copy(linear)
for i in range(iterations):
# Continually shuffle data and recalculate correlation
random.shuffle(rand_linear)
simulated_correl = calculate_correlation_coef(circular, rand_linear, parametric=parametric)
# Note random correlation as strong or stronger than observed correlation
if abs(simulated_correl) >= actual_correl:
stronger_correls += 1
# p-value is the ratio of stronger random correlations there are
p_val = stronger_correls/iterations
return (round(actual_correl,3), round(p_val,3))
def plot_data(circular, linear, correl, pval, title="", dual=True):
''' Plots data and correlation statistics on circular histogram '''
# Create plot
plt.figure()
ax = plt.subplot(111, polar=True)
bars = ax.bar(circular, linear, width=2*pi/len(circular), edgecolor='black')
# Use custom colors and opacity
for r, bar in zip(linear, bars):
bar.set_facecolor((random.random(), 1, random.random()))
bar.set_alpha(0.8)
# Adjust labelling
plt.title(f"{title}\n")
ax.set_xticklabels(['N', '', 'E', '', 'S', '', 'W', ''])
ax.set_theta_offset(pi/2)
ax.set_theta_direction(-1)
# Add corelation data
if dual:
plt.figtext(0.15, 0.1, f"Parametric:\nR = {correl[0]}\np-val={p_val[0]}")
plt.text(0.9, 0, f"Non-parametric:\nR = {correl[1]}\np-val={p_val[1]}", transform=ax.transAxes)
else:
plt.figtext(0.15, 0.1, f"R = {correl}\np-val={p_val}")
# Show plot non-blockingly
plt.ion()
plt.show()
def read_input(datafile, degrees=True):
with open(datafile) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
line_count = 0
linear_columns = []
circular_columns = []
compounded_series = []
linear_names = []
circular_names = []
for row in csv_reader:
# Detect and store series header
if line_count == 0:
for i in range(len(row)):
# Headers preceeded by _x_ are circular series
if row[i].find("_x_") == 0:
circular_columns.append(i)
circular_names.append(row[i][3:])
# Headers preceeded by _y_ are linear series
elif row[i].find("_y_") == 0:
linear_columns.append(i)
linear_names.append(row[i][3:])
# Create a list of tuples to accomodate all pairings of linear and circular variables
compounded_series = [[[],[]] for _ in range((len(linear_columns)*len(circular_columns)))]
# Store data with its corresponding series
else:
for i in range(len(circular_columns)):
circular = circular_columns[i]
# Exclude missing data
if row[circular] == "-1":
continue
for j in range(len(linear_columns)):
linear = linear_columns[j]
# Exclude missing data
if row[linear] == "-1":
continue
# Compounded series will contain all combinations of linear and circular variables
else:
x = float(row[circular])
y = float(row[linear])
# Cast angular data into radians
if degrees:
x = float(x*pi/180)
pass
compounded_series[i*len(linear_columns) + j][0].append(x)
compounded_series[i*len(linear_columns) + j][1].append(y)
line_count += 1
print(f'Processed {line_count} lines, producing {len(compounded_series)} distinct combinations')
titles = []
for x in circular_names:
for y in linear_names:
titles.append(f'{x} vs {y}')
return compounded_series, titles
if __name__ == "__main__":
print('ass')
quit()
# Get input file location
if len(sys.argv) < 2:
datafile = input("Please input the filepath of the .csv file you wish to analyze: ")
else:
datafile = sys.argv[1]
# Relevance contains the minimum correlation and p-value that are considered relevant enough to plot
relevance = (0.2, 0.1)
# Parse input and generate compounded data
compounded_data, titles = read_input(datafile)
# Calculate correlation and plot data for each circular-linear pair
for i in range(len(compounded_data)):
series = compounded_data[i]
circular = series[0]
linear = series[1]
correl = [None] * 2
p_val = [None] * 2
correl[0], p_val[0] = calculate_correlation(circular, linear, parametric=True, iterations=1000)
correl[1], p_val[1] = calculate_correlation(circular, linear, parametric=False, iterations=1000)
# Display only relevant correlations
if( (correl[0] >= relevance[0] and p_val[0] <= relevance[1]) or (correl[1] >= relevance[0] and p_val[1] <= relevance[1]) ):
plot_data(circular, linear, correl, p_val, title=titles[i], dual=True)
print(f"{round((i+1)/len(compounded_data) * 100,0)}% complete")
input("Press any key to exit...")