Skip to content

Commit

Permalink
[serve] Add to_object_ref methods to ReplicaResult (ray-project#4…
Browse files Browse the repository at this point in the history
…8544)

Formalizes the interface for these methods since non-actor
`ReplicaResult` implementations like `LocalReplicaResult` won't
implement them and need to explicitly raise a good error message when
they occur.

---------

Signed-off-by: Edward Oakes <[email protected]>
Co-authored-by: Cindy Zhang <[email protected]>
  • Loading branch information
edoakes and zcin authored Nov 4, 2024
1 parent d5d03e6 commit 1eeb896
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 77 deletions.
131 changes: 72 additions & 59 deletions python/ray/serve/_private/replica_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ def add_done_callback(self, callback: Callable):
def cancel(self):
raise NotImplementedError

@abstractmethod
def to_object_ref(self, timeout_s: Optional[float]) -> ray.ObjectRef:
raise NotImplementedError

@abstractmethod
async def to_object_ref_async(self) -> ray.ObjectRef:
raise NotImplementedError

@abstractmethod
def to_object_ref_gen(self) -> ray.ObjectRefGenerator:
# NOTE(edoakes): there is only a sync version of this method because it
# does not block like `to_object_ref` (so there's also no timeout argument).
raise NotImplementedError


class ActorReplicaResult(ReplicaResult):
def __init__(
Expand All @@ -54,48 +68,10 @@ def __init__(
else:
self._obj_ref = obj_ref_or_gen

@property
def obj_ref(self) -> Optional[ray.ObjectRef]:
return self._obj_ref

@property
def obj_ref_gen(self) -> Optional[ray.ObjectRefGenerator]:
return self._obj_ref_gen

def resolve_gen_to_ref_if_necessary_sync(
self, timeout_s: Optional[float] = None
) -> Optional[ray.ObjectRef]:
"""Returns the object ref pointing to the result."""

# NOTE(edoakes): this section needs to be guarded with a lock and the resulting
# object ref cached in order to avoid calling `__next__()` to
# resolve to the underlying object ref more than once.
# See: https://github.com/ray-project/ray/issues/43879.
with self._object_ref_or_gen_sync_lock:
if self._obj_ref is None and not self._is_streaming:
# Populate _obj_ref
obj_ref = self._obj_ref_gen._next_sync(timeout_s=timeout_s)

# Check for timeout
if obj_ref.is_nil():
raise TimeoutError("Timed out resolving to ObjectRef.")

self._obj_ref = obj_ref

return self._obj_ref

async def resolve_gen_to_ref_if_necessary_async(self) -> Optional[ray.ObjectRef]:
"""Returns the object ref pointing to the result."""

# NOTE(edoakes): this section needs to be guarded with a lock and the resulting
# object ref cached in order to avoid calling `__anext__()` to
# resolve to the underlying object ref more than once.
# See: https://github.com/ray-project/ray/issues/43879.
with self._object_ref_or_gen_sync_lock:
if self._obj_ref is None and not self._is_streaming:
self._obj_ref = await self._obj_ref_gen.__anext__()

return self._obj_ref
if self._is_streaming:
assert (
self._obj_ref_gen is not None
), "An ObjectRefGenerator must be passed for streaming requests."

def _process_response(f: Union[Callable, Coroutine]):
@wraps(f)
Expand All @@ -120,44 +96,40 @@ async def async_wrapper(self, *args, **kwargs):
@_process_response
def get(self, timeout_s: Optional[float]):
assert (
self._obj_ref is not None or not self._is_streaming
), "get() can only be called on a non-streaming ActorReplicaResult"
not self._is_streaming
), "get() can only be called on a unary ActorReplicaResult."

start_time_s = time.time()
self.resolve_gen_to_ref_if_necessary_sync(timeout_s)

object_ref = self.to_object_ref(timeout_s=timeout_s)
remaining_timeout_s = calculate_remaining_timeout(
timeout_s=timeout_s,
start_time_s=start_time_s,
curr_time_s=time.time(),
)
return ray.get(self._obj_ref, timeout=remaining_timeout_s)
return ray.get(object_ref, timeout=remaining_timeout_s)

@_process_response
async def get_async(self):
assert (
self._obj_ref is not None or not self._is_streaming
), "get_async() can only be called on a non-streaming ActorReplicaResult"
not self._is_streaming
), "get_async() can only be called on a unary ActorReplicaResult."

