Skip to content

Commit

Permalink
Clean up asyncio usage
Browse files Browse the repository at this point in the history
  • Loading branch information
brentyi committed Nov 6, 2023
1 parent be93f41 commit 49d9feb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
12 changes: 5 additions & 7 deletions src/viser/infra/_async_message_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ async def window_generator(
# Wait until there are new messages available.
most_recent_message_id = self.message_counter - 1
while last_sent_id >= most_recent_message_id:
next_message = self.message_event.wait()
next_message = asyncio.create_task(self.message_event.wait())
flush_wait = asyncio.create_task(self._flush_event.wait())
send_window = False
try:
flush_wait = self._flush_event.wait()
done, pending = await asyncio.wait( # type: ignore
done, pending = await asyncio.wait(
[flush_wait, next_message],
timeout=window.max_time_until_ready(),
return_when=asyncio.FIRST_COMPLETED,
Expand Down Expand Up @@ -156,17 +156,15 @@ async def wait_and_append_to_window(
self.append_to_window(await message)
return True

message = asyncio.shield(message)
flush_wait = asyncio.shield(flush_event.wait())
(done, pending) = await asyncio.wait( # type: ignore
flush_wait = asyncio.create_task(flush_event.wait())
done, pending = await asyncio.wait(
[message, flush_wait],
timeout=self.max_time_until_ready(),
return_when=asyncio.FIRST_COMPLETED,
)
del pending
if flush_wait in done:
flush_event.clear()
flush_wait.cancel()
if message in cast(Set[Any], done): # Cast to prevent type narrowing.
self.append_to_window(await message)
return True
Expand Down
4 changes: 3 additions & 1 deletion src/viser/infra/_infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ def handle_incoming(message: Message) -> None:
# queue get() tasks, which suppresses a "Task was destroyed but it is
# pending" error.
await client_state.message_buffer.put(DONE_SENTINEL)
self._flush_event_from_client_id.pop(client_id)

# Trigger then delete the flush event.
self._flush_event_from_client_id.pop(client_id).set()

# Disconnection callbacks.
for cb in self._client_disconnect_cb:
Expand Down

0 comments on commit 49d9feb

Please sign in to comment.