Skip to content

Commit

Permalink
deploy: d1e626a
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Nov 30, 2024
1 parent d0b86d5 commit 8a15380
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 31 deletions.
1 change: 1 addition & 0 deletions daq/base.html
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ <h3>Subclasses</h3>
<li><a title="enrgdaq.daq.jobs.handle_stats.DAQJobHandleStats" href="jobs/handle_stats.html#enrgdaq.daq.jobs.handle_stats.DAQJobHandleStats">DAQJobHandleStats</a></li>
<li><a title="enrgdaq.daq.jobs.healthcheck.DAQJobHealthcheck" href="jobs/healthcheck.html#enrgdaq.daq.jobs.healthcheck.DAQJobHealthcheck">DAQJobHealthcheck</a></li>
<li><a title="enrgdaq.daq.jobs.remote.DAQJobRemote" href="jobs/remote.html#enrgdaq.daq.jobs.remote.DAQJobRemote">DAQJobRemote</a></li>
<li><a title="enrgdaq.daq.jobs.remote_proxy.DAQJobRemoteProxy" href="jobs/remote_proxy.html#enrgdaq.daq.jobs.remote_proxy.DAQJobRemoteProxy">DAQJobRemoteProxy</a></li>
<li><a title="enrgdaq.daq.jobs.serve_http.DAQJobServeHTTP" href="jobs/serve_http.html#enrgdaq.daq.jobs.serve_http.DAQJobServeHTTP">DAQJobServeHTTP</a></li>
<li><a title="enrgdaq.daq.jobs.test_job.DAQJobTest" href="jobs/test_job.html#enrgdaq.daq.jobs.test_job.DAQJobTest">DAQJobTest</a></li>
<li><a title="enrgdaq.daq.store.base.DAQJobStore" href="store/base.html#enrgdaq.daq.store.base.DAQJobStore">DAQJobStore</a></li>
Expand Down
5 changes: 5 additions & 0 deletions daq/jobs/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ <h2 class="section-title" id="header-submodules">Sub-modules</h2>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="enrgdaq.daq.jobs.remote_proxy" href="remote_proxy.html">enrgdaq.daq.jobs.remote_proxy</a></code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt><code class="name"><a title="enrgdaq.daq.jobs.serve_http" href="serve_http.html">enrgdaq.daq.jobs.serve_http</a></code></dt>
<dd>
<div class="desc"></div>
Expand Down Expand Up @@ -92,6 +96,7 @@ <h2 class="section-title" id="header-submodules">Sub-modules</h2>
<li><code><a title="enrgdaq.daq.jobs.handle_stats" href="handle_stats.html">enrgdaq.daq.jobs.handle_stats</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.healthcheck" href="healthcheck.html">enrgdaq.daq.jobs.healthcheck</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.remote" href="remote.html">enrgdaq.daq.jobs.remote</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.remote_proxy" href="remote_proxy.html">enrgdaq.daq.jobs.remote_proxy</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.serve_http" href="serve_http.html">enrgdaq.daq.jobs.serve_http</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.store" href="store/index.html">enrgdaq.daq.jobs.store</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.test_job" href="test_job.html">enrgdaq.daq.jobs.test_job</a></code></li>
Expand Down
67 changes: 36 additions & 31 deletions daq/jobs/remote.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ <h2 id="attributes">Attributes</h2>
<dd>Configuration instance.</dd>
<dt><strong><code>restart_offset</code></strong> :&ensp;<code>timedelta</code></dt>
<dd>Restart offset time.</dd>
<dt><strong><code>_zmq_pub_ctx</code></strong> :&ensp;<code>zmq.Context</code></dt>
<dd>ZMQ context for publishing.</dd>
<dt><strong><code>_zmq_sub_ctx</code></strong> :&ensp;<code>zmq.Context</code></dt>
<dd>ZMQ context for subscribing.</dd>
<dt><strong><code>_zmq_pub</code></strong> :&ensp;<code>zmq.Socket</code></dt>
<dd>ZMQ socket for publishing.</dd>
<dt><strong><code>_zmq_sub</code></strong> :&ensp;<code>Optional[zmq.Socket]</code></dt>
<dd>ZMQ socket for subscribing.</dd>
<dt><strong><code>_message_class_cache</code></strong> :&ensp;<code>dict</code></dt>
<dd>Cache for message classes.</dd>
<dt><strong><code>_remote_message_ids</code></strong> :&ensp;<code>set</code></dt>
Expand All @@ -88,10 +80,6 @@ <h2 id="attributes">Attributes</h2>
config_type (type): Configuration type for the job.
config (DAQJobRemoteConfig): Configuration instance.
restart_offset (timedelta): Restart offset time.
_zmq_pub_ctx (zmq.Context): ZMQ context for publishing.
_zmq_sub_ctx (zmq.Context): ZMQ context for subscribing.
_zmq_pub (zmq.Socket): ZMQ socket for publishing.
_zmq_sub (Optional[zmq.Socket]): ZMQ socket for subscribing.
_message_class_cache (dict): Cache for message classes.
_remote_message_ids (set): Set of remote message IDs.
_receive_thread (threading.Thread): Thread for receiving messages.
Expand All @@ -102,27 +90,25 @@ <h2 id="attributes">Attributes</h2>
config: DAQJobRemoteConfig
restart_offset = timedelta(seconds=5)