await self.resolve_gen_to_ref_if_necessary_async()
return await self._obj_ref
return await (await self.to_object_ref_async())

@_process_response
def __next__(self):
assert self._obj_ref_gen is not None, (
"next() can only be called on an ActorReplicaResult initialized with a "
"ray.ObjectRefGenerator"
)
assert (
self._is_streaming
), "next() can only be called on a streaming ActorReplicaResult."

next_obj_ref = self._obj_ref_gen.__next__()
return ray.get(next_obj_ref)

@_process_response
async def __anext__(self):
assert self._obj_ref_gen is not None, (
"anext() can only be called on an ActorReplicaResult initialized with a "
"ray.ObjectRefGenerator"
)
assert (
self._is_streaming
), "__anext__() can only be called on a streaming ActorReplicaResult."

next_obj_ref = await self._obj_ref_gen.__anext__()
return await next_obj_ref
Expand All @@ -173,3 +145,44 @@ def cancel(self):
ray.cancel(self._obj_ref_gen)
else:
ray.cancel(self._obj_ref)

def to_object_ref(self, *, timeout_s: Optional[float] = None) -> ray.ObjectRef:
assert (
not self._is_streaming
), "to_object_ref can only be called on a unary ReplicaActorResult."

# NOTE(edoakes): this section needs to be guarded with a lock and the resulting
# object ref cached in order to avoid calling `__next__()` to
# resolve to the underlying object ref more than once.
# See: https://github.com/ray-project/ray/issues/43879.
with self._object_ref_or_gen_sync_lock:
if self._obj_ref is None:
obj_ref = self._obj_ref_gen._next_sync(timeout_s=timeout_s)
if obj_ref.is_nil():
raise TimeoutError("Timed out resolving to ObjectRef.")

self._obj_ref = obj_ref

return self._obj_ref

async def to_object_ref_async(self) -> ray.ObjectRef:
assert (
not self._is_streaming
), "to_object_ref_async can only be called on a unary ReplicaActorResult."

# NOTE(edoakes): this section needs to be guarded with a lock and the resulting
# object ref cached in order to avoid calling `__anext__()` to
# resolve to the underlying object ref more than once.
# See: https://github.com/ray-project/ray/issues/43879.
with self._object_ref_or_gen_sync_lock:
if self._obj_ref is None:
self._obj_ref = await self._obj_ref_gen.__anext__()

return self._obj_ref

def to_object_ref_gen(self) -> ray.ObjectRefGenerator:
assert (
self._is_streaming
), "to_object_ref_gen can only be called on a streaming ReplicaActorResult."

return self._obj_ref_gen
22 changes: 9 additions & 13 deletions python/ray/serve/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,7 @@ def result(self, *, timeout_s: Optional[float] = None) -> Any:
if is_running_in_asyncio_loop():
raise RuntimeError(
"Sync methods should not be called from within an `asyncio` event "
"loop. Use `await response` or `await response._to_object_ref()` "
"instead."
"loop. Use `await response` instead."
)

start_time_s = time.time()
Expand All @@ -452,7 +451,7 @@ async def _to_object_ref(self) -> ray.ObjectRef:
ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")

replica_result = await self._fetch_future_result_async()
return await replica_result.resolve_gen_to_ref_if_necessary_async()
return await replica_result.to_object_ref_async()

