Skip to content

Commit

Permalink
Merge pull request #127 from richardsheridan/explicit_context
Browse files Browse the repository at this point in the history
Implement explicit context objects instead of cache scopes
  • Loading branch information
richardsheridan authored Oct 8, 2021
2 parents c922b49 + acdd599 commit 8bce917
Show file tree
Hide file tree
Showing 13 changed files with 478 additions and 339 deletions.
8 changes: 4 additions & 4 deletions docs/source/examples/single_use_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ async def amain():
trio_parallel.current_default_worker_limiter().total_tokens = 4

print("single use worker behavior:")
async with trio_parallel.cache_scope(retire=after_single_use):
async with trio_parallel.open_worker_context(retire=after_single_use) as ctx:
async with trio.open_nursery() as nursery:
for i in range(40):
nursery.start_soon(trio_parallel.run_sync, worker, i)
nursery.start_soon(ctx.run_sync, worker, i)

print("dual use worker behavior:")
async with trio_parallel.cache_scope(retire=after_dual_use):
async with trio_parallel.open_worker_context(retire=after_dual_use) as ctx:
async with trio.open_nursery() as nursery:
for i in range(40):
nursery.start_soon(trio_parallel.run_sync, worker, i)
nursery.start_soon(ctx.run_sync, worker, i)

print("default behavior:")
async with trio.open_nursery() as nursery:
Expand Down
23 changes: 19 additions & 4 deletions docs/source/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,31 @@ Release history

.. towncrier release notes start
trio-parallel 1.0.0a2 (2021-10-08)
----------------------------------

Features
~~~~~~~~

- Opportunistically use ``cloudpickle`` to serialize jobs and results. (`#115 <https://github.com/richardsheridan/trio-parallel/issues/115>`__)
- Timeout arguments of :func:`open_worker_context`, ``idle_timeout`` and ``grace_period``,
now work like trio timeouts, accepting any non-negative `~float` value. (`#116 <https://github.com/richardsheridan/trio-parallel/issues/116>`__)
- Worker process startup is now faster, by importing trio lazily (`#117 <https://github.com/richardsheridan/trio-parallel/issues/117>`__)
- :func:`open_worker_context` now returns a context object that can be used to run
functions explicitly in a certain context (:meth:`WorkerContext.run_sync`) rather
than implicitly altering the behavior of :func:`trio_parallel.run_sync`. (`#127 <https://github.com/richardsheridan/trio-parallel/issues/127>`__)


trio-parallel 1.0.0a1 (2021-09-05)
----------------------------------

Features
~~~~~~~~

- Added configuration options for the grace periods permitted to worker caches upon
shutdown. This includes a new keyword argument for :func:`cache_scope` and the new top
level function :func:`default_shutdown_grace_period`. (`#108 <https://github.com/richardsheridan/trio-parallel/issues/108>`__)
- :func:`cache_scope` gained a new argument, ``init``, and ``retire`` is no longer
shutdown. This includes a new keyword argument for :func:`open_worker_context` and
a new top level function :func:`atexit_shutdown_grace_period`. (`#108 <https://github.com/richardsheridan/trio-parallel/issues/108>`__)
- :func:`open_worker_context` gained a new argument, ``init``, and ``retire`` is no longer
called before the first job in the worker. (`#110 <https://github.com/richardsheridan/trio-parallel/issues/110>`__)


Expand All @@ -24,7 +39,7 @@ trio-parallel 1.0.0a0 (2021-07-22)
Features
~~~~~~~~

- The behavior and lifetime of worker processes can now be customized with the :func:`cache_scope` context manager. (`#19 <https://github.com/richardsheridan/trio-parallel/issues/19>`__)
- The behavior and lifetime of worker processes can now be customized with the :func:`open_worker_context` context manager. (`#19 <https://github.com/richardsheridan/trio-parallel/issues/19>`__)


trio-parallel 0.5.1 (2021-05-05)
Expand Down
10 changes: 7 additions & 3 deletions docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ Running CPU-bound functions in parallel

