-
Notifications
You must be signed in to change notification settings - Fork 3
/
utils_system.py
213 lines (182 loc) · 6.96 KB
/
utils_system.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import os, shutil, stat, errno, time, datetime, re
import pandas as pd
import numpy as np
from utils import path_prefix_free
import imageio, cv2
from os.path import join as oj
def archive_by_date(src_folder, dest_folder, archive_date):
    """Move stale top-level entries of ``src_folder`` into ``dest_folder``.

    An entry (file or directory) is moved when all of the following hold:

    * its name does not start with ``_``;
    * its name is not in the hard-coded exception list below;
    * its last-access time (atime) is earlier than ``archive_date``.

    Entries that cannot be moved are reported and skipped so that one failure
    does not abort the whole archive run.

    Args:
        src_folder: directory whose direct children are examined.
        dest_folder: existing directory that receives the moved entries.
        archive_date: ``datetime.datetime`` cutoff compared against the local
            time built from each entry's atime.

    Raises:
        AssertionError: if ``dest_folder`` does not exist.
    """
    assert os.path.exists(dest_folder)
    exceptions = {"_MouseBrain_Atlas3", "Lab 4CR Digging Masterfile", "Talks"}

    # Build full paths instead of calling os.chdir(): changing the working
    # directory is a process-wide side effect and makes relative
    # src_folder/dest_folder arguments resolve against the wrong directory.
    tomoves = []
    for name in os.listdir(src_folder):
        if name.startswith("_") or name in exceptions:
            continue
        # atime: access time (use os.path.getmtime for modification time)
        accessed = datetime.datetime.fromtimestamp(
            os.path.getatime(os.path.join(src_folder, name))
        )
        if accessed < archive_date:
            tomoves.append(name)

    for name in tomoves:
        print("moving", name)
        try:
            shutil.move(
                os.path.join(src_folder, name), os.path.join(dest_folder, name)
            )
        except OSError as err:
            # Best effort: report why the entry was left behind and carry on.
            print("skipping", name, err)
def fix_files(
    folder=r"Z:\2ABT\ProbSwitch\BSDML_FP\BSD019",
    date_zero="2022-08-09",
    pv_zero=152,
):
    """Correct the ``p<number>`` field of the file names in ``folder``.

    File names are split on ``_``.  The third field is expected to look like
    ``p<int>`` and the fifth field to start with a ``YYYY-MM-DD`` date followed
    by ``T``.  The expected number for a file is
    ``pv_zero + (file_date - date_zero).days``; when the number in the name
    differs, the file is renamed with the third field replaced by the
    corrected value.

    Args:
        folder: directory whose entries are checked (defaults to the
            location that used to be hard-coded).
        date_zero: reference date as ``"YYYY-MM-DD"``.
        pv_zero: number that corresponds to ``date_zero``.
    """

    def str2date(s):
        # str2date("2022-08-09") -> datetime.date(2022, 8, 9)
        return datetime.datetime.strptime(s, "%Y-%m-%d").date()

    dt0 = str2date(date_zero)
    for f in os.listdir(folder):
        parts = f.split("_")
        try:
            pv = parts[2]
            dt = str2date(parts[4].split("T")[0])
            pvn = int(pv[1:])
        except (IndexError, ValueError):
            # Name does not follow the expected pattern; leave it untouched
            # rather than aborting the remaining renames.
            print("skipping", f)
            continue
        pvn_new = pv_zero + (dt - dt0).days
        if pvn_new != pvn:
            # Rewrite only the third field: a blanket str.replace would also
            # alter any other part of the name that happens to contain the
            # same text.
            parts[2] = f"p{pvn_new}"
            os.rename(os.path.join(folder, f), os.path.join(folder, "_".join(parts)))
def chunk_video_sample_from_file(filename, out_folder, fps, duration):
    """Copy the first ``duration * fps`` frames of a video into a sample file.

    The output is written to ``out_folder`` as ``<file_code>_sample.<suffix>``,
    where ``<file_code>`` and ``<suffix>`` are the two halves of
    ``path_prefix_free(filename)`` split on its single ``.``
    (``path_prefix_free`` is presumably the file name without its directory
    part -- confirm against ``utils``).

    Args:
        filename: path of the source video.
        out_folder: directory that receives the sample video.
        fps: frame rate given to the writer; also used to convert
            ``duration`` into a frame count.
        duration: length of the sample; ``duration * fps`` frames are kept.

    Raises:
        AssertionError: if the bare file name does not contain exactly one
            ``.``.
    """
    base_name = path_prefix_free(filename)
    assert base_name.count(".") == 1, "has to contain only one ."
    file_code, suffix = base_name.split(".")
    out_path = os.path.join(out_folder, f"{file_code}_sample.{suffix}")
    frame_limit = duration * fps

    # Context managers guarantee that both the writer and the reader are
    # closed, even when decoding or encoding fails part-way through.
    # mode="I" is imageio's request mode for a sequence of images.
    with imageio.get_writer(
        out_path, format="FFMPEG", mode="I", fps=fps
    ) as w, imageio.get_reader(filename) as vid:
        for ith, img in enumerate(vid):
            if ith >= frame_limit:
                print("all done")
                break
            print("writing", img.shape, ith)
            w.append_data(img)
