From 331b8bfc6e25620c20856faac366975981b763d6 Mon Sep 17 00:00:00 2001 From: Eric Bezzam Date: Fri, 17 Nov 2023 10:51:09 +0100 Subject: [PATCH] Add and improve hardware utilities. --- lensless/hardware/aperture.py | 3 +- lensless/hardware/slm.py | 134 +++++++++++++------ lensless/hardware/utils.py | 238 +++++++++++++++++++++++++++++++++- scripts/sim/digicam_psf.py | 46 +++++-- 4 files changed, 370 insertions(+), 51 deletions(-) diff --git a/lensless/hardware/aperture.py b/lensless/hardware/aperture.py index 37e8e37b..c2e0e62b 100644 --- a/lensless/hardware/aperture.py +++ b/lensless/hardware/aperture.py @@ -183,6 +183,7 @@ def rect_aperture(slm_shape, pixel_pitch, apert_dim, center=None): apert_dim = np.array(apert_dim) top_left = center - apert_dim / 2 bottom_right = top_left + apert_dim + if ( top_left[0] < 0 or top_left[1] < 0 @@ -329,7 +330,7 @@ def _m_to_cell_idx(val, cell_m): :return: The cell index. :rtype: int """ - return int(val / cell_m) + return int(np.round(val / cell_m)) def prepare_index_vals(key, pixel_pitch): diff --git a/lensless/hardware/slm.py b/lensless/hardware/slm.py index 0819e415..7fcab8cb 100644 --- a/lensless/hardware/slm.py +++ b/lensless/hardware/slm.py @@ -9,12 +9,12 @@ import os import numpy as np from lensless.hardware.utils import check_username_hostname -from lensless.utils.io import get_dtype, get_ctypes +from lensless.utils.io import get_ctypes from slm_controller.hardware import SLMParam, slm_devices from waveprop.spherical import spherical_prop from waveprop.color import ColorSystem from waveprop.rs import angular_spectrum -from waveprop.slm import get_centers, get_color_filter +from waveprop.slm import get_centers from waveprop.devices import SLMParam as SLMParam_wp from scipy.ndimage import rotate as rotate_func @@ -35,7 +35,7 @@ } -def set_programmable_mask(pattern, device, rpi_username, rpi_hostname): +def set_programmable_mask(pattern, device, rpi_username, rpi_hostname, verbose=False): """ Set LCD pattern on Raspberry Pi. @@ -79,9 +79,12 @@ def set_programmable_mask(pattern, device, rpi_username, rpi_hostname): # copy pattern to Raspberry Pi remote_path = f"~/{pattern_fn}" - print(f"PUTTING {local_path} to {remote_path}") + if verbose: + print(f"PUTTING {local_path} to {remote_path}") - os.system('scp %s "%s@%s:%s" ' % (local_path, rpi_username, rpi_hostname, remote_path)) + os.system( + 'scp %s "%s@%s:%s" >/dev/null 2>&1' % (local_path, rpi_username, rpi_hostname, remote_path) + ) # # -- not sure why this doesn't work... permission denied # sftp = client.open_sftp() # sftp.put(local_path, remote_path, confirm=True) @@ -89,9 +92,11 @@ def set_programmable_mask(pattern, device, rpi_username, rpi_hostname): # run script on Raspberry Pi to set mask pattern command = f"{rpi_python} {script} --file_path {remote_path}" - print(f"COMMAND : {command}") + if verbose: + print(f"COMMAND : {command}") _stdin, _stdout, _stderr = client.exec_command(command) - print(_stdout.read().decode()) + if verbose: + print(_stdout.read().decode()) client.close() os.remove(local_path) @@ -104,6 +109,7 @@ def get_programmable_mask( rotate=None, flipud=False, nbits=8, + color_filter=None, ): """ Get mask as a numpy or torch array. Return same type. @@ -136,22 +142,21 @@ def get_programmable_mask( pixel_pitch = slm_param[SLMParam_wp.PITCH] centers = get_centers(n_active_slm_pixels, pixel_pitch=pixel_pitch) - if SLMParam_wp.COLOR_FILTER in slm_param.keys(): + if color_filter is None and SLMParam_wp.COLOR_FILTER in slm_param.keys(): color_filter = slm_param[SLMParam_wp.COLOR_FILTER] - if flipud: - color_filter = np.flipud(color_filter) - - cf = get_color_filter( - slm_dim=n_active_slm_pixels, - color_filter=color_filter, - shift=0, - flat=True, - ) + if isinstance(vals, torch.Tensor): + color_filter = torch.tensor(color_filter).to(vals) - else: + if color_filter is not None: - # monochrome - cf = None + if isinstance(color_filter, np.ndarray): + if flipud: + color_filter = np.flipud(color_filter) + elif isinstance(color_filter, torch.Tensor): + if flipud: + color_filter = torch.flip(color_filter, dims=(0,)) + else: + raise ValueError("color_filter must be numpy array or torch tensor") d1 = sensor.pitch _height_pixel, _width_pixel = (slm_param[SLMParam_wp.CELL_SIZE] / d1).astype(int) @@ -171,29 +176,44 @@ def get_programmable_mask( _center_pixel[1] + 1 - np.floor(_width_pixel / 2).astype(int), ) - if cf is not None: - _rect = np.tile(cf[i][:, np.newaxis, np.newaxis], (1, _height_pixel, _width_pixel)) - else: - _rect = np.ones((1, _height_pixel, _width_pixel)) + color_filter_idx = i // n_active_slm_pixels[1] % n_color_filter - if use_torch: - _rect = torch.tensor(_rect).to(slm_vals_flat) + # if color_filter is not None: + # _rect = np.tile(color_filter[color_filter_idx][0][:, np.newaxis, np.newaxis], (1, _height_pixel, _width_pixel)) + # else: + # _rect = np.ones((1, _height_pixel, _width_pixel)) + + # if use_torch: + # _rect = torch.tensor(_rect).to(slm_vals_flat) + # import pudb; pudb.set_trace() + + mask_val = slm_vals_flat[i] * color_filter[color_filter_idx][0] + if isinstance(mask_val, np.ndarray): + mask_val = mask_val[:, np.newaxis, np.newaxis] + elif isinstance(mask_val, torch.Tensor): + mask_val = mask_val.unsqueeze(-1).unsqueeze(-1) mask[ :, _center_top_left_pixel[0] : _center_top_left_pixel[0] + _height_pixel, _center_top_left_pixel[1] : _center_top_left_pixel[1] + _width_pixel, - ] = ( - slm_vals_flat[i] * _rect - ) - - # quantize mask - if use_torch: - mask = mask / torch.max(mask) - mask = torch.round(mask * (2**nbits - 1)) / (2**nbits - 1) - else: - mask = mask / np.max(mask) - mask = np.round(mask * (2**nbits - 1)) / (2**nbits - 1) + ] = mask_val + + # mask[ + # :, + # _center_top_left_pixel[0] : _center_top_left_pixel[0] + _height_pixel, + # _center_top_left_pixel[1] : _center_top_left_pixel[1] + _width_pixel, + # ] = ( + # slm_vals_flat[i] * _rect + # ) + + # # quantize mask + # if use_torch: + # mask = mask / torch.max(mask) + # mask = torch.round(mask * (2**nbits - 1)) / (2**nbits - 1) + # else: + # mask = mask / np.max(mask) + # mask = np.round(mask * (2**nbits - 1)) / (2**nbits - 1) # rotate if rotate is not None: @@ -205,6 +225,46 @@ def get_programmable_mask( return mask +def adafruit_sub2full( + subpattern, + center, +): + sub_shape = subpattern.shape + controllable_shape = (3, sub_shape[0] // 3, sub_shape[1]) + subpattern_rgb = subpattern.reshape(controllable_shape, order="F") + subpattern_rgb *= 255 + + # pad to full pattern + pattern = np.zeros((3, 128, 160), dtype=np.uint8) + topleft = [center[0] - controllable_shape[1] // 2, center[1] - controllable_shape[2] // 2] + pattern[ + :, + topleft[0] : topleft[0] + controllable_shape[1], + topleft[1] : topleft[1] + controllable_shape[2], + ] = subpattern_rgb.astype(np.uint8) + return pattern + + +def full2subpattern( + pattern, + shape, + center, + slm=None, +): + shape = np.array(shape) + center = np.array(center) + + # extract region + idx_1 = center[0] - shape[0] // 2 + idx_2 = center[1] - shape[1] // 2 + subpattern = pattern[:, idx_1 : idx_1 + shape[0], idx_2 : idx_2 + shape[1]] + subpattern = subpattern / 255.0 + if slm == "adafruit": + # flatten color channel along rows + subpattern = subpattern.reshape((-1, subpattern.shape[-1]), order="F") + return subpattern + + def get_intensity_psf( mask, waveprop=False, diff --git a/lensless/hardware/utils.py b/lensless/hardware/utils.py index b9e3234e..adadbc1c 100644 --- a/lensless/hardware/utils.py +++ b/lensless/hardware/utils.py @@ -3,9 +3,235 @@ import socket import subprocess import time - import paramiko +from pprint import pprint from paramiko.ssh_exception import AuthenticationException, BadHostKeyException, SSHException +from lensless.hardware.sensor import SensorOptions +import cv2 +from lensless.utils.image import print_image_info +from lensless.utils.io import load_image + + +import logging + +logging.getLogger("paramiko").setLevel(logging.WARNING) + + +def capture( + rpi_username, + rpi_hostname, + sensor, + bayer, + exp, + fn="capture", + iso=100, + config_pause=2, + sensor_mode="0", + nbits_out=12, + legacy=True, + rgb=False, + gray=False, + nbits=12, + down=None, + awb_gains=None, + rpi_python="~/LenslessPiCam/lensless_env/bin/python", + capture_script="~/LenslessPiCam/scripts/measure/on_device_capture.py", + verbose=False, + output_path=None, + **kwargs, +): + """ + Capture image. + + Parameters + ---------- + fn : str + File name captured image. + rpi_username : str + Username of Raspberry Pi. + rpi_hostname : str + Hostname of Raspberry Pi. + sensor : str + Sensor name + bayer : bool + Whether to return bayer data (larger file size to transfer back). + exp : int + Exposure time in microseconds. + iso : int + ISO. + config_pause : int + Time to pause after configuring camera. + sensor_mode : str + Sensor mode. + nbits_out : int + Number of bits of output image. + legacy : bool + Whether to use legacy capture software of Raspberry Pi. + rgb : bool + Whether to capture RGB image. + gray : bool + Whether to capture grayscale image. + nbits : int + Number of bits of image. + down : int + Downsample factor. + awb_gains : list + AWB gains (red, blue). + rpi_python : str + Path to Python on Raspberry Pi. + capture_script : str + Path to capture script on Raspberry Pi. + output_path : str + Path to save image. + verbose : bool + Whether to print extra info. + + """ + + # check_username_hostname(rpi_username, rpi_hostname) + assert sensor in SensorOptions.values(), f"Sensor must be one of {SensorOptions.values()}" + + # form command + remote_fn = "remote_capture" + pic_command = ( + f"{rpi_python} {capture_script} sensor={sensor} bayer={bayer} fn={remote_fn} exp={exp} iso={iso} " + f"config_pause={config_pause} sensor_mode={sensor_mode} nbits_out={nbits_out} " + f"legacy={legacy} rgb={rgb} gray={gray} " + ) + if nbits > 8: + pic_command += " sixteen=True" + if down: + pic_command += f" down={down}" + if awb_gains: + pic_command += f" awb_gains=[{awb_gains[0]},{awb_gains[1]}]" + + if verbose: + print(f"COMMAND : {pic_command}") + + # take picture + ssh = subprocess.Popen( + ["ssh", "%s@%s" % (rpi_username, rpi_hostname), pic_command], + shell=False, + # stdout=DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + result = ssh.stdout.readlines() + error = ssh.stderr.readlines() + + if error != [] and legacy: # new camera software seems to return error even if it works + print("ERROR: %s" % error) + return + if result == []: + error = ssh.stderr.readlines() + print("ERROR: %s" % error) + return + else: + result = [res.decode("UTF-8") for res in result] + result = [res for res in result if len(res) > 3] + result_dict = dict() + for res in result: + _key = res.split(":")[0].strip() + _val = "".join(res.split(":")[1:]).strip() + result_dict[_key] = _val + # result_dict = dict(map(lambda s: map(str.strip, s.split(":")), result)) + if verbose: + print("COMMAND OUTPUT : ") + pprint(result_dict) + + # copy over file + if ( + "RPi distribution" in result_dict.keys() + and "bullseye" in result_dict["RPi distribution"] + and not legacy + ): + + if bayer: + + # copy over DNG file + remotefile = f"~/{remote_fn}.dng" + localfile = f"{fn}.dng" + if output_path is not None: + localfile = os.path.join(output_path, localfile) + if verbose: + print(f"\nCopying over picture as {localfile}...") + os.system( + 'scp "%s@%s:%s" %s >/dev/null 2>&1' + % (rpi_username, rpi_hostname, remotefile, localfile) + ) + + img = load_image(localfile, verbose=True, bayer=bayer, nbits_out=nbits_out) + + # print image properties + print_image_info(img) + + # save as PNG + png_out = f"{fn}.png" + print(f"Saving RGB file as: {png_out}") + cv2.imwrite(png_out, cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) + + else: + + remotefile = f"~/{remote_fn}.png" + localfile = f"{fn}.png" + if output_path is not None: + localfile = os.path.join(output_path, localfile) + if verbose: + print(f"\nCopying over picture as {localfile}...") + os.system( + 'scp "%s@%s:%s" %s >/dev/null 2>&1' + % (rpi_username, rpi_hostname, remotefile, localfile) + ) + + img = load_image(localfile, verbose=True) + + # legacy software running on RPi + else: + # copy over file + # more pythonic? https://stackoverflow.com/questions/250283/how-to-scp-in-python + remotefile = f"~/{remote_fn}.png" + localfile = f"{fn}.png" + if output_path is not None: + localfile = os.path.join(output_path, localfile) + if verbose: + print(f"\nCopying over picture as {localfile}...") + os.system( + 'scp "%s@%s:%s" %s >/dev/null 2>&1' + % (rpi_username, rpi_hostname, remotefile, localfile) + ) + + if rgb or gray: + img = load_image(localfile, verbose=verbose) + + else: + + if not bayer: + # red_gain = config.camera.red_gain + # blue_gain = config.camera.blue_gain + red_gain = awb_gains[0] + blue_gain = awb_gains[1] + else: + red_gain = None + blue_gain = None + + # load image + if verbose: + print("\nLoading picture...") + + img = load_image( + localfile, + verbose=True, + bayer=bayer, + blue_gain=blue_gain, + red_gain=red_gain, + nbits_out=nbits_out, + ) + + # write RGB data + if not bayer: + cv2.imwrite(localfile, cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) + + return localfile, img def display( @@ -18,6 +244,7 @@ def display( pad=0, vshift=0, hshift=0, + verbose=False, **kwargs, ): """ @@ -43,16 +270,20 @@ def display( remote_tmp_file = "~/tmp_display.png" display_path = "~/LenslessPiCam_display/test.png" - os.system('scp %s "%s@%s:%s" ' % (fp, rpi_username, rpi_hostname, remote_tmp_file)) + os.system( + 'scp %s "%s@%s:%s" >/dev/null 2>&1' % (fp, rpi_username, rpi_hostname, remote_tmp_file) + ) # run script on Raspberry Pi to prepare image to display prep_command = f"{rpi_python} {script} --fp {remote_tmp_file} \ --pad {pad} --vshift {vshift} --hshift {hshift} --screen_res {screen_res[0]} {screen_res[1]} \ --brightness {brightness} --rot90 {rot90} --output_path {display_path} " - # print(f"COMMAND : {prep_command}") + if verbose: + print(f"COMMAND : {prep_command}") subprocess.Popen( ["ssh", "%s@%s" % (rpi_username, rpi_hostname), prep_command], shell=False, + # stdout=DEVNULL ) @@ -65,6 +296,7 @@ def check_username_hostname(username, hostname, timeout=10): client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: + # with suppress_stdout(): client.connect(hostname, username=username, timeout=timeout) except (BadHostKeyException, AuthenticationException, SSHException, socket.error) as e: raise ValueError(f"Could not connect to {username}@{hostname}\n{e}") diff --git a/scripts/sim/digicam_psf.py b/scripts/sim/digicam_psf.py index e40d499a..9b665c27 100644 --- a/scripts/sim/digicam_psf.py +++ b/scripts/sim/digicam_psf.py @@ -12,6 +12,7 @@ from lensless.hardware.sensor import VirtualSensor from lensless.hardware.slm import get_programmable_mask, get_intensity_psf from waveprop.devices import slm_dict +from waveprop.devices import SLMParam as SLMParam_wp @hydra.main(version_base=None, config_path="../../configs", config_name="sim_digicam_psf") @@ -76,21 +77,36 @@ def digicam_psf(config): start_time = time.time() slm_vals = pattern_sub / 255.0 + # prepare color filter + if SLMParam_wp.COLOR_FILTER in slm_param.keys(): + color_filter = slm_param[SLMParam_wp.COLOR_FILTER] + if config.use_torch: + color_filter = torch.from_numpy(color_filter.copy()).to( + device=torch_device, dtype=dtype + ) + else: + color_filter = color_filter.astype(dtype) + if config.digicam.slm == "adafruit": # flatten color channel along rows slm_vals = slm_vals.reshape((-1, slm_vals.shape[-1]), order="F") + # save extracted mask values + np.save(os.path.join(output_folder, "mask_vals.npy"), slm_vals) + if config.use_torch: slm_vals = torch.from_numpy(slm_vals).to(device=torch_device, dtype=dtype) else: slm_vals = slm_vals.astype(dtype) + # -- get mask mask = get_programmable_mask( vals=slm_vals, sensor=sensor, slm_param=slm_param, rotate=rotate_angle, flipud=config.sim.flipud, + color_filter=color_filter, ) if config.digicam.vertical_shift is not None: @@ -139,14 +155,27 @@ def digicam_psf(config): else: print("Could not load PSF image from: ", fp_psf) - fp = os.path.join(output_folder, "psf_plot.png") + fp = os.path.join(output_folder, "sim_psf_plot.png") + fig = plt.figure(frameon=False) + ax = plt.Axes(fig, [0.0, 0.0, 1.0, 1.0]) + ax.set_axis_off() + fig.add_axes(ax) + ax.imshow(psf_in_np) + ax.set_xticks([]) + ax.set_yticks([]) + plt.savefig(fp) + if psf_meas is not None: - _, ax = plt.subplots(1, 2) - ax[0].imshow(psf_in_np) - ax[0].set_title("Simulated") - plot_image(psf_meas, gamma=config.digicam.gamma, normalize=True, ax=ax[1]) - ax[1].set_title("Measured") - plt.savefig(fp) + + fig = plt.figure(frameon=False) + ax = plt.Axes(fig, [0.0, 0.0, 1.0, 1.0]) + ax.set_axis_off() + fig.add_axes(ax) + plot_image(psf_meas, gamma=config.digicam.gamma, normalize=True, ax=ax) + # remove axis values + ax.set_xticks([]) + ax.set_yticks([]) + plt.savefig(os.path.join(output_folder, "meas_psf_plot.png")) # plot overlayed fp = os.path.join(output_folder, "psf_overlay.png") @@ -158,9 +187,6 @@ def digicam_psf(config): plt.imshow(psf_in_np_norm, alpha=0.7) plt.imshow(psf_meas_norm, alpha=0.7) plt.savefig(fp) - else: - plt.imshow(psf_in_np) - plt.savefig(fp) # save PSF as png fp = os.path.join(output_folder, f"{bn}_SIM_psf.png")