_zmq_pub_ctx: zmq.Context
_zmq_sub_ctx: zmq.Context

_zmq_pub: zmq.Socket
_zmq_sub: Optional[zmq.Socket]
_message_class_cache: dict[str, type[DAQJobMessage]]
_remote_message_ids: set[str]
_receive_thread: threading.Thread

def __init__(self, config: DAQJobRemoteConfig, **kwargs):
super().__init__(config, **kwargs)

self._zmq_pub_ctx = zmq.Context()
self._logger.debug(f&#34;Listening on {config.zmq_local_url}&#34;)
self._zmq_pub = self._zmq_pub_ctx.socket(zmq.PUB)
self._zmq_pub.bind(config.zmq_local_url)
if config.zmq_proxy_pub_url is not None:
self._zmq_pub_ctx = zmq.Context()
self._zmq_pub = self._zmq_pub_ctx.socket(zmq.PUB)
self._zmq_pub.connect(config.zmq_proxy_pub_url)
else:
self._zmq_pub_ctx = None
self._zmq_pub = None
self._zmq_sub = None

self._receive_thread = threading.Thread(
target=self._start_receive_thread,
args=(config.zmq_remote_urls,),
args=(config.zmq_proxy_sub_urls,),
daemon=True,
)
self._message_class_cache = {}
Expand All @@ -142,6 +128,8 @@ <h2 id="attributes">Attributes</h2>
or not super().handle_message(message)
# Ignore if the message is remote, meaning it was sent by another Supervisor
or message.is_remote
# Ignore if we are not connected to the proxy
or self._zmq_pub is None
):
return True # Silently ignore

Expand All @@ -162,7 +150,7 @@ <h2 id="attributes">Attributes</h2>
&#34;&#34;&#34;
Create a ZMQ subscriber socket.

Args:
Args:g
remote_urls (list[str]): List of remote URLs to connect to.