def chunk_four_vids_and_stitch(
    filenames, ts_names, out_folder, fps, duration, time_zero=0
):
    """Stitch four videos into one 2x2 mosaic with a time label per tile.

    For every output frame ``i`` the ``i``-th frame of each input video is
    labelled with ``R<j+1>: <time>`` and placed in a 2x2 grid.  The output is
    written to ``out_folder`` as ``<file_code>_sample.<suffix>``, where
    ``<file_code>`` is the name stem of the first video with its first three
    characters removed.

    Args:
        filenames: paths of the four input videos.
        ts_names: paths of the four timestamp files, one value per row and no
            header; values are shifted by ``time_zero`` and divided by 1000
            (presumably milliseconds to seconds -- confirm with the
            acquisition code).
        out_folder: directory that receives the stitched video.
        fps: frame rate of the output; also converts ``duration`` to frames.
        duration: maximum output length; at most ``duration * fps`` frames are
            written, fewer if any timestamp file is shorter.
        time_zero: value subtracted from every timestamp before scaling.
    """
    vids = [imageio.get_reader(fname) for fname in filenames]
    try:
        tss = [
            (pd.read_csv(tsn, names=["time"]) - time_zero) / 1000 for tsn in ts_names
        ]
        first_name = path_prefix_free(filenames[0])
        file_code = first_name.split(".")[0][3:]
        suffix = first_name.split(".")[1]
        # Video index -> grid slot; slot // 2 is the row, slot % 2 the column.
        # Result: video 2 top-left, 1 top-right, 3 bottom-left, 0 bottom-right.
        alt_order = {2: 0, 1: 1, 3: 2, 0: 3}
        total_frames = min(duration * fps, min(len(tsdf) for tsdf in tss))

        # mode="I" is imageio's request mode for a sequence of images.
        w = imageio.get_writer(
            os.path.join(out_folder, f"{file_code}_sample.{suffix}"),
            format="FFMPEG",
            mode="I",
            fps=fps,
        )
        try:
            for i in range(total_frames):
                grid = [[None, None], [None, None]]
                for j in range(4):
                    jframe = vids[j].get_data(i)
                    jtime = np.around(tss[j]["time"][i], 2)
                    pjframe = cv2.putText(
                        jframe,
                        f"R{j+1}: {jtime}",
                        (10, 200),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.4,
                        (255, 255, 255),
                        2,
                    )
                    r, c = alt_order[j] // 2, alt_order[j] % 2
                    grid[r][c] = pjframe
                top_row = np.concatenate(grid[0], axis=1)
                bottom_row = np.concatenate(grid[1], axis=1)
                w.append_data(np.concatenate([top_row, bottom_row], axis=0))
                print(f"writing {i}th")
        finally:
            # Finalise the output file even if a frame fails to decode.
            w.close()
    finally:
        # The readers were previously never released.
        for vid in vids:
            vid.close()
def chunk_helper_session(animal, session, folder, out_folder, duration=20 * 60):
    """Locate one session's four videos and stitch them into a sample movie.

    Looks in ``<folder>/<animal>`` for the first file that starts with ``RR_``
    and contains both ``animal`` and ``session`` in its name; the ``time``
    value of its first row becomes the time origin.  Then scans
    ``<folder>/<animal>/video`` for files of the same animal and session
    starting with ``R<i>_cam`` (video) or ``R<i>_vidTS`` (timestamps) for
    ``i`` in 1..4, and passes everything to ``chunk_four_vids_and_stitch`` at
    30 fps.

    Args:
        animal: animal identifier; also the name of the sub-folder of
            ``folder``.
        session: session identifier expected in the file names.
        folder: root directory holding one sub-folder per animal.
        out_folder: directory that receives the stitched video.
        duration: maximum length of the stitched video (default ``20 * 60``).
    """
    fps = 30
    animal_dir = os.path.join(folder, animal)
    video_dir = os.path.join(animal_dir, "video")

    def belongs_to_session(fname):
        return (animal in fname) and (session in fname)

    # The first row of the matching RR_ file supplies the time origin.
    for fname in os.listdir(animal_dir):
        if not (belongs_to_session(fname) and fname.startswith("RR_")):
            continue
        behavior = pd.read_csv(
            os.path.join(animal_dir, fname),
            sep=" ",
            header=None,
            names=["time", "b_code", "none"],
        )
        df_zero = behavior.loc[0, "time"]
        break

    vid_files = [None] * 4
    vid_times = [None] * 4
    for fname in os.listdir(video_dir):
        if not belongs_to_session(fname):
            continue
        full_path = os.path.join(video_dir, fname)
        for slot in range(4):
            if fname.startswith(f"R{slot + 1}_cam"):
                vid_files[slot] = full_path
            elif fname.startswith(f"R{slot + 1}_vidTS"):
                vid_times[slot] = full_path

    chunk_four_vids_and_stitch(
        vid_files, vid_times, out_folder, fps, duration, time_zero=df_zero
    )