.. autofunction:: current_default_worker_limiter

.. autofunction:: default_shutdown_grace_period
.. autofunction:: atexit_shutdown_grace_period

Configuring workers
-------------------

.. autofunction:: cache_scope
.. autofunction:: open_worker_context
:async-with: ctx

.. autoclass:: WorkerType
.. autoclass:: WorkerContext()
:members:

.. autoclass:: WorkerType()

Exceptions and warnings
-----------------------
Expand Down
1 change: 0 additions & 1 deletion newsfragments/115.feature.rst

This file was deleted.

2 changes: 0 additions & 2 deletions newsfragments/116.feature.rst

This file was deleted.

1 change: 0 additions & 1 deletion newsfragments/117.feature.rst

This file was deleted.

5 changes: 3 additions & 2 deletions trio_parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from ._impl import (
run_sync,
cache_scope,
open_worker_context,
WorkerContext,
WorkerType,
current_default_worker_limiter,
default_shutdown_grace_period,
atexit_shutdown_grace_period,
)
from ._abc import BrokenWorkerError
77 changes: 55 additions & 22 deletions trio_parallel/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
trio-parallel API minimal, we can put in new workers and options without needing
frontend rewrites."""

from abc import ABC, abstractmethod
from collections import deque
from typing import Optional, Callable
from abc import ABC, abstractmethod, ABCMeta
from typing import Optional, Callable, TypeVar, Type, Any, Deque

from outcome import Outcome

Expand All @@ -19,28 +18,12 @@ class BrokenWorkerError(RuntimeError):
This error is not typically encountered in normal use, and indicates a severe
failure of either trio-parallel or the code that was executing in the worker.
Some example failures may include segfaults, being killed by an external signal,
or failing to cleanly shut down within a specified ``grace_period``. (See
:func:`atexit_shutdown_grace_period` and :func:`open_worker_context`.)
"""


class WorkerCache(deque, ABC):
@abstractmethod
def prune(self):
"""Clean up any resources associated with workers that have timed out
while idle in the cache."""

@abstractmethod
def shutdown(self, timeout):
"""Stop and clean up any resources associated with all cached workers.
Args:
timeout: Time in seconds to wait for graceful shutdown before
raising.
Raises:
BrokenWorkerError: Raised if any workers fail to respond to a graceful
shutdown signal within ``grace_period``."""


class AbstractWorker(ABC):
@abstractmethod
def __init__(
Expand Down Expand Up @@ -79,3 +62,53 @@ def shutdown(self):
@abstractmethod
async def wait(self):
"""Wait for the worker to terminate."""


class WorkerCache(Deque[AbstractWorker], ABC):
@abstractmethod
def prune(self):
"""Clean up any resources associated with workers that have timed out
while idle in the cache."""

@abstractmethod
def shutdown(self, timeout):
"""Stop and clean up any resources associated with all cached workers.
Args:
timeout: Time in seconds to wait for graceful shutdown before
raising.
Raises:
BrokenWorkerError: Raised if any workers fail to respond to a graceful
shutdown signal within ``grace_period``."""


# vendored from trio so that we can lazy import trio
T = TypeVar("T")


class NoPublicConstructor(ABCMeta):
"""Metaclass that ensures a private constructor.
If a class uses this metaclass like this::
class SomeClass(metaclass=NoPublicConstructor):
pass
The metaclass will ensure that no sub class can be created, and that no instance
can be initialized.
If you try to instantiate your class (SomeClass()), a TypeError will be thrown.
Raises
------
- TypeError if a sub class or an instance is created.
"""

def __call__(cls, *args, **kwargs):
raise TypeError(
f"{cls.__module__}.{cls.__qualname__} has no public constructor"
)

def _create(cls: Type[T], *args: Any, **kwargs: Any) -> T:
return super().__call__(*args, **kwargs) # type: ignore
Loading

0 comments on commit 8bce917

Please sign in to comment.