From 79f4bf60108977372af9b90f520c18b23b74aec4 Mon Sep 17 00:00:00 2001 From: Furkan Date: Thu, 7 Nov 2024 20:40:58 +0300 Subject: [PATCH] refactor: refactor variable names in `DAQJobRemote` --- src/daq/jobs/remote.py | 25 +++++++++++++------------ src/tests/test_remote.py | 2 +- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/daq/jobs/remote.py b/src/daq/jobs/remote.py index 2524bd2..6f7ac01 100644 --- a/src/daq/jobs/remote.py +++ b/src/daq/jobs/remote.py @@ -1,6 +1,7 @@ import pickle import threading import time +from typing import Optional import msgspec import zmq @@ -31,8 +32,8 @@ class DAQJobRemote(DAQJob): allowed_message_in_types = [DAQJobMessage] # accept all message types config_type = DAQJobRemoteConfig config: DAQJobRemoteConfig - _zmq_local: zmq.Socket - _zmq_remotes: dict[str, zmq.Socket] + _zmq_pub: zmq.Socket + _zmq_sub: Optional[zmq.Socket] _message_class_cache: dict[str, type[DAQJobMessage]] _remote_message_ids: set[str] _receive_thread: threading.Thread @@ -41,9 +42,9 @@ def __init__(self, config: DAQJobRemoteConfig): super().__init__(config) self._zmq_context = zmq.Context() self._logger.debug(f"Listening on {config.zmq_local_url}") - self._zmq_local = self._zmq_context.socket(zmq.PUB) - self._zmq_remotes = {} - self._zmq_local.bind(config.zmq_local_url) + self._zmq_pub = self._zmq_context.socket(zmq.PUB) + self._zmq_pub.bind(config.zmq_local_url) + self._zmq_sub = None self._receive_thread = threading.Thread( target=self._start_receive_thread, @@ -66,7 +67,7 @@ def handle_message(self, message: DAQJobMessage) -> bool: ): return True # Silently ignore - self._zmq_local.send(self._pack_message(message)) + self._zmq_pub.send(self._pack_message(message)) return True def _create_zmq_sub(self, remote_urls: list[str]): @@ -79,11 +80,11 @@ def _create_zmq_sub(self, remote_urls: list[str]): return zmq_sub def _start_receive_thread(self, remote_urls: list[str]): - zmq_sub = self._create_zmq_sub(remote_urls) + self._zmq_sub = self._create_zmq_sub(remote_urls) while True: - message = zmq_sub.recv() - self._logger.debug(f"Received {len(message)} bytes from") + message = self._zmq_sub.recv() + self._logger.debug(f"Received {len(message)} bytes") recv_message = self._unpack_message(message) recv_message.is_remote = True # remote message_in -> message_out @@ -131,8 +132,8 @@ def _unpack_message(self, message: bytes) -> DAQJobMessage: return res def __del__(self): - for remote_url in self._zmq_remotes.keys(): - self._zmq_remotes[remote_url].close() - self._zmq_local.close() + if self._zmq_sub is not None: + self._zmq_sub.close() + self._zmq_pub.close() return super().__del__() diff --git a/src/tests/test_remote.py b/src/tests/test_remote.py index f5a5e74..13686dd 100644 --- a/src/tests/test_remote.py +++ b/src/tests/test_remote.py @@ -22,7 +22,7 @@ def setUp(self, MockZmqContext): zmq_remote_urls=["tcp://localhost:5556"], ) self.daq_job_remote = DAQJobRemote(self.config) - self.daq_job_remote._zmq_local = self.mock_sender + self.daq_job_remote._zmq_pub = self.mock_sender def test_handle_message(self): message = DAQJobMessage(