Skip to content

Commit

Permalink
feat: add SimpleDisposable
Browse files Browse the repository at this point in the history
  • Loading branch information
phi-friday committed Feb 25, 2024
1 parent 6018959 commit c81c685
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 18 deletions.
4 changes: 3 additions & 1 deletion src/async_wrapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any

from .convert import async_to_sync, sync_to_async, toggle_func
from .pipe import Pipe
from .pipe import Pipe, SimpleDisposable, create_disposable
from .queue import Queue, create_queue
from .task_group import TaskGroupWrapper, create_task_group_wrapper
from .wait import Completed, Waiter, wait_for
Expand All @@ -14,12 +14,14 @@
"Waiter",
"Completed",
"Pipe",
"SimpleDisposable",
"toggle_func",
"async_to_sync",
"sync_to_async",
"create_task_group_wrapper",
"create_queue",
"wait_for",
"create_disposable",
]

__version__: str
Expand Down
14 changes: 7 additions & 7 deletions src/async_wrapper/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
"QueueClosedError",
"QueueBrokenError",
"QueueRestrictedError",
"PipeError",
"PipeAlreadyDisposedError",
"DisposableError",
"AlreadyDisposedError",
]


Expand Down Expand Up @@ -73,13 +73,13 @@ class QueueRestrictedError(QueueError):
"""queue is restricted but used"""


class PipeError(Exception):
class DisposableError(Exception):
"""
Base exception for pipe-related errors.
Base exception for disposable-related errors.
This exception serves as the base class for various pipe-related exceptions.
This exception serves as the base class for various disposable-related exceptions.
"""


class PipeAlreadyDisposedError(PipeError):
"""Indicates that an attempt was made to use a pipe that has already been disposed of.""" # noqa: E501
class AlreadyDisposedError(DisposableError):
"""Indicates that an attempt was made to use a disposable that has already been disposed of.""" # noqa: E501
55 changes: 49 additions & 6 deletions src/async_wrapper/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import anyio
from typing_extensions import TypedDict, TypeVar, override

from async_wrapper.exception import PipeAlreadyDisposedError
from async_wrapper.exception import AlreadyDisposedError

if TYPE_CHECKING:
from anyio.abc import CapacityLimiter, Lock, Semaphore
Expand All @@ -26,7 +26,7 @@ class Synchronization(TypedDict, total=False):
limiter: CapacityLimiter


__all__ = ["Disposable", "Pipe"]
__all__ = ["Disposable", "SimpleDisposable", "Pipe", "create_disposable"]

InputT = TypeVar("InputT", infer_variance=True)
OutputT = TypeVar("OutputT", infer_variance=True)
Expand Down Expand Up @@ -58,6 +58,34 @@ async def dispose(self) -> Any:
"""Disposes the resource and releases any associated resources."""


class SimpleDisposable(Disposable[InputT, OutputT], Generic[InputT, OutputT]):
"""simple disposable impl."""

__slots__ = ("_func", "_dispose", "_is_disposed")

def __init__(
self, func: Callable[[InputT], Awaitable[OutputT]], *, dispose: bool = True
) -> None:
self._func = func
self._dispose = dispose
self._is_disposed = False

@property
def is_disposed(self) -> bool:
"""is disposed"""
return self._is_disposed

@override
async def next(self, value: InputT) -> OutputT:
if self._is_disposed:
raise AlreadyDisposedError("disposable already disposed")
return await self._func(value)

@override
async def dispose(self) -> Any:
self._is_disposed = True


class Pipe(Disposable[InputT, OutputT], Generic[InputT, OutputT]):
"""
Implements a pipe that can be used to communicate data between coroutines.
Expand Down Expand Up @@ -109,7 +137,7 @@ def is_disposed(self) -> bool:
@override
async def next(self, value: InputT) -> OutputT:
if self._is_disposed:
raise PipeAlreadyDisposedError("pipe already disposed")
raise AlreadyDisposedError("pipe already disposed")

output = await self._listener(value)

Expand Down Expand Up @@ -150,13 +178,28 @@ def subscribe(
dispose: Whether to dispose the listener when the pipe is disposed.
"""
if self._is_disposed:
raise PipeAlreadyDisposedError("pipe already disposed")
raise AlreadyDisposedError("pipe already disposed")

if not isinstance(listener, Disposable):
listener = Pipe(listener)
listener = SimpleDisposable(listener)
self._listeners.append((listener, dispose))


def create_disposable(
func: Callable[[InputT], Awaitable[OutputT]], *, dispose: bool = True
) -> SimpleDisposable[InputT, OutputT]:
"""SimpleDisposable shortcut
Args:
func: awaitable function.
dispose: dispose flag. Defaults to True.
Returns:
SimpleDisposable object
"""
return SimpleDisposable(func, dispose=dispose)


async def _enter_context(stack: AsyncExitStack, context: Synchronization) -> None:
semaphore = context.get("semaphore")
if semaphore is not None:
Expand All @@ -176,7 +219,7 @@ async def _call_next(
) -> None:
async with AsyncExitStack() as stack:
await _enter_context(stack, context)
with suppress(PipeAlreadyDisposedError):
with suppress(AlreadyDisposedError):
await disposable.next(value)


Expand Down
32 changes: 28 additions & 4 deletions tests/test_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from typing_extensions import TypeVar

from .base import Timer
from async_wrapper.exception import PipeAlreadyDisposedError
from async_wrapper.pipe import Pipe
from async_wrapper.exception import AlreadyDisposedError
from async_wrapper.pipe import Disposable, Pipe, SimpleDisposable, create_disposable

pytestmark = pytest.mark.anyio

Expand Down Expand Up @@ -329,13 +329,37 @@ async def hit(value: Any) -> None: # noqa: ARG001
await pipe.dispose()
assert pipe.is_disposed is True

with pytest.raises(PipeAlreadyDisposedError, match="pipe already disposed"):
with pytest.raises(AlreadyDisposedError, match="pipe already disposed"):
await pipe.next(1)


async def test_subscribe_after_disposed():
pipe = Pipe(return_self)
await pipe.dispose()
_, setter = use_value()
with pytest.raises(PipeAlreadyDisposedError, match="pipe already disposed"):
with pytest.raises(AlreadyDisposedError, match="pipe already disposed"):
pipe.subscribe(setter)


async def test_simple_disposable():
disposable = SimpleDisposable(return_self)
assert isinstance(disposable, Disposable)


async def test_construct_disposable():
disposable = create_disposable(return_self)
assert isinstance(disposable, Disposable)


async def test_simple_dispose():
disposable = create_disposable(return_self)
assert disposable.is_disposed is False
await disposable.dispose()
assert disposable.is_disposed is True


async def test_simple_next_after_disposed():
disposable: SimpleDisposable[Any, Any] = create_disposable(return_self)
await disposable.dispose()
with pytest.raises(AlreadyDisposedError, match="disposable already disposed"):
await disposable.next(1)

0 comments on commit c81c685

Please sign in to comment.