Skip to content

Commit

Permalink
feat: add restart_offset and update __del__ of DAQJobRemote
Browse files Browse the repository at this point in the history
- destroy context instead of closing sockets
  • Loading branch information
furkan-bilgin committed Nov 9, 2024
1 parent dea9268 commit 31328a0
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions src/daq/jobs/remote.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pickle
import threading
import time
from datetime import timedelta
from typing import Optional

import msgspec
Expand Down Expand Up @@ -32,6 +33,11 @@ class DAQJobRemote(DAQJob):
allowed_message_in_types = [DAQJobMessage] # accept all message types
config_type = DAQJobRemoteConfig
config: DAQJobRemoteConfig
restart_offset = timedelta(seconds=5)

_zmq_pub_ctx: zmq.Context
_zmq_sub_ctx: zmq.Context

_zmq_pub: zmq.Socket
_zmq_sub: Optional[zmq.Socket]
_message_class_cache: dict[str, type[DAQJobMessage]]
Expand All @@ -40,9 +46,9 @@ class DAQJobRemote(DAQJob):

def __init__(self, config: DAQJobRemoteConfig):
super().__init__(config)
self._zmq_context = zmq.Context()
self._zmq_pub_ctx = zmq.Context()
self._logger.debug(f"Listening on {config.zmq_local_url}")
self._zmq_pub = self._zmq_context.socket(zmq.PUB)
self._zmq_pub = self._zmq_pub_ctx.socket(zmq.PUB)
self._zmq_pub.bind(config.zmq_local_url)
self._zmq_sub = None

Expand Down Expand Up @@ -70,9 +76,9 @@ def handle_message(self, message: DAQJobMessage) -> bool:
self._zmq_pub.send(self._pack_message(message))
return True

def _create_zmq_sub(self, remote_urls: list[str]):
ctx = zmq.Context()
zmq_sub = ctx.socket(zmq.SUB)
def _create_zmq_sub(self, remote_urls: list[str]) -> zmq.Socket:
self._zmq_sub_ctx = zmq.Context()
zmq_sub = self._zmq_sub_ctx.socket(zmq.SUB)
for remote_url in remote_urls:
self._logger.debug(f"Connecting to {remote_url}")
zmq_sub.connect(remote_url)
Expand All @@ -83,7 +89,10 @@ def _start_receive_thread(self, remote_urls: list[str]):
self._zmq_sub = self._create_zmq_sub(remote_urls)

while True:
message = self._zmq_sub.recv()
try:
message = self._zmq_sub.recv()
except zmq.ContextTerminated:
break
self._logger.debug(f"Received {len(message)} bytes")
recv_message = self._unpack_message(message)
recv_message.is_remote = True
Expand Down Expand Up @@ -132,8 +141,9 @@ def _unpack_message(self, message: bytes) -> DAQJobMessage:
return res

def __del__(self):
if self._zmq_sub is not None:
self._zmq_sub.close()
self._zmq_pub.close()
if self._zmq_sub_ctx is not None:
self._zmq_sub_ctx.destroy()
if self._zmq_pub_ctx is not None:
self._zmq_pub_ctx.destroy()

return super().__del__()

0 comments on commit 31328a0

Please sign in to comment.