-
Notifications
You must be signed in to change notification settings - Fork 0
/
io.py
147 lines (124 loc) · 6.66 KB
/
io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import numpy as np
import pickle
from .raw import rawEyes
from .classes import EyeHolder
def parse_eyes(fname, srate = 1000):
    """Parse an eye-tracking text file and return the parsed recording.

    Public entry point; the work is done by ``_parse_eyes``, which decides
    for every block of the file whether it is monocular or binocular.

    Parameters
    ----------
    fname : path of the file to read.
    srate : sampling rate handed on to the parser (defaults to 1000;
        presumably in Hz -- confirm against ``rawEyes``).

    Returns
    -------
    Whatever ``_parse_eyes(fname, srate)`` returns.
    """
    return _parse_eyes(fname, srate)
def _parse_eyes(fname, srate):
    """Parse a text (``.asc``) eye-tracking recording into a ``rawEyes`` object.

    The file is cut into blocks delimited by lines whose first token is
    ``START`` / ``END``. Every block is parsed on its own into an
    ``EyeHolder`` and appended to ``eyedata.data``, so monocular and
    binocular blocks can be mixed within a single file.

    Parameters
    ----------
    fname : path of the text file to read.
    srate : forwarded unchanged to ``rawEyes(srate=...)``.

    Returns
    -------
    The populated ``rawEyes`` instance.

    Raises
    ------
    IndexError
        If the file has no ``START`` line, if a ``START`` has no matching
        ``END``, or if a block contains no sample lines.
    """
    # number of whitespace-separated columns of a sample line:
    # index 0 -> monocular (6 columns), index 1 -> binocular (9 columns)
    ncols_search = [6, 9]

    # the context manager closes the file even if reading fails
    with open(fname, 'r') as infile:
        raw_d = infile.readlines()

    # ``split()[:1]`` is an empty list for blank / whitespace-only lines, so
    # those are skipped instead of raising on ``split()[0]``
    starts = [i for i, line in enumerate(raw_d) if line.split()[:1] == ['START']]
    fstart = starts[0]
    raw_d = raw_d[fstart:]  # remove calibration info at the beginning of the file

    # indices are recomputed relative to the trimmed list of lines
    starts = [i for i, line in enumerate(raw_d) if line.split()[:1] == ['START']]
    ends = [i for i, line in enumerate(raw_d) if line.split()[:1] == ['END']]

    eyedata = rawEyes(nblocks=len(starts), srate=srate)
    # NOTE(review): this flag is set to True unconditionally, even when every
    # block turns out to be monocular; the per-block truth is stored in
    # ``segdata.binocular`` below. Kept as-is for backward compatibility.
    eyedata.binocular = True

    # by handling binocular/monocular separately for each block of the data
    # you can handle situations where you change binocular/monocular between task blocks
    nsegments = len(starts)
    for iseg in range(nsegments):
        print(f'parsing block {iseg+1}/{nsegments}')
        istart, iend = starts[iseg], ends[iseg]
        rdata = raw_d[istart:iend+1]

        # the START line tells us which eyes are recorded in this block
        startmsg = rdata[0].split()
        has_left = 'LEFT' in startmsg
        has_right = 'RIGHT' in startmsg
        binoc = has_left and has_right
        if binoc:
            print('-- block recording is binocular')
        else:
            print('-- block recording is monocular')
        ncols = ncols_search[int(binoc)]

        # skip the first 7 lines of the block (header lines before the samples),
        # then tokenise every remaining line exactly once
        rows = [line.split() for line in rdata[7:]]
        samples = [row for row in rows if len(row) == ncols]
        # everything that is not a sample line; blank lines are dropped
        msgs = [row for row in rows if row and len(row) != ncols]

        # drop the last column as it is nonsense (it's just '.....')
        idata = np.asarray(samples)[:, :-1]

        if iseg == 0:
            fsamp = int(idata[0][0])  # starting sample number of the recording
            eyedata.fsamp = fsamp

        # missing data is coded as '.' in the asc file; swap it for the string
        # 'NaN' so that the float conversion yields nan instead of failing
        idata = np.where(idata == '.', 'NaN', idata)
        idata = idata.astype(float)  # astype already returns a new array

        segdata = EyeHolder()
        segdata.binocular = binoc
        if has_left:
            segdata.eyes_recorded.append('left')
        if has_right:
            segdata.eyes_recorded.append('right')

        segdata.trackertime = idata[:, 0]
        # time relative to the first sample of the block
        segdata.time = np.subtract(segdata.trackertime, segdata.trackertime[0])

        if segdata.binocular:
            # binocular: store both eyes under side-specific names
            colsadd = ['xpos_l', 'ypos_l', 'pupil_l', 'xpos_r', 'ypos_r', 'pupil_r']
        else:
            # monocular: the recorded eye is logged in ``eyes_recorded`` but the
            # data are saved under one name regardless of eye
            colsadd = ['xpos', 'ypos', 'pupil']
        # column 0 is the tracker time, so the data columns start at index 1
        for icol, colname in enumerate(colsadd, start=1):
            setattr(segdata, colname, idata[:, icol])

        # trigger parsing: MSG lines can have different numbers of tokens, so
        # the timestamp and event id are pulled out per line rather than by
        # stacking the token lists into one 2-D array (which fails on ragged
        # input). MSG lines without an event id token are ignored.
        trigs = [m for m in msgs if m[0] == 'MSG' and len(m) >= 3]
        segdata.triggers.timestamp = np.array([m[1] for m in trigs]).astype(int)
        segdata.triggers.event_id = np.array([m[2] for m in trigs], dtype=str)

        eyedata.data.append(segdata)
    return eyedata
