Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NO_MERGE: hotfix: multicam support #11

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 195 additions & 2 deletions robothub_sdk/src/robothub_sdk/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,32 @@ def mono_left_video(self) -> Stream:
resolution=(camera.getResolutionWidth(), camera.getResolutionHeight()),
)

@cached_property
def color_left_preview(self) -> Stream:
camera = self.device.ensure_camera(dai.CameraBoardSocket.LEFT)
fps = int(camera.getFps())
return self.create(
camera,
camera.preview,
stream_type=StreamType.FRAME,
rate=fps,
description=f"Left camera preview {camera.getPreviewWidth()}x{camera.getPreviewHeight()}@{fps}",
resolution=(camera.getPreviewWidth(), camera.getPreviewHeight()),
)

@cached_property
def color_left_video(self) -> Stream:
camera = self.device.ensure_camera(dai.CameraBoardSocket.LEFT)
fps = int(camera.getFps())
return self.create(
camera,
camera.video,
stream_type=StreamType.FRAME,
rate=fps,
description=f"Left camera video {camera.getVideoWidth()}x{camera.getVideoHeight()}@{fps}",
resolution=(camera.getVideoWidth(), camera.getVideoHeight()),
)

@cached_property
def mono_right_video(self) -> Stream:
camera = self.device.ensure_camera(dai.CameraBoardSocket.RIGHT)
Expand All @@ -192,11 +218,72 @@ def mono_right_video(self) -> Stream:
resolution=(camera.getResolutionWidth(), camera.getResolutionHeight()),
)

@cached_property
def color_right_preview(self) -> Stream:
camera = self.device.ensure_camera(dai.CameraBoardSocket.RIGHT)
fps = int(camera.getFps())
return self.create(
camera,
camera.preview,
stream_type=StreamType.FRAME,
rate=fps,
description=f"Right camera preview {camera.getPreviewWidth()}x{camera.getPreviewHeight()}@{fps}",
resolution=(camera.getPreviewWidth(), camera.getPreviewHeight()),
)

@cached_property
def color_right_video(self) -> Stream:
camera = self.device.ensure_camera(dai.CameraBoardSocket.RIGHT)
fps = int(camera.getFps())
return self.create(
camera,
camera.video,
stream_type=StreamType.FRAME,
rate=fps,
description=f"Right camera video {camera.getVideoWidth()}x{camera.getVideoHeight()}@{fps}",
resolution=(camera.getVideoWidth(), camera.getVideoHeight()),
)

@cached_property
def color_cam_d_preview(self) -> Stream:
camera = self.device.ensure_camera(dai.CameraBoardSocket.CAM_D)
fps = int(camera.getFps())
return self.create(
camera,
camera.preview,
stream_type=StreamType.FRAME,
rate=fps,
description=f"Camera D preview {camera.getPreviewWidth()}x{camera.getPreviewHeight()}@{fps}",
resolution=(camera.getPreviewWidth(), camera.getPreviewHeight()),
)

@cached_property
def color_cam_d_video(self) -> Stream:
camera = self.device.ensure_camera(dai.CameraBoardSocket.CAM_D)
fps = int(camera.getFps())
return self.create(
camera,
camera.video,
stream_type=StreamType.FRAME,
rate=fps,
description=f"Camera D video {camera.getVideoWidth()}x{camera.getVideoHeight()}@{fps}",
resolution=(camera.getVideoWidth(), camera.getVideoHeight()),
)

statistics: Stream
color_preview: Stream
color_video: Stream
color_still: Stream

color_left_preview: Stream
color_left_video: Stream

color_right_preview: Stream
color_right_video: Stream

color_cam_d_preview: Stream
color_cam_d_video: Stream

mono_left_video: Stream
mono_right_video: Stream

Expand Down Expand Up @@ -236,8 +323,9 @@ def outputs(self) -> List[Stream]:
# TODO rename to cameras
class Nodes:
color_camera: ColorCamera = None
left_camera: MonoCamera = None
right_camera: MonoCamera = None
left_camera: Union[ColorCamera, MonoCamera] = None
right_camera: Union[ColorCamera, MonoCamera] = None
camera_d: ColorCamera = None