@DeveloperAPI
def _to_object_ref_sync(
Expand All @@ -478,8 +477,7 @@ def _to_object_ref_sync(
if not _allow_running_in_asyncio_loop and is_running_in_asyncio_loop():
raise RuntimeError(
"Sync methods should not be called from within an `asyncio` event "
"loop. Use `await response` or `await response._to_object_ref()` "
"instead."
"loop. Use `await response._to_object_ref()` instead."
)

# First, fetch the result of the future
Expand All @@ -492,7 +490,7 @@ def _to_object_ref_sync(
start_time_s=start_time_s,
curr_time_s=time.time(),
)
return replica_result.resolve_gen_to_ref_if_necessary_sync(remaining_timeout_s)
return replica_result.to_object_ref(timeout_s=remaining_timeout_s)


@PublicAPI(stability="beta")
Expand Down Expand Up @@ -553,7 +551,7 @@ async def __call__(self, limit: int) -> AsyncIterator[int]:
def __await__(self):
raise TypeError(
"`DeploymentResponseGenerator` cannot be awaited directly. Use `async for` "
"or `_to_object_ref_gen` instead."
"or `await response.__anext__() instead`."
)

def __aiter__(self) -> AsyncIterator[Any]:
Expand All @@ -570,8 +568,7 @@ def __next__(self) -> Any:
if is_running_in_asyncio_loop():
raise RuntimeError(
"Sync methods should not be called from within an `asyncio` event "
"loop. Use `await response` or `await response._to_object_ref()` "
"instead."
"loop. Use `async for` or `await response.__anext__()` instead."
)

replica_result = self._fetch_future_result_sync()
Expand All @@ -589,7 +586,7 @@ async def _to_object_ref_gen(self) -> ObjectRefGenerator:
ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")

replica_result = await self._fetch_future_result_async()
return replica_result.obj_ref_gen
return replica_result.to_object_ref_gen()

@DeveloperAPI
def _to_object_ref_gen_sync(
Expand All @@ -612,12 +609,11 @@ def _to_object_ref_gen_sync(
if not _allow_running_in_asyncio_loop and is_running_in_asyncio_loop():
raise RuntimeError(
"Sync methods should not be called from within an `asyncio` event "
"loop. Use `await response` or `await response._to_object_ref()` "
"instead."
"loop. Use `await response._to_object_ref()` instead."
)

replica_result = self._fetch_future_result_sync(_timeout_s)
return replica_result.obj_ref_gen
return replica_result.to_object_ref_gen()


@PublicAPI(stability="beta")
Expand Down
12 changes: 7 additions & 5 deletions python/ray/serve/tests/test_actor_replica_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,12 @@ async def test_send_request(setup_fake_replica, is_streaming: bool):
)
replica_result = replica.send_request(pr)
if is_streaming:
assert isinstance(replica_result.obj_ref_gen, ObjectRefGenerator)
assert isinstance(replica_result.to_object_ref_gen(), ObjectRefGenerator)
for i in range(5):
assert await replica_result.__anext__() == f"Hello-{i}"
else:
assert isinstance(replica_result.obj_ref, ObjectRef)
assert isinstance(replica_result.to_object_ref(), ObjectRef)
assert isinstance(await replica_result.to_object_ref_async(), ObjectRef)
assert await replica_result.get_async() == "Hello"


Expand Down Expand Up @@ -157,12 +158,13 @@ async def test_send_request_with_rejection(
if not accepted:
assert replica_result is None
elif is_streaming:
assert isinstance(replica_result.obj_ref_gen, ObjectRefGenerator)
assert isinstance(replica_result.to_object_ref_gen(), ObjectRefGenerator)
for i in range(5):
assert await replica_result.__anext__() == f"Hello-{i}"
else:
assert isinstance(replica_result.obj_ref_gen, ObjectRefGenerator)
assert await replica_result.__anext__() == "Hello"
assert isinstance(replica_result.to_object_ref(), ObjectRef)
assert isinstance(await replica_result.to_object_ref_async(), ObjectRef)
assert await replica_result.get_async() == "Hello"


@pytest.mark.asyncio
Expand Down
10 changes: 10 additions & 0 deletions python/ray/serve/tests/unit/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import pytest

import ray
from ray._private.test_utils import async_wait_for_condition
from ray._private.utils import get_or_create_event_loop
from ray.exceptions import ActorDiedError, ActorUnavailableError
Expand Down Expand Up @@ -61,6 +62,15 @@ def add_done_callback(self, callback: Callable):
def cancel(self):
raise NotImplementedError

def to_object_ref(self, timeout_s: Optional[float]) -> ray.ObjectRef:
raise NotImplementedError

async def to_object_ref_async(self) -> ray.ObjectRef:
raise NotImplementedError

def to_object_ref_gen(self) -> ray.ObjectRefGenerator:
raise NotImplementedError


class FakeReplica(ReplicaWrapper):
def __init__(
Expand Down

0 comments on commit 1eeb896

Please sign in to comment.