forked from evshary/zenoh_autoware_fms
-
Notifications
You must be signed in to change notification settings - Fork 0
/
api_server.py
182 lines (144 loc) · 5.13 KB
/
api_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import asyncio
import math
import cv2
import zenoh
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from zenoh_app.camera_autoware import MJPEG_server
from zenoh_app.list_autoware import list_autoware
from zenoh_app.pose_service import PoseServer
from zenoh_app.status_autoware import get_cpu_status, get_vehicle_status
from zenoh_app.teleop_autoware import ManualController
MJPEG_HOST = '0.0.0.0'
MJPEG_PORT = 5000
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=['*'])
conf = zenoh.Config.from_file('config.json5')
session = zenoh.open(conf)
use_bridge_ros2dds = True
manual_controller = None
mjpeg_server = None
pose_service = PoseServer(session, use_bridge_ros2dds)
@app.get('/')
async def root():
return {'message': 'Hello World'}
@app.get('/list')
async def manage_list_autoware():
return list_autoware(session, use_bridge_ros2dds)
@app.get('/status/{scope}')
async def manage_status_autoware(scope):
return {'cpu': get_cpu_status(session, scope, use_bridge_ros2dds), 'vehicle': get_vehicle_status(session, scope, use_bridge_ros2dds)}
@app.websocket('/video')
async def handle_ws(websocket: WebSocket):
await websocket.accept()
global mjpeg_server
try:
while True:
if mjpeg_server is None or mjpeg_server.camera_image is None:
await asyncio.sleep(2)
else:
# Encode the frame as JPEG
_, buffer = cv2.imencode('.jpg', mjpeg_server.camera_image)
frame_bytes = buffer.tobytes()
await websocket.send_bytes(frame_bytes)
await asyncio.sleep(0.1)
except WebSocketDisconnect:
await websocket.close()
@app.get('/teleop/startup')
async def manage_teleop_startup(scope):
global manual_controller, mjpeg_server, mjpeg_server_thread
if manual_controller is not None:
manual_controller.stop_teleop()
manual_controller = ManualController(session, scope, use_bridge_ros2dds)
if mjpeg_server is not None:
mjpeg_server.change_scope(scope)
else:
mjpeg_server = MJPEG_server(session, scope, use_bridge_ros2dds)
# mjpeg_server_thread = threading.Thread(target = mjpeg_server.run)
# mjpeg_server_thread.start()
return {
'text': f'Startup manual control on {scope}.',
'mjpeg_host': 'localhost' if MJPEG_HOST == '0.0.0.0' else MJPEG_HOST,
'mjpeg_port': MJPEG_PORT,
}
@app.get('/teleop/gear')
async def manage_teleop_gear(scope, gear):
global manual_controller
if manual_controller is not None:
manual_controller.pub_gear(gear)
return f'Set gear {gear} to {scope}.'
else:
return 'Please startup the teleop first'
@app.get('/teleop/velocity')
async def manage_teleop_speed(scope, velocity):
global manual_controller
if manual_controller is not None:
manual_controller.update_control_command(float(velocity) * 1000 / 3600, None)
return f'Set speed {velocity} to {scope}.'
else:
return 'Please startup the teleop first'
@app.get('/teleop/turn')
async def manage_teleop_turn(scope, angle):
global manual_controller
if manual_controller is not None:
manual_controller.update_control_command(None, float(angle) * math.pi / 180)
return f'Set steering angle {angle}.'
else:
return 'Please startup the teleop first'
@app.get('/teleop/status')
async def manage_teleop_status():
global manual_controller
if manual_controller is not None:
return {
'velocity': round(manual_controller.current_velocity * 3600 / 1000, 2),
'gear': manual_controller.current_gear,
'steering': manual_controller.current_steer * 180 / math.pi,
}
else:
return {'velocity': '---', 'gear': '---', 'steering': '---'}
# @app.get("/map/startup")
# async def manage_map_startup(scope):
# global pose_service
# if pose_service is None:
# pose_service = PoseServer(session, scope)
# else:
# pose_service.change_scope(scope)
# return {
# "text": f"Startup manual control on {scope}."
# }
@app.get('/map/list')
async def get_vehilcle_list():
global pose_service
pose_service.findVehicles()
return list(pose_service.vehicles.keys())
@app.get('/map/pose')
async def get_vehicle_pose():
global pose_service
if pose_service is not None:
return pose_service.returnPose()
else:
return []
@app.get('/map/goalPose')
async def get_vehicle_goalpose():
global pose_service
if pose_service is not None:
return pose_service.returnGoalPose()
else:
return []
@app.get('/map/setGoal')
async def set_goal_pose(scope, lat, lon):
global pose_service
if pose_service is not None:
print(f'[API SERVER] Set Goal Pose of {scope} as (lat={lat}, lon={lon})')
pose_service.setGoal(scope, lat, lon)
return 'success'
else:
return 'fail'
@app.get('/map/engage')
async def set_engage(scope):
global pose_service
if pose_service is not None:
pose_service.engage(scope)
return 'success'
else:
return 'fail'