id: str
name: str
Expand Down Expand Up @@ -412,9 +500,114 @@ def ensure_camera(self, camera: dai.CameraBoardSocket = dai.CameraBoardSocket.RG
return self.nodes.left_camera
if camera == dai.CameraBoardSocket.RIGHT and self.nodes.right_camera:
return self.nodes.right_camera
if camera == dai.CameraBoardSocket.CAM_D and self.nodes.camera_d:
return self.nodes.camera_d

return self.configure_camera(camera)

def configure_color_camera(
self,
camera: dai.CameraBoardSocket = dai.CameraBoardSocket.RGB,
fps=25,
orientation: dai.CameraImageOrientation = None,
res: CameraResolution = CameraResolution.MIN_RESOLUTION,
color_order=dai.ColorCameraProperties.ColorOrder.BGR,
preview_size: Tuple[int, int] = None,
video_size: Tuple[int, int] = None,
still_size: Tuple[int, int] = None,
isp_scale: Tuple[int, int] = None,
) -> Union[ColorCamera, MonoCamera]:
if camera == dai.CameraBoardSocket.RGB:
cam_rgb = self.nodes.color_camera or self.pipeline.createColorCamera()
self.nodes.color_camera = cam_rgb
cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)

depthai_res = res.for_socket(dai.CameraBoardSocket.RGB)
print(f"setting {camera} resolution to {depthai_res}")
cam_rgb.setResolution(depthai_res)
if isp_scale is not None:
cam_rgb.setIspScale(*isp_scale)
if preview_size is not None:
cam_rgb.setPreviewSize(*preview_size)
if video_size is not None:
cam_rgb.setVideoSize(*video_size)
if still_size is not None:
cam_rgb.setStillSize(*still_size)
cam_rgb.setInterleaved(False)
cam_rgb.setColorOrder(color_order)
cam_rgb.setFps(fps)
if orientation is not None:
cam_rgb.setImageOrientation(orientation)
return cam_rgb
if camera == dai.CameraBoardSocket.LEFT:
mono_left = self.nodes.left_camera or self.pipeline.createColorCamera()
self.nodes.left_camera = mono_left
mono_left.setBoardSocket(dai.CameraBoardSocket.LEFT)

depthai_res = res.for_socket(dai.CameraBoardSocket.RGB)
print(f"setting {camera} resolution to {depthai_res}")
mono_left.setResolution(depthai_res)
if isp_scale is not None:
mono_left.setIspScale(*isp_scale)
if preview_size is not None:
mono_left.setPreviewSize(*preview_size)
if video_size is not None:
mono_left.setVideoSize(*video_size)
if still_size is not None:
mono_left.setStillSize(*still_size)
mono_left.setInterleaved(False)
mono_left.setColorOrder(color_order)
mono_left.setFps(fps)
if orientation is not None:
mono_left.setImageOrientation(orientation)
return mono_left
if camera == dai.CameraBoardSocket.RIGHT:
mono_right = self.nodes.right_camera or self.pipeline.createColorCamera()
self.nodes.right_camera = mono_right
mono_right.setBoardSocket(dai.CameraBoardSocket.RIGHT)

depthai_res = res.for_socket(dai.CameraBoardSocket.RGB)
print(f"setting {camera} resolution to {depthai_res}")
mono_right.setResolution(depthai_res)
if isp_scale is not None:
mono_right.setIspScale(*isp_scale)
if preview_size is not None:
mono_right.setPreviewSize(*preview_size)
if video_size is not None:
mono_right.setVideoSize(*video_size)
if still_size is not None:
mono_right.setStillSize(*still_size)
mono_right.setInterleaved(False)
mono_right.setColorOrder(color_order)
mono_right.setFps(fps)
if orientation is not None:
mono_right.setImageOrientation(orientation)
return mono_right
if camera == dai.CameraBoardSocket.CAM_D:
cam_d = self.nodes.camera_d or self.pipeline.createColorCamera()
self.nodes.camera_d = cam_d
cam_d.setBoardSocket(dai.CameraBoardSocket.CAM_D)

depthai_res = res.for_socket(dai.CameraBoardSocket.RGB)
print(f"setting {camera} resolution to {depthai_res}")
cam_d.setResolution(depthai_res)
if isp_scale is not None:
cam_d.setIspScale(*isp_scale)
if preview_size is not None:
cam_d.setPreviewSize(*preview_size)
if video_size is not None:
cam_d.setVideoSize(*video_size)
if still_size is not None:
cam_d.setStillSize(*still_size)
cam_d.setInterleaved(False)
cam_d.setColorOrder(color_order)
cam_d.setFps(fps)
if orientation is not None:
cam_d.setImageOrientation(orientation)
return cam_d

raise ValueError(f"Unsupported camera value {camera.name}")

def configure_camera(
self,
camera: dai.CameraBoardSocket = dai.CameraBoardSocket.RGB,
Expand Down