From 846eafcb68622f0fafe68f8fbf1cc9f600a46d84 Mon Sep 17 00:00:00 2001 From: Pete R Jemian Date: Mon, 26 Feb 2024 10:51:49 -0600 Subject: [PATCH] refactor and combine for EPICS scan_id PV (#280) * MNT #229 refactor * STY #229 per 'ruff check' --- bluesky/instrument/epics_signal_config.py | 19 ++++++--- bluesky/instrument/framework/initialize.py | 45 +++------------------- 2 files changed, 19 insertions(+), 45 deletions(-) diff --git a/bluesky/instrument/epics_signal_config.py b/bluesky/instrument/epics_signal_config.py index 5ddfbf68..823aa9b7 100644 --- a/bluesky/instrument/epics_signal_config.py +++ b/bluesky/instrument/epics_signal_config.py @@ -9,16 +9,16 @@ import logging +from ophyd.signal import EpicsSignalBase + logger = logging.getLogger(__name__) logger.info(__file__) -from . import iconfig -from ophyd.signal import EpicsSignal -from ophyd.signal import EpicsSignalBase +from . import iconfig # noqa -# set default timeout for all EpicsSignal connections & communications -# always first, before ANY ophyd EPICS-based signals are created +# Set default timeout for all EpicsSignal connections & communications. +# Always call first, before ANY ophyd EPICS-based signals are created. TIMEOUT = 60 if not EpicsSignalBase._EpicsSignalBase__any_instantiated: EpicsSignalBase.set_defaults( @@ -33,7 +33,10 @@ logger.info("Using RunEngine metadata for scan_id") scan_id_epics = None else: + from ophyd.signal import EpicsSignal + logger.info("Using EPICS PV %s for scan_id", pvname) + # Must not call _before_ default timeouts are set. scan_id_epics = EpicsSignal(pvname, name="scan_id_epics") @@ -41,17 +44,21 @@ def epics_scan_id_source(*args, **kwargs): """ Callback function for RunEngine. Returns *next* scan_id to be used. + * Ignore args and kwargs. * Get current scan_id from PV. * Apply lower limit of zero. * Increment. * Set PV with new value. * Return new value. + + Exception will be raised if PV is not connected when next + ``bluesky.plan_stubs.open_run()`` is called. """ if scan_id_epics is None: raise RuntimeError( "epics_scan_id_source() called when" " 'RUN_ENGINE_SCAN_ID_PV' is" - "undefined in 'iconfig.yml' file." + " undefined in 'iconfig.yml' file." ) new_scan_id = max(scan_id_epics.get(), 0) + 1 scan_id_epics.put(new_scan_id) diff --git a/bluesky/instrument/framework/initialize.py b/bluesky/instrument/framework/initialize.py index 1efb7abd..49fdb551 100644 --- a/bluesky/instrument/framework/initialize.py +++ b/bluesky/instrument/framework/initialize.py @@ -109,47 +109,14 @@ def get_md_path(): # diagnostics # RE.msg_hook = ts_msg_hook -# set default timeout for all EpicsSignal connections & communications -TIMEOUT = 60 -if not EpicsSignalBase._EpicsSignalBase__any_instantiated: - EpicsSignalBase.set_defaults( - auto_monitor=True, - timeout=iconfig.get("PV_READ_TIMEOUT", TIMEOUT), - write_timeout=iconfig.get("PV_WRITE_TIMEOUT", TIMEOUT), - connection_timeout=iconfig.get("PV_CONNECTION_TIMEOUT", TIMEOUT), - ) - -# Create a registry of ophyd devices -registry = Registry(auto_register=True) - -_pv = iconfig.get("RUN_ENGINE_SCAN_ID_PV") -if _pv is None: - logger.info("Using RunEngine metadata for scan_id") -else: - from ophyd import EpicsSignal - - logger.info("Using EPICS PV %s for scan_id", _pv) - scan_id_epics = EpicsSignal(_pv, name="scan_id_epics") - - def epics_scan_id_source(_md): - """ - Callback function for RunEngine. Returns *next* scan_id to be used. - - * Ignore metadata dictionary passed as argument. - * Get current scan_id from PV. - * Apply lower limit of zero. - * Increment (so that scan_id numbering starts from 1). - * Set PV with new value. - * Return new value. - - Exception will be raised if PV is not connected when next - ``bps.open_run()`` is called. - """ - new_scan_id = max(scan_id_epics.get(), 0) + 1 - scan_id_epics.put(new_scan_id) - return new_scan_id +if iconfig.get("RUN_ENGINE_SCAN_ID_PV") is not None: + from ..epics_signal_config import epics_scan_id_source + from ..epics_signal_config import scan_id_epics # tell RunEngine to use the EPICS PV to provide the scan_id. RE.scan_id_source = epics_scan_id_source scan_id_epics.wait_for_connection() RE.md["scan_id"] = scan_id_epics.get() + +# Create a registry of ophyd devices +registry = Registry(auto_register=True)