diff --git a/python/ray/serve/_private/replica_result.py b/python/ray/serve/_private/replica_result.py index c766ef56d82a..c909da112765 100644 --- a/python/ray/serve/_private/replica_result.py +++ b/python/ray/serve/_private/replica_result.py @@ -35,6 +35,20 @@ def add_done_callback(self, callback: Callable): def cancel(self): raise NotImplementedError + @abstractmethod + def to_object_ref(self, timeout_s: Optional[float]) -> ray.ObjectRef: + raise NotImplementedError + + @abstractmethod + async def to_object_ref_async(self) -> ray.ObjectRef: + raise NotImplementedError + + @abstractmethod + def to_object_ref_gen(self) -> ray.ObjectRefGenerator: + # NOTE(edoakes): there is only a sync version of this method because it + # does not block like `to_object_ref` (so there's also no timeout argument). + raise NotImplementedError + class ActorReplicaResult(ReplicaResult): def __init__( @@ -54,48 +68,10 @@ def __init__( else: self._obj_ref = obj_ref_or_gen - @property - def obj_ref(self) -> Optional[ray.ObjectRef]: - return self._obj_ref - - @property - def obj_ref_gen(self) -> Optional[ray.ObjectRefGenerator]: - return self._obj_ref_gen - - def resolve_gen_to_ref_if_necessary_sync( - self, timeout_s: Optional[float] = None - ) -> Optional[ray.ObjectRef]: - """Returns the object ref pointing to the result.""" - - # NOTE(edoakes): this section needs to be guarded with a lock and the resulting - # object ref cached in order to avoid calling `__next__()` to - # resolve to the underlying object ref more than once. - # See: https://github.com/ray-project/ray/issues/43879. - with self._object_ref_or_gen_sync_lock: - if self._obj_ref is None and not self._is_streaming: - # Populate _obj_ref - obj_ref = self._obj_ref_gen._next_sync(timeout_s=timeout_s) - - # Check for timeout - if obj_ref.is_nil(): - raise TimeoutError("Timed out resolving to ObjectRef.") - - self._obj_ref = obj_ref - - return self._obj_ref - - async def resolve_gen_to_ref_if_necessary_async(self) -> Optional[ray.ObjectRef]: - """Returns the object ref pointing to the result.""" - - # NOTE(edoakes): this section needs to be guarded with a lock and the resulting - # object ref cached in order to avoid calling `__anext__()` to - # resolve to the underlying object ref more than once. - # See: https://github.com/ray-project/ray/issues/43879. - with self._object_ref_or_gen_sync_lock: - if self._obj_ref is None and not self._is_streaming: - self._obj_ref = await self._obj_ref_gen.__anext__() - - return self._obj_ref + if self._is_streaming: + assert ( + self._obj_ref_gen is not None + ), "An ObjectRefGenerator must be passed for streaming requests." def _process_response(f: Union[Callable, Coroutine]): @wraps(f) @@ -120,44 +96,40 @@ async def async_wrapper(self, *args, **kwargs): @_process_response def get(self, timeout_s: Optional[float]): assert ( - self._obj_ref is not None or not self._is_streaming - ), "get() can only be called on a non-streaming ActorReplicaResult" + not self._is_streaming + ), "get() can only be called on a unary ActorReplicaResult." start_time_s = time.time() - self.resolve_gen_to_ref_if_necessary_sync(timeout_s) - + object_ref = self.to_object_ref(timeout_s=timeout_s) remaining_timeout_s = calculate_remaining_timeout( timeout_s=timeout_s, start_time_s=start_time_s, curr_time_s=time.time(), ) - return ray.get(self._obj_ref, timeout=remaining_timeout_s) + return ray.get(object_ref, timeout=remaining_timeout_s) @_process_response async def get_async(self): assert ( - self._obj_ref is not None or not self._is_streaming - ), "get_async() can only be called on a non-streaming ActorReplicaResult" + not self._is_streaming + ), "get_async() can only be called on a unary ActorReplicaResult." - await self.resolve_gen_to_ref_if_necessary_async() - return await self._obj_ref + return await (await self.to_object_ref_async()) @_process_response def __next__(self): - assert self._obj_ref_gen is not None, ( - "next() can only be called on an ActorReplicaResult initialized with a " - "ray.ObjectRefGenerator" - ) + assert ( + self._is_streaming + ), "next() can only be called on a streaming ActorReplicaResult." next_obj_ref = self._obj_ref_gen.__next__() return ray.get(next_obj_ref) @_process_response async def __anext__(self): - assert self._obj_ref_gen is not None, ( - "anext() can only be called on an ActorReplicaResult initialized with a " - "ray.ObjectRefGenerator" - ) + assert ( + self._is_streaming + ), "__anext__() can only be called on a streaming ActorReplicaResult." next_obj_ref = await self._obj_ref_gen.__anext__() return await next_obj_ref @@ -173,3 +145,44 @@ def cancel(self): ray.cancel(self._obj_ref_gen) else: ray.cancel(self._obj_ref) + + def to_object_ref(self, *, timeout_s: Optional[float] = None) -> ray.ObjectRef: + assert ( + not self._is_streaming + ), "to_object_ref can only be called on a unary ReplicaActorResult." + + # NOTE(edoakes): this section needs to be guarded with a lock and the resulting + # object ref cached in order to avoid calling `__next__()` to + # resolve to the underlying object ref more than once. + # See: https://github.com/ray-project/ray/issues/43879. + with self._object_ref_or_gen_sync_lock: + if self._obj_ref is None: + obj_ref = self._obj_ref_gen._next_sync(timeout_s=timeout_s) + if obj_ref.is_nil(): + raise TimeoutError("Timed out resolving to ObjectRef.") + + self._obj_ref = obj_ref + + return self._obj_ref + + async def to_object_ref_async(self) -> ray.ObjectRef: + assert ( + not self._is_streaming + ), "to_object_ref_async can only be called on a unary ReplicaActorResult." + + # NOTE(edoakes): this section needs to be guarded with a lock and the resulting + # object ref cached in order to avoid calling `__anext__()` to + # resolve to the underlying object ref more than once. + # See: https://github.com/ray-project/ray/issues/43879. + with self._object_ref_or_gen_sync_lock: + if self._obj_ref is None: + self._obj_ref = await self._obj_ref_gen.__anext__() + + return self._obj_ref + + def to_object_ref_gen(self) -> ray.ObjectRefGenerator: + assert ( + self._is_streaming + ), "to_object_ref_gen can only be called on a streaming ReplicaActorResult." + + return self._obj_ref_gen diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py index cb85c42c53ba..7cfb24bc67f9 100644 --- a/python/ray/serve/handle.py +++ b/python/ray/serve/handle.py @@ -425,8 +425,7 @@ def result(self, *, timeout_s: Optional[float] = None) -> Any: if is_running_in_asyncio_loop(): raise RuntimeError( "Sync methods should not be called from within an `asyncio` event " - "loop. Use `await response` or `await response._to_object_ref()` " - "instead." + "loop. Use `await response` instead." ) start_time_s = time.time() @@ -452,7 +451,7 @@ async def _to_object_ref(self) -> ray.ObjectRef: ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1") replica_result = await self._fetch_future_result_async() - return await replica_result.resolve_gen_to_ref_if_necessary_async() + return await replica_result.to_object_ref_async() @DeveloperAPI def _to_object_ref_sync( @@ -478,8 +477,7 @@ def _to_object_ref_sync( if not _allow_running_in_asyncio_loop and is_running_in_asyncio_loop(): raise RuntimeError( "Sync methods should not be called from within an `asyncio` event " - "loop. Use `await response` or `await response._to_object_ref()` " - "instead." + "loop. Use `await response._to_object_ref()` instead." ) # First, fetch the result of the future @@ -492,7 +490,7 @@ def _to_object_ref_sync( start_time_s=start_time_s, curr_time_s=time.time(), ) - return replica_result.resolve_gen_to_ref_if_necessary_sync(remaining_timeout_s) + return replica_result.to_object_ref(timeout_s=remaining_timeout_s) @PublicAPI(stability="beta") @@ -553,7 +551,7 @@ async def __call__(self, limit: int) -> AsyncIterator[int]: def __await__(self): raise TypeError( "`DeploymentResponseGenerator` cannot be awaited directly. Use `async for` " - "or `_to_object_ref_gen` instead." + "or `await response.__anext__() instead`." ) def __aiter__(self) -> AsyncIterator[Any]: @@ -570,8 +568,7 @@ def __next__(self) -> Any: if is_running_in_asyncio_loop(): raise RuntimeError( "Sync methods should not be called from within an `asyncio` event " - "loop. Use `await response` or `await response._to_object_ref()` " - "instead." + "loop. Use `async for` or `await response.__anext__()` instead." ) replica_result = self._fetch_future_result_sync() @@ -589,7 +586,7 @@ async def _to_object_ref_gen(self) -> ObjectRefGenerator: ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1") replica_result = await self._fetch_future_result_async() - return replica_result.obj_ref_gen + return replica_result.to_object_ref_gen() @DeveloperAPI def _to_object_ref_gen_sync( @@ -612,12 +609,11 @@ def _to_object_ref_gen_sync( if not _allow_running_in_asyncio_loop and is_running_in_asyncio_loop(): raise RuntimeError( "Sync methods should not be called from within an `asyncio` event " - "loop. Use `await response` or `await response._to_object_ref()` " - "instead." + "loop. Use `await response._to_object_ref()` instead." ) replica_result = self._fetch_future_result_sync(_timeout_s) - return replica_result.obj_ref_gen + return replica_result.to_object_ref_gen() @PublicAPI(stability="beta") diff --git a/python/ray/serve/tests/test_actor_replica_wrapper.py b/python/ray/serve/tests/test_actor_replica_wrapper.py index 3c5b48287891..faa829c11a7e 100644 --- a/python/ray/serve/tests/test_actor_replica_wrapper.py +++ b/python/ray/serve/tests/test_actor_replica_wrapper.py @@ -121,11 +121,12 @@ async def test_send_request(setup_fake_replica, is_streaming: bool): ) replica_result = replica.send_request(pr) if is_streaming: - assert isinstance(replica_result.obj_ref_gen, ObjectRefGenerator) + assert isinstance(replica_result.to_object_ref_gen(), ObjectRefGenerator) for i in range(5): assert await replica_result.__anext__() == f"Hello-{i}" else: - assert isinstance(replica_result.obj_ref, ObjectRef) + assert isinstance(replica_result.to_object_ref(), ObjectRef) + assert isinstance(await replica_result.to_object_ref_async(), ObjectRef) assert await replica_result.get_async() == "Hello" @@ -157,12 +158,13 @@ async def test_send_request_with_rejection( if not accepted: assert replica_result is None elif is_streaming: - assert isinstance(replica_result.obj_ref_gen, ObjectRefGenerator) + assert isinstance(replica_result.to_object_ref_gen(), ObjectRefGenerator) for i in range(5): assert await replica_result.__anext__() == f"Hello-{i}" else: - assert isinstance(replica_result.obj_ref_gen, ObjectRefGenerator) - assert await replica_result.__anext__() == "Hello" + assert isinstance(replica_result.to_object_ref(), ObjectRef) + assert isinstance(await replica_result.to_object_ref_async(), ObjectRef) + assert await replica_result.get_async() == "Hello" @pytest.mark.asyncio diff --git a/python/ray/serve/tests/unit/test_router.py b/python/ray/serve/tests/unit/test_router.py index 87f71f47c720..c62cb0a99ee9 100644 --- a/python/ray/serve/tests/unit/test_router.py +++ b/python/ray/serve/tests/unit/test_router.py @@ -7,6 +7,7 @@ import pytest +import ray from ray._private.test_utils import async_wait_for_condition from ray._private.utils import get_or_create_event_loop from ray.exceptions import ActorDiedError, ActorUnavailableError @@ -61,6 +62,15 @@ def add_done_callback(self, callback: Callable): def cancel(self): raise NotImplementedError + def to_object_ref(self, timeout_s: Optional[float]) -> ray.ObjectRef: + raise NotImplementedError + + async def to_object_ref_async(self) -> ray.ObjectRef: + raise NotImplementedError + + def to_object_ref_gen(self) -> ray.ObjectRefGenerator: + raise NotImplementedError + class FakeReplica(ReplicaWrapper): def __init__(