-
Notifications
You must be signed in to change notification settings - Fork 0
/
RunSimulation.py
147 lines (118 loc) · 5.56 KB
/
RunSimulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
import traci
import pandas as pd
class Config_SUMO:
sumocfg_path = r'input/your/sumocfg_path'
scenario_path = r'input/your/scenario_path'
scenario_file = "test.det.xml"
sumoBinary = r'C:/Program Files (x86)/Eclipse/Sumo/bin/sumo-gui'
# Detector
class Detector:
def __init__(self, id):
self.id = id
self.vehicle_list = []
self.vehicle_interval_list = []
self.vehicle_total_count = 0
self.vehicle_interval_count = 0
self.vehicle_total_co2 = 0
self.vehicle_interval_co2 = 0
self.tmp = 0
def update(self):
if traci.inductionloop.getLastStepVehicleIDs(self.id) in self.vehicle_list:
pass
else:
self.vehicle_interval_list += traci.inductionloop.getLastStepVehicleIDs(self.id)
self.vehicle_interval_count = len(list(set(self.vehicle_interval_list)))
for v_id in self.vehicle_interval_list:
self.vehicle_interval_co2 += traci.vehicle.getCO2Emission(v_id)
self.vehicle_total_co2 += traci.vehicle.getCO2Emission(v_id)
def interval_reset(self):
self.vehicle_list += self.vehicle_interval_list
self.vehicle_interval_list = []
self.vehicle_total_count = len(list(set(self.vehicle_list)))
self.vehicle_interval_co2 = 0
def merge_data(list):
# Create a DataFrame from the detection results
df = pd.DataFrame(list)
# Fill in missing values
df.fillna(0)
# Combine rows with the same value in the time column
merge_df = df.groupby('Time').agg('sum').reset_index()
list_result = merge_df.values.tolist()
return list_result
def merge_col(df):
merged_columns = pd.DataFrame()
# Define the number of columns to merge at a time
columns_to_merge = 3
merged_columns['Time'] = df['Time']
# Loop through columns starting from the 2nd column
for i in range(1, len(df.columns), columns_to_merge):
start_col = i
end_col = min(i + columns_to_merge, len(df.columns))
# Merge the selected columns and concatenate them horizontally
merged = pd.concat([df.iloc[:, start_col:end_col]], axis=1)
# Rename the columns with a common prefix (e.g., 'Merged_')
merged.columns = [f'Merged_{col}' for col in merged.columns]
# Concatenate the merged columns to the result DataFrame
merged_columns = pd.concat([merged_columns, merged], axis=1)
return merged_columns
class SumoController:
def __init__(self,config):
self.config = config
self.__set_SUMO()
self.detectors = [Detector(detector_id) for detector_id in self.__get_detector_ids(self.config)]
self.detection_result_flow = []
self.detection_result_co2 = []
self.detection_result_co2_flow = []
self.detection_result_flow_merge = []
self.detection_result_co2_merge = []
self.detection_result_co2_flow_merge = []
pass
def __get_detector_ids(self, config):
detector_ids = []
with open(os.path.join(config.scenario_path, config.scenario_file), "r") as f:
for line in f:
if "inductionLoop" in line:
parts = line.split('"')
detector_ids.append(parts[1])
return detector_ids
def __set_SUMO(self):
traci.start(["sumo-gui", "-c", self.config.sumocfg_path])
traci.simulationStep()
def extract_excel(self):
# Create a DataFrame from the detection results
df1 = pd.DataFrame(self.detection_result_flow)
df2 = pd.DataFrame(self.detection_result_co2)
df3 = pd.DataFrame(self.detection_result_co2_flow)
# Fill in missing values
df1.fillna(0)
df2.fillna(0)
df3.fillna(0)
# Combine rows with the same value in the time column
merge_df1 = df1.groupby('Time').agg('sum').reset_index()
merge_df2 = df2.groupby('Time').agg('sum').reset_index()
merge_df3 = df3.groupby('Time').agg('sum').reset_index()
excel_writer = pd.ExcelWriter("results_new.xlsx", engine="xlsxwriter")
merge_df1.to_excel(excel_writer, sheet_name="Flow", index=False)
merge_df2.to_excel(excel_writer, sheet_name="CO2", index=False)
merge_df3.to_excel(excel_writer, sheet_name="CO2_Flow", index=False)
excel_writer.close()
def run_simulation(self):
step = 0
while step <= 360:
traci.simulationStep()
for detector in self.detectors:
detector.update()
if (step % 30) == 0:
# print("Current simulation time:", traci.simulation.getTime())
for detector in self.detectors:
self.detection_result_flow.append({"Time": traci.simulation.getTime(), detector.id: int(f"{detector.vehicle_interval_count}")})
self.detection_result_co2.append({"Time": traci.simulation.getTime(), detector.id: float(f"{detector.vehicle_interval_co2:.2f}")})
self.detection_result_co2_flow.append({"Time": traci.simulation.getTime(), detector.id : f"{detector.vehicle_interval_co2:.2f}/{detector.vehicle_interval_count}"})
# print(f"{detector.id}에서 통과한 차량 수 : {detector.vehicle_interval_count}")
detector.interval_reset()
self.detection_result_flow_merge = merge_data(self.detection_result_flow)
self.detection_result_co2_merge = merge_data(self.detection_result_co2)
self.detection_result_co2_flow_merge = merge_data(self.detection_result_co2_flow)
step += 1
traci.close()