def _parse_monocular(fname, srate):
    """Parse a monocular text (``.asc``) eye-tracking recording.

    By default this just looks for ``START`` / ``END`` lines in the file and
    treats everything between a pair as one block. Sample lines are expected
    to have exactly 6 whitespace-separated columns.

    Parameters
    ----------
    fname : path of the text file to read.
    srate : forwarded unchanged to ``rawEyes(srate=...)``.

    Returns
    -------
    The populated ``rawEyes`` instance, with ``binocular`` set to ``False``.

    Raises
    ------
    IndexError
        If the file has no ``START`` line, if a ``START`` has no matching
        ``END``, or if a block contains no sample lines.
    """
    ncols = 6  # number of columns of a monocular sample line

    # the context manager closes the file even if reading fails
    with open(fname, 'r') as infile:
        raw_d = infile.readlines()

    # ``split()[:1]`` is an empty list for blank / whitespace-only lines, so
    # those are skipped instead of raising on ``split()[0]``
    starts = [i for i, line in enumerate(raw_d) if line.split()[:1] == ['START']]
    fstart = starts[0]
    raw_d = raw_d[fstart:]  # remove calibration info at the beginning of the file

    # indices are recomputed relative to the trimmed list of lines
    starts = [i for i, line in enumerate(raw_d) if line.split()[:1] == ['START']]
    ends = [i for i, line in enumerate(raw_d) if line.split()[:1] == ['END']]

    eyedata = rawEyes(nblocks=len(starts), srate=srate)
    eyedata.binocular = False  # log that this is *not* a binocular recording

    nsegments = len(starts)
    for iseg in range(nsegments):
        print(f'parsing block {iseg+1}/{nsegments}')
        istart, iend = starts[iseg], ends[iseg]

        # skip the first 7 lines of the block (header lines before the samples),
        # then tokenise every remaining line exactly once
        rows = [line.split() for line in raw_d[istart:iend+1][7:]]
        samples = [row for row in rows if len(row) == ncols]
        # everything that is not a sample line; blank lines are dropped so that
        # looking at ``m[0]`` below cannot fail on an empty token list
        msgs = [row for row in rows if row and len(row) != ncols]

        # drop the last column as it is nonsense;
        # idata now has: [trackertime, x, y, pupil, something random]
        idata = np.asarray(samples)[:, :-1]

        if iseg == 0:
            fsamp = int(idata[0][0])  # starting sample number of the recording
            eyedata.fsamp = fsamp

        # missing data is coded as '.' in the asc file; swap it for the string
        # 'NaN' so that the float conversion yields nan instead of failing
        idata = np.where(idata == '.', 'NaN', idata)
        idata = idata.astype(float)  # astype already returns a new array

        segdata = EyeHolder()
        segdata.trackertime = idata[:, 0]
        segdata.xpos = idata[:, 1]
        segdata.ypos = idata[:, 2]
        segdata.pupil = idata[:, 3]
        # time relative to the first sample of the block
        segdata.time = np.subtract(segdata.trackertime, segdata.trackertime[0])

        # trigger parsing: MSG lines can have different numbers of tokens, so
        # the timestamp and event id are pulled out per line rather than by
        # stacking the token lists into one 2-D array (which fails on ragged
        # input). MSG lines without an event id token are ignored.
        trigs = [m for m in msgs if m[0] == 'MSG' and len(m) >= 3]
        segdata.triggers.timestamp = np.array([m[1] for m in trigs]).astype(int)
        segdata.triggers.event_id = np.array([m[2] for m in trigs], dtype=str)

        eyedata.data.append(segdata)
    return eyedata  # type: ignore
def save(obj, fname):
    """Pickle ``obj`` into the file ``fname``.

    The file is opened in binary write mode, so an existing file at that
    path is overwritten. Returns ``None``.
    """
    with open(fname, 'wb') as out_file:
        pickle.dump(obj, out_file)
def load(fname):
    """Read one pickled object back from the file ``fname`` and return it.

    NOTE: unpickling can execute arbitrary code, so only use this on files
    that come from a trusted source (e.g. ones written by ``save``).
    """
    with open(fname, 'rb') as in_file:
        return pickle.load(in_file)