-
Notifications
You must be signed in to change notification settings - Fork 16
/
simulation_data.py
159 lines (131 loc) · 5.32 KB
/
simulation_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import datetime
import json
import os
import shutil
import uuid
from collections import namedtuple
from time import sleep
from typing import List, Union
from pathlib import Path
from self_driving.beamng_road_imagery import BeamNGRoadImagery
from self_driving.decal_road import DecalRoad
SimulationDataRecordProperties = ['timer', 'pos', 'dir', 'vel', 'steering',
'steering_input', 'brake', 'brake_input', 'throttle', 'throttle_input',
'wheelspeed', 'vel_kmh', 'is_oob', 'oob_counter',
'max_oob_percentage', 'oob_distance', 'oob_percentage']
SimulationDataRecord = namedtuple('SimulationDataRecord', SimulationDataRecordProperties)
SimulationDataRecords = List[SimulationDataRecord]
SimulationParams = namedtuple('SimulationParameters', ['beamng_steps', 'delay_msec'])
def delete_folder_recursively(path: Union[str, Path]):
path = str(path)
if not os.path.exists(path):
return
assert os.path.isdir(path), path
print(f'Removing [{path}]')
shutil.rmtree(path, ignore_errors=True)
# sometimes rmtree fails to remove files
for tries in range(20):
if os.path.exists(path):
sleep(0.1)
shutil.rmtree(path, ignore_errors=True)
if os.path.exists(path):
shutil.rmtree(path)
if os.path.exists(path):
raise Exception(f'Unable to remove folder [{path}]')
class SimulationInfo:
start_time: str
end_time: str
success: bool
exception_str: str
computer_name: str
ip_address: str
id: str
class SimulationData:
f_info = 'info'
f_params = 'params'
f_road = 'road'
f_records = 'records'
def __init__(self, simulation_name: str):
self.name = simulation_name
root: Path = Path(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
self.simulations: Path = root.joinpath('simulations')
self.path_root: Path = self.simulations.joinpath(simulation_name)
self.path_json: Path = self.path_root.joinpath('simulation.full.json')
self.path_partial: Path = self.path_root.joinpath('simulation.partial.tsv')
self.path_road_img: Path = self.path_root.joinpath('road')
self.id: str = None
self.params: SimulationParams = None
self.road: DecalRoad = None
self.states: SimulationDataRecord = None
self.info: SimulationInfo = None
self.exception_str = None
assert len(self.name) >= 3, 'the simulation name must be a string of at least 3 character'
@property
def n(self):
return len(self.states)
def set(self, params: SimulationParams, road: DecalRoad,
states: SimulationDataRecords, info: SimulationInfo = None):
self.params = params
self.road = road
if info:
self.info = info
else:
self.info = SimulationInfo()
self.info.id = str(uuid.uuid4())
self.states = states
def clean(self):
delete_folder_recursively(self.path_root)
def __str__(self) -> str:
return json.dumps({
self.f_params: self.params._asdict(),
self.f_info: self.info.__dict__,
self.f_road: self.road.to_dict(),
self.f_records: [r._asdict() for r in self.states]
})
def save(self):
self.path_root.mkdir(parents=True, exist_ok=True)
print(self.path_root)
with open(self.path_json, 'w') as f:
f.write(json.dumps({
self.f_params: self.params._asdict(),
self.f_info: self.info.__dict__,
self.f_road: self.road.to_dict(),
self.f_records: [r._asdict() for r in self.states]
}))
with open(self.path_partial, 'w') as f:
sep = '\t'
f.write(sep.join(SimulationDataRecordProperties) + '\n')
gen = (r._asdict() for r in self.states)
gen2 = (sep.join([str(d[key]) for key in SimulationDataRecordProperties]) + '\n' for d in gen)
f.writelines(gen2)
road_imagery = BeamNGRoadImagery.from_sample_nodes(self.road.nodes)
road_imagery.save(self.path_road_img.with_suffix('.jpg'))
road_imagery.save(self.path_road_img.with_suffix('.svg'))
def load(self) -> 'SimulationData':
with open(self.path_json, 'r') as f:
obj = json.loads(f.read())
info = SimulationInfo()
info.__dict__ = obj.get(self.f_info, {})
self.set(
SimulationParams(**obj[self.f_params]),
DecalRoad.from_dict(obj[self.f_road]),
[SimulationDataRecord(**r) for r in obj[self.f_records]],
info=info)
return self
def complete(self) -> bool:
return self.path_json.exists()
def min_oob_distance(self) -> float:
return min(state.oob_distance for state in self.states)
def start(self):
self.info.success = None
self.info.start_time = str(datetime.datetime.now())
try:
import platform
self.info.computer_name = platform.node()
except Exception as ex:
self.info.computer_name = str(ex)
def end(self, success: bool, exception=None):
self.info.end_time = str(datetime.datetime.now())
self.info.success = success
if exception:
self.exception_str = str(exception)