"""####################################################
################### Data Structure ####################
####################################################"""
def rename_dir_files_recursive(root, namemap):
    """Rename files and directories under ``root`` by substring substitution.

    Every entry below ``root`` is visited; the contents of a directory are
    processed before the directory itself is renamed.  For each key of
    ``namemap`` that occurs in an entry's name, all occurrences are replaced
    by the mapped value.

    Args:
        root: directory to process.
        namemap: mapping of ``old substring -> replacement``.  Keys are
            applied in the mapping's iteration order, and only keys present
            in the entry's original name are applied.
    """
    for p in os.listdir(root):
        pname = os.path.join(root, p)
        if os.path.isdir(pname):
            rename_dir_files_recursive(pname, namemap)
        # Work out the final name first and rename once.  Renaming after each
        # matching key left the source path stale, so a second match raised
        # FileNotFoundError.
        new_p = p
        for old, new in namemap.items():
            if old in p:
                new_p = new_p.replace(old, new)
        if new_p != p:
            os.rename(pname, os.path.join(root, new_p))
def find_files_recursive(root, pattern, func, verbose=True):
    """Call ``func`` on every file under ``root`` whose path contains ``pattern``.

    Directories are descended into recursively.  ``pattern`` is a plain
    substring that is tested against the whole joined path (including
    ``root`` and any intermediate directory names), not just the file name.

    Args:
        root: directory to search.
        pattern: substring to look for in each file path.
        func: callable invoked with the path of every matching file.
        verbose: when true, print ``Found <path>`` before each call.
    """
    for entry in os.listdir(root):
        path = os.path.join(root, entry)
        if os.path.isdir(path):
            find_files_recursive(path, pattern, func, verbose)
            continue
        if pattern not in path:
            continue
        if verbose:
            print("Found", path)
        func(path)
def make_stim_blocks(n=20, p=0.25, zero=False):
    """Return a random 0/1 sequence of ``n`` trials with no two adjacent 1s.

    ``int(n * p)`` positions are set to 1.  They are drawn by sampling without
    replacement from ``n - int(n * p)`` slots, sorting, and shifting the k-th
    draw by ``k``, which keeps at least one 0 between consecutive 1s.

    Args:
        n: number of trials in the block.
        p: fraction of stimulated trials; must not exceed 0.5, otherwise
            there are fewer slots than draws and ``np.random.choice`` raises
            ``ValueError``.
        zero: when true, a single 0 is prepended, so the result has length
            ``n + 1``.

    Returns:
        1-D float array of 0s and 1s.

    Use this example to visualize:
        import matplotlib.pyplot as plt
        plt.stem(make_stim_blocks(20))
        plt.xticks(np.arange(20)[4::5], np.arange(1, 21)[4::5])
    """
    n_stim = int(n * p)
    block = np.zeros(n)
    if n_stim:
        slots = np.random.choice(n - n_stim, n_stim, replace=False)
        stim_positions = np.sort(slots) + np.arange(n_stim)
        block[stim_positions] = 1
    if not zero:
        return block
    return np.concatenate([[0], block])
def make_long_stim_blocks(N, p=0.25, n=20):
    """Concatenate stimulation blocks built by ``make_stim_blocks``.

    ``N // n`` blocks of ``n`` trials are generated, followed by one block of
    ``N % n`` trials.  Whenever a block ends with a 1, the next block is
    requested with ``zero=True`` so that it starts with a 0 and no two 1s
    meet at a block boundary.

    Note that each such leading 0 is an extra element, so the returned
    sequence can be longer than ``N``.

    Args:
        N: nominal total number of trials.
        p: fraction of stimulated trials per block.
        n: number of trials per full block.

    Returns:
        1-D array of 0s and 1s.
    """
    n_full_blocks, remainder = divmod(N, n)
    pieces = []
    needs_leading_zero = False
    for _ in range(n_full_blocks):
        piece = make_stim_blocks(n, p, needs_leading_zero)
        pieces.append(piece)
        needs_leading_zero = piece[-1] == 1
    pieces.append(make_stim_blocks(remainder, p, needs_leading_zero))
    return np.concatenate(pieces)
# from pipeline import organize_RR_structures
if __name__ == "__main__":
    # Running this module as a script currently does nothing.
    # The commented lines below are a disabled example of archive_by_date:
    # they create an "_ARCHIVE" folder inside the source folder and move
    # every entry last accessed before the cutoff date into it.
    # archive_date = datetime.datetime.strptime("2019/05/31", "%Y/%m/%d")
    # src_folder = "/Volumes/Wilbrecht_file_server"
    # archive = os.path.join(src_folder, '_ARCHIVE')
    # if not os.path.exists(archive):
    #     os.makedirs(archive)
    # archive_by_date(src_folder, archive, archive_date)
    pass