Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set end_time in ThreadBasedCyclicSendTask.start() #1871

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def __init__(
"""
super().__init__(messages, period)
self.duration = duration
self.end_time: Optional[float] = None


class RestartableCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
Expand Down Expand Up @@ -299,9 +300,6 @@ def __init__(
self.send_lock = lock
self.stopped = True
self.thread: Optional[threading.Thread] = None
self.end_time: Optional[float] = (
time.perf_counter() + duration if duration else None
)
self.on_error = on_error
self.modifier_callback = modifier_callback

Expand Down Expand Up @@ -341,6 +339,10 @@ def start(self) -> None:
self.thread = threading.Thread(target=self._run, name=name)
self.thread.daemon = True

self.end_time: Optional[float] = (
time.perf_counter() + self.duration if self.duration else None
)

if self.event and PYWIN32:
PYWIN32.set_timer(self.event, self.period_ms)

Expand All @@ -356,6 +358,7 @@ def _run(self) -> None:

while not self.stopped:
if self.end_time is not None and time.perf_counter() >= self.end_time:
self.stop()
break

try:
Expand Down
32 changes: 28 additions & 4 deletions test/simplecyclic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,49 @@ def test_stopping_perodic_tasks(self):
def test_restart_perodic_tasks(self):
period = 0.01
safe_timeout = period * 5 if not IS_PYPY else 1.0
duration = 0.3

msg = can.Message(
is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7]
)

def _read_all_messages(_bus: can.interfaces.virtual.VirtualBus) -> None:
sleep(safe_timeout)
while not _bus.queue.empty():
_bus.recv(timeout=period)
sleep(safe_timeout)

with can.ThreadSafeBus(interface="virtual", receive_own_messages=True) as bus:
task = bus.send_periodic(msg, period)
self.assertIsInstance(task, can.broadcastmanager.RestartableCyclicTaskABC)
self.assertIsInstance(task, can.broadcastmanager.ThreadBasedCyclicSendTask)

# Test that the task is sending messages
sleep(safe_timeout)
assert not bus.queue.empty(), "messages should have been transmitted"

# Stop the task and check that messages are no longer being sent
bus.stop_all_periodic_tasks(remove_tasks=False)
_read_all_messages(bus)
assert bus.queue.empty(), "messages should not have been transmitted"

# Restart the task and check that messages are being sent again
task.start()
sleep(safe_timeout)
while not bus.queue.empty():
bus.recv(timeout=period)
sleep(safe_timeout)
assert not bus.queue.empty(), "messages should have been transmitted"

# Stop the task and check that messages are no longer being sent
bus.stop_all_periodic_tasks(remove_tasks=False)
_read_all_messages(bus)
assert bus.queue.empty(), "messages should not have been transmitted"

# Restart the task with limited duration and wait until it stops
task.duration = duration
task.start()
sleep(duration + safe_timeout)
assert task.stopped
assert time.time() > task.end_time
assert not bus.queue.empty(), "messages should have been transmitted"
_read_all_messages(bus)
assert bus.queue.empty(), "messages should not have been transmitted"

# Restart the task and check that messages are being sent again
Expand Down
Loading