Skip to content

Commit

Permalink
fix (#1100): FastAPI 0.106 compatibility (#1102)
Browse files Browse the repository at this point in the history
* fix (#1100): FastAPI 0.106 compatibility

* lint: fix anyio types

* lint: fix mypy

* lint: fix mypy

* fix: FastAPI 106 ge

* chore: update mypy

* lint: ignore anyio mypy
  • Loading branch information
Lancetnik authored Dec 27, 2023
1 parent 60ac362 commit a5caa68
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 61 deletions.
4 changes: 1 addition & 3 deletions docs/docs/en/nats/examples/direct.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ a `subject` sends messages to all consumers subscribed to it.

## Scaling

If one `subject` is being listened to by several consumers with the same `queue group`, the message will go to a random consumer each time.

Thus, *NATS* can independently balance the load on queue consumers. You can increase the processing speed of the message flow from the queue by simply launching additional instances of the consumer service. You don't need to make changes to the current infrastructure configuration: *NATS* will take care of how to distribute messages between your services.
{! includes/en/nats/scaling.md !}

## Example

Expand Down
4 changes: 1 addition & 3 deletions docs/docs/en/nats/examples/pattern.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ search:

## Scaling

If one `subject` is being listened to by several consumers with the same `queue group`, the message will go to a random consumer each time.

Thus, *NATS* can independently balance the load on queue consumers. You can increase the processing speed of the message flow from the queue by simply launching additional instances of the consumer service. You don't need to make changes to the current infrastructure configuration: *NATS* will take care of how to distribute messages between your services.
{! includes/en/nats/scaling.md !}

## Example

Expand Down
8 changes: 8 additions & 0 deletions docs/includes/en/nats/scaling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
If one `subject` is being listened to by several consumers with the same `queue group`, the message will go to a random consumer each time.

Thus, *NATS* can independently balance the load on queue consumers. You can increase the processing speed of the message flow from the queue by simply launching additional instances of the consumer service. You don't need to make changes to the current infrastructure configuration: *NATS* will take care of how to distribute messages between your services.

!!! tip
By defaul, all subscribers are consuming messages from subject in blocking mode. You can't process multiple messages from the same subject in the same time. So, you have some kind of block per subject.

But, all `NatsBroker` subscribers has `max_workers` argument allows you to consume messages in a per-subscriber pool. So, if you have subscriber like `#!python @broker.subscriber(..., max_workers=10)`, it means that you can process up to **10** by it in the same time.
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""
__version__ = "0.3.10"
__version__ = "0.3.11"


INSTALL_YAML = """
Expand Down
1 change: 1 addition & 0 deletions faststream/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def json_dumps(*a: Any, **kw: Any) -> bytes:

major, minor, *_ = map(int, FASTAPI_VERSION.split("."))
FASTAPI_V2 = major > 0 or minor > 100
FASTAPI_V106 = major > 0 or minor >= 106

if FASTAPI_V2:
from fastapi._compat import _normalize_errors
Expand Down
17 changes: 8 additions & 9 deletions faststream/broker/fastapi/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from starlette.requests import Request
from starlette.routing import BaseRoute

from faststream._compat import raise_fastapi_validation_error
from faststream._compat import FASTAPI_V106, raise_fastapi_validation_error
from faststream.broker.core.asyncronous import BrokerAsyncUsecase
from faststream.broker.message import StreamMessage as NativeMessage
from faststream.broker.schemas import NameRequired
Expand All @@ -44,7 +44,6 @@ class StreamRoute(BaseRoute, Generic[MsgType, P_HandlerParams, T_HandlerReturn])
path : path of the route
broker : BrokerAsyncUsecase object representing the broker for the route
dependant : Dependable object representing the dependencies for the route
"""

handler: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]
Expand Down Expand Up @@ -75,7 +74,6 @@ def __init__(
Returns:
None.
"""
self.path = path or ""
self.broker = broker
Expand Down Expand Up @@ -136,7 +134,6 @@ class StreamMessage(Request):
Methods:
__init__ : initializes the StreamMessage object
get_session : returns a callable function that handles the session of the message
"""

scope: AnyDict
Expand Down Expand Up @@ -194,7 +191,6 @@ def get_session(
Note:
This function is used to create a session for handling requests. It takes a dependant object, which represents the session, and a dependency overrides provider, which allows for overriding dependencies. It returns a callable that takes a native message and returns an awaitable sendable message. The session is created based on the dependant object and the message passed to the callable. The session is then used to call the function obtained from the dependant object, and the result is returned.
"""
assert dependant.call # nosec B101

Expand Down Expand Up @@ -257,7 +253,7 @@ async def app(message: NativeMessage[Any]) -> SendableMessage:
def get_app(
dependant: Dependant,
dependency_overrides_provider: Optional[Any] = None,
) -> Callable[[StreamMessage], Coroutine[Any, Any, SendableMessage]]:
) -> Callable[[StreamMessage], Coroutine[Any, Any, SendableMessage],]:
"""Creates a FastAPI application.
Args:
Expand All @@ -269,7 +265,6 @@ def get_app(
Raises:
AssertionError: If the code reaches an unreachable state.
"""

async def app(request: StreamMessage) -> SendableMessage:
Expand All @@ -283,16 +278,20 @@ async def app(request: StreamMessage) -> SendableMessage:
Raises:
AssertionError: If the code reaches an unreachable point.
"""
async with AsyncExitStack() as stack:
request.scope["fastapi_astack"] = stack
if FASTAPI_V106:
kwargs = {"async_exit_stack": stack}
else:
request.scope["fastapi_astack"] = stack
kwargs = {}

solved_result = await solve_dependencies(
request=request,
body=request._body,
dependant=dependant,
dependency_overrides_provider=dependency_overrides_provider,
**kwargs, # type: ignore[arg-type]
)

values, errors, _, _2, _3 = solved_result
Expand Down
11 changes: 9 additions & 2 deletions faststream/broker/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from contextlib import AsyncExitStack, suppress
from inspect import unwrap
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Expand Down Expand Up @@ -44,6 +45,9 @@
from faststream.utils.context.repository import context
from faststream.utils.functions import to_async

if TYPE_CHECKING:
from contextvars import Token


class BaseHandler(AsyncAPIOperation, Generic[MsgType]):
"""A base handler class for asynchronous API operations.
Expand Down Expand Up @@ -272,6 +276,7 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri
if not self.running:
return result_msg

log_context_tag: Optional["Token[Any]"] = None
async with AsyncExitStack() as stack:
stack.enter_context(self.lock)

Expand All @@ -295,7 +300,8 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri

if not logged: # pragma: no branch
log_context_tag = context.set_local(
"log_context", self.log_context_builder(message)
"log_context",
self.log_context_builder(message),
)

message.decoded_body = await decoder(message)
Expand Down Expand Up @@ -363,7 +369,8 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri
not self.running or processed
), "You have to consume message" # nosec B101

context.reset_local("log_context", log_context_tag)
if log_context_tag is not None:
context.reset_local("log_context", log_context_tag)

return result_msg

Expand Down
2 changes: 2 additions & 0 deletions faststream/nats/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ def subscriber( # type: ignore[override]
middlewares: Optional[Sequence[Callable[[Msg], BaseMiddleware]]] = None,
filter: Filter[NatsMessage] = default_filter,
no_ack: bool = False,
max_workers: int = 1,
# AsyncAPI information
title: Optional[str] = None,
description: Optional[str] = None,
Expand Down Expand Up @@ -394,6 +395,7 @@ def subscriber( # type: ignore[override]
description=description,
include_in_schema=include_in_schema,
graceful_timeout=self.graceful_timeout,
max_workers=max_workers,
log_context_builder=partial(
self._get_log_context,
stream=stream.name if stream else "",
Expand Down
3 changes: 2 additions & 1 deletion faststream/nats/broker.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ class NatsBroker(
pending_bytes_limit: Optional[int] = None,
# Core arguments
max_msgs: int = 0,
ack_first: bool = False,
# JS arguments
ack_first: bool = False,
stream: Union[str, JStream, None] = None,
durable: Optional[str] = None,
config: Optional[api.ConsumerConfig] = None,
Expand All @@ -254,6 +254,7 @@ class NatsBroker(
filter: Filter[NatsMessage] = default_filter,
retry: bool = False,
no_ack: bool = False,
max_workers: int = 1,
# AsyncAPI information
title: Optional[str] = None,
description: Optional[str] = None,
Expand Down
Loading

0 comments on commit a5caa68

Please sign in to comment.