Skip to content

Commit

Permalink
deploy: b26cb36
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Dec 4, 2024
1 parent 8a15380 commit b964160
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 36 deletions.
74 changes: 41 additions & 33 deletions daq/jobs/handle_stats.html
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ <h2 class="section-title" id="header-classes">Classes</h2>
<dl>
<dt id="enrgdaq.daq.jobs.handle_stats.DAQJobHandleStats"><code class="flex name class">
<span>class <span class="ident">DAQJobHandleStats</span></span>
<span>(</span><span>config: Any,<br>supervisor_config: <a title="enrgdaq.models.SupervisorConfig" href="../../models.html#enrgdaq.models.SupervisorConfig">SupervisorConfig</a> | None = None)</span>
<span>(</span><span>config: <a title="enrgdaq.daq.jobs.handle_stats.DAQJobHandleStatsConfig" href="#enrgdaq.daq.jobs.handle_stats.DAQJobHandleStatsConfig">DAQJobHandleStatsConfig</a>,<br>**kwargs)</span>
</code></dt>
<dd>
<div class="desc"><p>Handles statistics for DAQ jobs.</p>
Expand All @@ -59,14 +59,33 @@ <h2 class="section-title" id="header-classes">Classes</h2>
config_type = DAQJobHandleStatsConfig
config: DAQJobHandleStatsConfig

_stats: dict[str, DAQJobStatsDict]

def __init__(self, config: DAQJobHandleStatsConfig, **kwargs):
super().__init__(config, **kwargs)
self._stats = {}

def start(self):
while True:
self.consume(nowait=False)
start_time = datetime.now()
self.consume()
self._save_stats()
sleep_for(DAQ_JOB_HANDLE_STATS_SLEEP_INTERVAL_SECONDS, start_time)

def handle_message(self, message: DAQJobMessageStats) -&gt; bool:
if not super().handle_message(message):
return False

# Ignore if the message has no supervisor info
if not message.daq_job_info or not message.daq_job_info.supervisor_config:
return True

self._stats[message.daq_job_info.supervisor_config.supervisor_id] = (
message.stats
)
return True

def _save_stats(self):
keys = [
&#34;supervisor&#34;,
&#34;daq_job&#34;,
Expand All @@ -90,38 +109,27 @@ <h2 class="section-title" id="header-classes">Classes</h2>
record.count,
]

if message.daq_job_info and message.daq_job_info.supervisor_config:
supervisor_id = message.daq_job_info.supervisor_config.supervisor_id
else:
supervisor_id = &#34;N/A&#34;
data_to_send = []
for daq_job_type, msg in message.stats.items():
data_to_send.append(
[
supervisor_id,
daq_job_type.__name__,
str(msg.is_alive).lower(),
*unpack_record(msg.message_in_stats),
*unpack_record(msg.message_out_stats),
*unpack_record(msg.restart_stats),
]
)

if message.daq_job_info and message.daq_job_info.supervisor_config:
tag = message.daq_job_info.supervisor_config.supervisor_id
else:
tag = None

self._put_message_out(
DAQJobMessageStoreTabular(
store_config=self.config.store_config,
keys=keys,
data=data_to_send,
tag=tag,
)
)
for supervisor_id, stats in self._stats.items():
data_to_send = []
for daq_job_type, msg in stats.items():
data_to_send.append(
[
supervisor_id,
daq_job_type.__name__,
str(msg.is_alive).lower(),
*unpack_record(msg.message_in_stats),
*unpack_record(msg.message_out_stats),
*unpack_record(msg.restart_stats),
]
)

return True</code></pre>
self._put_message_out(
DAQJobMessageStoreTabular(
store_config=self.config.store_config,
keys=keys,
data=data_to_send,
)
)</code></pre>
</details>
<h3>Ancestors</h3>
<ul class="hlist">
Expand Down
4 changes: 1 addition & 3 deletions daq/jobs/remote.html
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,8 @@ <h2 id="attributes">Attributes</h2>

def handle_message(self, message: DAQJobMessage) -&gt; bool:
if (
# Do not send stats messages to the remote
isinstance(message, DAQJobMessageStats)
# Ignore if we already received the message
or message.id in self._remote_message_ids
message.id in self._remote_message_ids
# Ignore if the message is not allowed by the DAQ Job
or not super().handle_message(message)
# Ignore if the message is remote, meaning it was sent by another Supervisor
Expand Down

0 comments on commit b964160

Please sign in to comment.