Returns:
Expand Down Expand Up @@ -199,6 +187,17 @@ <h2 id="attributes">Attributes</h2>
except zmq.ContextTerminated:
break
recv_message = self._unpack_message(message)
if (
recv_message.daq_job_info is not None
and recv_message.daq_job_info.supervisor_config is not None
and self.info.supervisor_config is not None
and recv_message.daq_job_info.supervisor_config.supervisor_id
== self.info.supervisor_config.supervisor_id
):
self._logger.warning(
f&#34;Received own message &#39;{type(recv_message).__name__}&#39; on topic &#39;{topic.decode()}&#39;, ignoring message. This should NOT happen. Check the config.&#34;
)
continue
self._logger.debug(
f&#34;Received {len(message)} bytes for message &#39;{type(recv_message).__name__}&#39; on topic &#39;{topic.decode()}&#39;&#34;
)
Expand Down Expand Up @@ -333,7 +332,7 @@ <h3>Inherited members</h3>
</dd>
<dt id="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig"><code class="flex name class">
<span>class <span class="ident">DAQJobRemoteConfig</span></span>
<span>(</span><span>zmq_local_url: str,<br>zmq_remote_urls: list[str],<br>topics: list[str] = &lt;factory&gt;,<br>*,<br>verbosity: <a title="enrgdaq.daq.models.LogVerbosity" href="../models.html#enrgdaq.daq.models.LogVerbosity">LogVerbosity</a> = LogVerbosity.INFO,<br>remote_config: <a title="enrgdaq.daq.models.DAQRemoteConfig" href="../models.html#enrgdaq.daq.models.DAQRemoteConfig">DAQRemoteConfig</a> | None = &lt;factory&gt;,<br>daq_job_type: str)</span>
<span>(</span><span>zmq_proxy_sub_urls: list[str],<br>topics: list[str] = &lt;factory&gt;,<br>use_xsub: bool = False,<br>zmq_proxy_pub_url: str | None = None,<br>*,<br>verbosity: <a title="enrgdaq.daq.models.LogVerbosity" href="../models.html#enrgdaq.daq.models.LogVerbosity">LogVerbosity</a> = LogVerbosity.INFO,<br>remote_config: <a title="enrgdaq.daq.models.DAQRemoteConfig" href="../models.html#enrgdaq.daq.models.DAQRemoteConfig">DAQRemoteConfig</a> | None = &lt;factory&gt;,<br>daq_job_type: str)</span>
</code></dt>
<dd>
<div class="desc"><p>Configuration for DAQJobRemote.</p>
Expand All @@ -360,9 +359,10 @@ <h2 id="attributes">Attributes</h2>
topics (list[str]): List of topics to subscribe to.
&#34;&#34;&#34;

zmq_local_url: str
zmq_remote_urls: list[str]
topics: list[str] = []</code></pre>
zmq_proxy_sub_urls: list[str]
topics: list[str] = []
use_xsub: bool = False
zmq_proxy_pub_url: Optional[str] = None</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
Expand All @@ -376,11 +376,15 @@ <h3>Instance variables</h3>
<dd>
<div class="desc"></div>
</dd>
<dt id="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_local_url"><code class="name">var <span class="ident">zmq_local_url</span> : str</code></dt>
<dt id="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.use_xsub"><code class="name">var <span class="ident">use_xsub</span> : bool</code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt id="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_proxy_pub_url"><code class="name">var <span class="ident">zmq_proxy_pub_url</span> : str | None</code></dt>
<dd>
<div class="desc"></div>
</dd>
<dt id="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_remote_urls"><code class="name">var <span class="ident">zmq_remote_urls</span> : list[str]</code></dt>
<dt id="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_proxy_sub_urls"><code class="name">var <span class="ident">zmq_proxy_sub_urls</span> : list[str]</code></dt>
<dd>
<div class="desc"></div>
</dd>
Expand Down Expand Up @@ -415,8 +419,9 @@ <h4><code><a title="enrgdaq.daq.jobs.remote.DAQJobRemote" href="#enrgdaq.daq.job
<h4><code><a title="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig" href="#enrgdaq.daq.jobs.remote.DAQJobRemoteConfig">DAQJobRemoteConfig</a></code></h4>
<ul class="">
<li><code><a title="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.topics" href="#enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.topics">topics</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_local_url" href="#enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_local_url">zmq_local_url</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_remote_urls" href="#enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_remote_urls">zmq_remote_urls</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.use_xsub" href="#enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.use_xsub">use_xsub</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_proxy_pub_url" href="#enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_proxy_pub_url">zmq_proxy_pub_url</a></code></li>
<li><code><a title="enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_proxy_sub_urls" href="#enrgdaq.daq.jobs.remote.DAQJobRemoteConfig.zmq_proxy_sub_urls">zmq_proxy_sub_urls</a></code></li>
</ul>
</li>
</ul>
Expand Down
Loading

0 comments on commit 8a15380

Please sign in to comment.