diff --git a/docs/docs/en/getting-started/dependencies/index.md b/docs/docs/en/getting-started/dependencies/index.md index c707b73ad7..75e55c4196 100644 --- a/docs/docs/en/getting-started/dependencies/index.md +++ b/docs/docs/en/getting-started/dependencies/index.md @@ -19,6 +19,7 @@ By default, it applies to all event handlers, unless you disabled the same optio !!! warning Setting the `apply_types=False` flag not only disables type casting but also `Depends` and `Context`. + If you want to disable only type casting, use `validate=False` instead. This flag can be useful if you are using **FastStream** within another framework and you need to use its native dependency system. diff --git a/docs/docs/en/kafka/ack.md b/docs/docs/en/kafka/ack.md index 6a58fbee5f..a5bb5e0662 100644 --- a/docs/docs/en/kafka/ack.md +++ b/docs/docs/en/kafka/ack.md @@ -49,3 +49,5 @@ If you wish to interrupt the processing of a message at any call stack level and ``` This way, **FastStream** interrupts the current message processing and acknowledges it immediately. Similarly, you can raise `NackMessage` as well to prevent the message from being committed. + +{!> includes/en/no_ack.md !} diff --git a/docs/docs/en/nats/jetstream/ack.md b/docs/docs/en/nats/jetstream/ack.md index 9e0688a0aa..836ddbf9ac 100644 --- a/docs/docs/en/nats/jetstream/ack.md +++ b/docs/docs/en/nats/jetstream/ack.md @@ -56,3 +56,5 @@ If you want to interrupt message processing at any call stack, you can raise `fa ``` This way, **FastStream** interrupts the current message proccessing and acknowledges it immediately. Also, you can raise `NackMessage` and `RejectMessage` too. + +{!> includes/en/no_ack.md !} diff --git a/docs/docs/en/rabbit/ack.md b/docs/docs/en/rabbit/ack.md index 5fe3af419c..dd49612916 100644 --- a/docs/docs/en/rabbit/ack.md +++ b/docs/docs/en/rabbit/ack.md @@ -68,3 +68,5 @@ If you want to interrupt message processing at any call stack, you can raise `fa ``` This way, **FastStream** interrupts the current message proccessing and acknowledges it immediately. Also, you can raise `NackMessage` and `RejectMessage` too. + +{!> includes/en/no_ack.md !} diff --git a/docs/docs/en/redis/streams/ack.md b/docs/docs/en/redis/streams/ack.md index 2d5142f651..0d5ea7fc5e 100644 --- a/docs/docs/en/redis/streams/ack.md +++ b/docs/docs/en/redis/streams/ack.md @@ -36,3 +36,5 @@ If the need arises to instantly interrupt message processing at any point in the ``` By raising `AckMessage`, **FastStream** will halt the current message processing routine and immediately acknowledge it. Analogously, raising `NackMessage` would prevent the message from being acknowledged and could lead to its subsequent reprocessing by the same or a different consumer. + +{!> includes/en/no_ack.md !} diff --git a/docs/includes/en/no_ack.md b/docs/includes/en/no_ack.md new file mode 100644 index 0000000000..aa32de2e19 --- /dev/null +++ b/docs/includes/en/no_ack.md @@ -0,0 +1,3 @@ +!!! tip + If you want to disable **FastStream** Acknowledgement logic at all, you can use + `#!python @broker.subscriber(..., no_ack=True)` option. This way you should always process a message (ack/nack/terminate/etc) by yourself. \ No newline at end of file diff --git a/faststream/broker/core/abc.py b/faststream/broker/core/abc.py index cb1bfc8d8d..eb239609ca 100644 --- a/faststream/broker/core/abc.py +++ b/faststream/broker/core/abc.py @@ -2,10 +2,12 @@ import os import warnings from abc import ABC, abstractmethod +from functools import partial from itertools import chain from types import TracebackType from typing import ( Any, + AsyncContextManager, Awaitable, Callable, Generic, @@ -31,7 +33,7 @@ from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware, CriticalLogMiddleware from faststream.broker.publisher import BasePublisher -from faststream.broker.push_back_watcher import BaseWatcher +from faststream.broker.push_back_watcher import WatcherContext from faststream.broker.router import BrokerRouter from faststream.broker.types import ( ConnectionType, @@ -53,7 +55,11 @@ from faststream.security import BaseSecurity from faststream.types import AnyDict, F_Return, F_Spec from faststream.utils import apply_types, context -from faststream.utils.functions import get_function_positional_arguments, to_async +from faststream.utils.functions import ( + fake_context, + get_function_positional_arguments, + to_async, +) class BrokerUsecase( @@ -119,6 +125,7 @@ def __init__( asyncapi_url: Union[str, List[str], None] = None, # broker kwargs apply_types: bool = True, + validate: bool = True, logger: Optional[logging.Logger] = access_logger, log_level: int = logging.INFO, log_fmt: Optional[str] = "%(asctime)s %(levelname)s - %(message)s", @@ -139,6 +146,7 @@ def __init__( description: A description of the broker. tags: Tags associated with the broker. apply_types: Whether to apply types to messages. + validate: Whether to cast types using Pydantic validation. logger: The logger to use. log_level: The log level to use. log_fmt: The log format to use. @@ -159,6 +167,7 @@ def __init__( self._connection = None self._is_apply_types = apply_types + self._is_validate = validate self.handlers = {} self._publishers = {} empty_middleware: Sequence[Callable[[MsgType], BaseMiddleware]] = () @@ -253,8 +262,10 @@ def _wrap_handler( *, retry: Union[bool, int] = False, extra_dependencies: Sequence[Depends] = (), + no_ack: bool = False, _raw: bool = False, _get_dependant: Optional[Any] = None, + _process_kwargs: Optional[AnyDict] = None, ) -> Tuple[ HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn], CallModel[Any, Any], @@ -265,6 +276,7 @@ def _wrap_handler( func: The handler function to wrap. retry: Whether to retry the handler function if it fails. Can be a boolean or an integer specifying the number of retries. extra_dependencies: Additional dependencies for the handler function. + no_ack: Whether not to ack/nack/reject messages. _raw: Whether to use the raw handler function. _get_dependant: The dependant function to use. **broker_log_context_kwargs: Additional keyword arguments for the broker log context. @@ -280,7 +292,7 @@ def _wrap_handler( """ build_dep = cast( Callable[[Callable[F_Spec, F_Return]], CallModel[F_Spec, F_Return]], - _get_dependant or build_call_model, + _get_dependant or partial(build_call_model, cast=self._is_validate), ) if isinstance(func, HandlerCallWrapper): @@ -304,8 +316,8 @@ def _wrap_handler( if getattr(dependant, "flat_params", None) is None: # handle FastAPI Dependant dependant = _patch_fastapi_dependant(dependant) - if self._is_apply_types is True and not _raw: - f = apply_types(None)(f, dependant) # type: ignore[arg-type] + if self._is_apply_types and not _raw: + f = apply_types(None, cast=self._is_validate)(f, dependant) # type: ignore[arg-type] decode_f = self._wrap_decode_message( func=f, @@ -319,10 +331,16 @@ def _wrap_handler( process_f = self._process_message( func=decode_f, - watcher=get_watcher(self.logger, retry), + watcher=( + partial(WatcherContext, watcher=get_watcher(self.logger, retry)) # type: ignore[arg-type] + if not no_ack + else fake_context + ), + **(_process_kwargs or {}), ) - process_f = set_message_context(process_f) + if self._is_apply_types: + process_f = set_message_context(process_f) handler_call.set_wrapped(process_f) return handler_call, dependant @@ -386,13 +404,15 @@ def _abc__close( def _process_message( self, func: Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]], - watcher: BaseWatcher, - ) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]],]: + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, + ) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]]]: """Processes a message using a given function and watcher. Args: func: A callable that takes a StreamMessage of type MsgType and returns an Awaitable of type T_HandlerReturn. watcher: An instance of BaseWatcher. + disable_watcher: Whether to use watcher context. Returns: A callable that takes a StreamMessage of type MsgType and returns an Awaitable of type WrappedReturn[T_HandlerReturn]. diff --git a/faststream/broker/core/asyncronous.py b/faststream/broker/core/asyncronous.py index 66f42611b5..fce9cf33aa 100644 --- a/faststream/broker/core/asyncronous.py +++ b/faststream/broker/core/asyncronous.py @@ -4,6 +4,7 @@ from types import TracebackType from typing import ( Any, + AsyncContextManager, Awaitable, Callable, Mapping, @@ -24,7 +25,6 @@ from faststream.broker.handler import AsyncHandler from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware -from faststream.broker.push_back_watcher import BaseWatcher from faststream.broker.types import ( AsyncCustomDecoder, AsyncCustomParser, @@ -39,7 +39,7 @@ ) from faststream.broker.wrapper import HandlerCallWrapper from faststream.log import access_logger -from faststream.types import SendableMessage +from faststream.types import AnyDict, SendableMessage from faststream.utils.functions import to_async @@ -167,19 +167,21 @@ async def close( def _process_message( self, func: Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]], - watcher: BaseWatcher, + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, ) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]],]: """Process a message. Args: - func: A callable function that takes a StreamMessage and returns an Awaitable - watcher: An instance of BaseWatcher + func: A callable function that takes a StreamMessage and returns an Awaitable. + watcher: An instance of BaseWatcher. + disable_watcher: Whether to use watcher context. Returns: - A callable function that takes a StreamMessage and returns an Awaitable + A callable function that takes a StreamMessage and returns an Awaitable. Raises: - NotImplementedError: If the method is not implemented + NotImplementedError: If the method is not implemented. !!! note The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai) @@ -270,6 +272,7 @@ def __init__( self, *args: Any, apply_types: bool = True, + validate: bool = True, logger: Optional[logging.Logger] = access_logger, log_level: int = logging.INFO, log_fmt: Optional[str] = "%(asctime)s %(levelname)s - %(message)s", @@ -284,6 +287,7 @@ def __init__( Args: *args: Variable length arguments apply_types: Whether to apply types or not + validate: Whether to cast types using Pydantic validation. logger: Logger object for logging log_level: Log level for logging log_fmt: Log format for logging @@ -299,6 +303,7 @@ def __init__( super().__init__( *args, apply_types=apply_types, + validate=validate, logger=logger, log_level=log_level, log_fmt=log_fmt, @@ -425,8 +430,10 @@ def _wrap_handler( *, retry: Union[bool, int] = False, extra_dependencies: Sequence[Depends] = (), + no_ack: bool = False, _raw: bool = False, _get_dependant: Optional[Any] = None, + _process_kwargs: Optional[AnyDict] = None, ) -> Tuple[ HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn], CallModel[P_HandlerParams, T_HandlerReturn], @@ -437,6 +444,7 @@ def _wrap_handler( func: The handler function to wrap. retry: Whether to retry the handler function if it fails. Can be a boolean or an integer specifying the number of retries. extra_dependencies: Additional dependencies to inject into the handler function. + no_ack: Whether not to ack/nack/reject messages. _raw: Whether to return the raw response from the handler function. _get_dependant: An optional object to use as the dependant for the handler function. **broker_log_context_kwargs: Additional keyword arguments to pass to the broker log context. @@ -451,6 +459,7 @@ def _wrap_handler( func, retry=retry, extra_dependencies=extra_dependencies, + no_ack=no_ack, _raw=_raw, _get_dependant=_get_dependant, ) diff --git a/faststream/broker/push_back_watcher.py b/faststream/broker/push_back_watcher.py index 89ed2169a6..b01fd927ad 100644 --- a/faststream/broker/push_back_watcher.py +++ b/faststream/broker/push_back_watcher.py @@ -311,8 +311,8 @@ class WatcherContext: def __init__( self, - watcher: BaseWatcher, message: Union[SyncStreamMessage[MsgType], StreamMessage[MsgType]], + watcher: BaseWatcher, **extra_ack_args: Any, ): """Initialize a new instance of the class. diff --git a/faststream/kafka/broker.py b/faststream/kafka/broker.py index 77aba1a38e..f246506bf4 100644 --- a/faststream/kafka/broker.py +++ b/faststream/kafka/broker.py @@ -2,6 +2,7 @@ from types import TracebackType from typing import ( Any, + AsyncContextManager, Awaitable, Callable, Dict, @@ -25,7 +26,6 @@ from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware -from faststream.broker.push_back_watcher import BaseWatcher, WatcherContext from faststream.broker.types import ( AsyncPublisherProtocol, CustomDecoder, @@ -206,7 +206,8 @@ async def start(self) -> None: def _process_message( self, func: Callable[[KafkaMessage], Awaitable[T_HandlerReturn]], - watcher: BaseWatcher, + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, ) -> Callable[[KafkaMessage], Awaitable[WrappedReturn[T_HandlerReturn]],]: """ Wrap a message processing function with a watcher and publisher. @@ -214,6 +215,7 @@ def _process_message( Args: func (Callable[[KafkaMessage], Awaitable[T_HandlerReturn]]): The message processing function. watcher (BaseWatcher): The message watcher. + disable_watcher: Whether to use watcher context. Returns: Callable[[KafkaMessage], Awaitable[WrappedReturn[T_HandlerReturn]]]: The wrapped message processing function. @@ -237,7 +239,7 @@ async def process_wrapper( The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai) """ - async with WatcherContext(watcher, message): + async with watcher(message): r = await func(message) pub_response: Optional[AsyncPublisherProtocol] @@ -309,6 +311,7 @@ def subscriber( # type: ignore[override] batch: bool = False, max_records: Optional[int] = None, batch_timeout_ms: int = 200, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, @@ -358,6 +361,7 @@ def subscriber( # type: ignore[override] batch (bool): Whether to process messages in batches. max_records (Optional[int]): Maximum number of records to process in each batch. batch_timeout_ms (int): Batch timeout in milliseconds. + no_ack (bool): Whether not to ack/nack/reject messages. title (Optional[str]): AsyncAPI title. description (Optional[str]): AsyncAPI description. **original_kwargs: Additional keyword arguments. @@ -441,6 +445,7 @@ def consumer_wrapper( handler_call, dependant = self._wrap_handler( func=func, extra_dependencies=dependencies, + no_ack=no_ack, **original_kwargs, ) diff --git a/faststream/kafka/broker.pyi b/faststream/kafka/broker.pyi index 4e21c418c8..33f8f27525 100644 --- a/faststream/kafka/broker.pyi +++ b/faststream/kafka/broker.pyi @@ -3,6 +3,7 @@ from asyncio import AbstractEventLoop from types import TracebackType from typing import ( Any, + AsyncContextManager, Awaitable, Callable, Dict, @@ -31,7 +32,6 @@ from faststream.asyncapi import schema as asyncapi from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware -from faststream.broker.push_back_watcher import BaseWatcher from faststream.broker.security import BaseSecurity from faststream.broker.types import ( CustomDecoder, @@ -91,6 +91,7 @@ class KafkaBroker( loop: Optional[AbstractEventLoop] = None, # broker args apply_types: bool = True, + validate: bool = True, dependencies: Sequence[Depends] = (), decoder: Optional[CustomDecoder[KafkaMessage]] = None, parser: Optional[CustomParser[aiokafka.ConsumerRecord, KafkaMessage]] = None, @@ -187,7 +188,8 @@ class KafkaBroker( func: Callable[ [StreamMessage[aiokafka.ConsumerRecord]], Awaitable[T_HandlerReturn] ], - watcher: BaseWatcher, + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, ) -> Callable[ [StreamMessage[aiokafka.ConsumerRecord]], Awaitable[WrappedReturn[T_HandlerReturn]], @@ -243,6 +245,7 @@ class KafkaBroker( max_records: Optional[int] = None, batch_timeout_ms: int = 200, retry: Union[bool, int] = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, @@ -306,6 +309,7 @@ class KafkaBroker( max_records: Optional[int] = None, batch_timeout_ms: int = 200, retry: Union[bool, int] = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/kafka/fastapi.pyi b/faststream/kafka/fastapi.pyi index e5707ea408..c9a58ba476 100644 --- a/faststream/kafka/fastapi.pyi +++ b/faststream/kafka/fastapi.pyi @@ -112,6 +112,7 @@ class KafkaRouter(StreamRouter[ConsumerRecord]): ), # broker kwargs apply_types: bool = True, + validate: bool = True, decoder: Optional[CustomDecoder[KafkaMessage]] = None, parser: Optional[CustomParser[aiokafka.ConsumerRecord, KafkaMessage]] = None, middlewares: Optional[ @@ -191,6 +192,7 @@ class KafkaRouter(StreamRouter[ConsumerRecord]): max_records: Optional[int] = None, batch_timeout_ms: int = 200, retry: Union[bool, int] = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, @@ -252,6 +254,7 @@ class KafkaRouter(StreamRouter[ConsumerRecord]): max_records: Optional[int] = None, batch_timeout_ms: int = 200, retry: Union[bool, int] = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/kafka/router.pyi b/faststream/kafka/router.pyi index 10500ccf4c..0af0cb4677 100644 --- a/faststream/kafka/router.pyi +++ b/faststream/kafka/router.pyi @@ -114,6 +114,7 @@ class KafkaRouter(BrokerRouter[str, aiokafka.ConsumerRecord]): max_records: Optional[int] = None, batch_timeout_ms: int = 200, retry: Union[bool, int] = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index a78acb15c6..ea485303a8 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -3,6 +3,7 @@ from types import TracebackType from typing import ( Any, + AsyncContextManager, Awaitable, Callable, Dict, @@ -32,7 +33,6 @@ from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware -from faststream.broker.push_back_watcher import BaseWatcher, WatcherContext from faststream.broker.types import ( AsyncPublisherProtocol, CustomDecoder, @@ -204,17 +204,15 @@ async def start(self) -> None: def _process_message( self, - func: Callable[ - [StreamMessage[Msg]], - Awaitable[T_HandlerReturn], - ], - watcher: BaseWatcher, + func: Callable[[StreamMessage[Msg]], Awaitable[T_HandlerReturn]], + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, ) -> Callable[[StreamMessage[Msg]], Awaitable[WrappedReturn[T_HandlerReturn]],]: @wraps(func) async def process_wrapper( message: StreamMessage[Msg], ) -> WrappedReturn[T_HandlerReturn]: - async with WatcherContext(watcher, message): + async with watcher(message): r = await func(message) pub_response: Optional[AsyncPublisherProtocol] @@ -290,6 +288,7 @@ def subscriber( # type: ignore[override] decoder: Optional[CustomDecoder[NatsMessage]] = None, middlewares: Optional[Sequence[Callable[[Msg], BaseMiddleware]]] = None, filter: Filter[NatsMessage] = default_filter, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, @@ -387,6 +386,7 @@ def consumer_wrapper( handler_call, dependant = self._wrap_handler( func, extra_dependencies=dependencies, + no_ack=no_ack, **original_kwargs, ) diff --git a/faststream/nats/broker.pyi b/faststream/nats/broker.pyi index 1c8349b3d6..80cd78963f 100644 --- a/faststream/nats/broker.pyi +++ b/faststream/nats/broker.pyi @@ -1,7 +1,18 @@ import logging import ssl from types import TracebackType -from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence, Type, Union +from typing import ( + Any, + AsyncContextManager, + Awaitable, + Callable, + Dict, + List, + Optional, + Sequence, + Type, + Union, +) from fast_depends.dependencies import Depends from nats.aio.client import ( @@ -23,16 +34,13 @@ from nats.aio.client import ( ) from nats.aio.msg import Msg from nats.js import api -from nats.js.client import ( - JetStreamContext, -) +from nats.js.client import JetStreamContext from faststream._compat import override from faststream.asyncapi import schema as asyncapi from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware -from faststream.broker.push_back_watcher import BaseWatcher from faststream.broker.types import ( CustomDecoder, CustomParser, @@ -100,6 +108,7 @@ class NatsBroker( flush_timeout: Optional[float] = None, # broker args apply_types: bool = True, + validate: bool = True, dependencies: Sequence[Depends] = (), decoder: Optional[CustomDecoder[NatsMessage]] = None, parser: Optional[CustomParser[Msg, NatsMessage]] = None, @@ -202,11 +211,9 @@ class NatsBroker( async def start(self) -> None: ... def _process_message( self, - func: Callable[ - [StreamMessage[Msg]], - Awaitable[T_HandlerReturn], - ], - watcher: BaseWatcher, + func: Callable[[StreamMessage[Msg]], Awaitable[T_HandlerReturn]], + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, ) -> Callable[[StreamMessage[Msg]], Awaitable[WrappedReturn[T_HandlerReturn]],]: ... def _log_connection_broken( self, @@ -245,6 +252,7 @@ class NatsBroker( middlewares: Optional[Sequence[Callable[[Msg], BaseMiddleware]]] = None, filter: Filter[NatsMessage] = default_filter, retry: bool = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/nats/fastapi.pyi b/faststream/nats/fastapi.pyi index f3bcdc70a8..8ff6e8fc94 100644 --- a/faststream/nats/fastapi.pyi +++ b/faststream/nats/fastapi.pyi @@ -207,6 +207,7 @@ class NatsRouter(StreamRouter[Msg]): middlewares: Optional[Sequence[Callable[[Msg], BaseMiddleware]]] = None, filter: Filter[NatsMessage] = default_filter, retry: bool = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/nats/router.pyi b/faststream/nats/router.pyi index 2cf9339a02..0561f787ba 100644 --- a/faststream/nats/router.pyi +++ b/faststream/nats/router.pyi @@ -86,6 +86,7 @@ class NatsRouter(BaseRouter): middlewares: Optional[Sequence[Callable[[Msg], BaseMiddleware]]] = None, filter: Filter[NatsMessage] = default_filter, retry: bool = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/rabbit/broker.py b/faststream/rabbit/broker.py index 01557c5e3f..1a2587412a 100644 --- a/faststream/rabbit/broker.py +++ b/faststream/rabbit/broker.py @@ -1,7 +1,18 @@ import warnings from functools import partial, wraps from types import TracebackType -from typing import Any, Awaitable, Callable, Dict, Optional, Sequence, Type, Union, cast +from typing import ( + Any, + AsyncContextManager, + Awaitable, + Callable, + Dict, + Optional, + Sequence, + Type, + Union, + cast, +) import aio_pika import aiormq @@ -14,7 +25,6 @@ from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware -from faststream.broker.push_back_watcher import BaseWatcher, WatcherContext from faststream.broker.types import ( AsyncPublisherProtocol, CustomDecoder, @@ -300,6 +310,7 @@ def subscriber( # type: ignore[override] Sequence[Callable[[aio_pika.IncomingMessage], BaseMiddleware]] ] = None, filter: Filter[RabbitMessage] = default_filter, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, @@ -316,6 +327,7 @@ def subscriber( # type: ignore[override] queue (Union[str, RabbitQueue]): The name of the RabbitMQ queue. exchange (Union[str, RabbitExchange, None], optional): The name of the RabbitMQ exchange. Defaults to None. consume_args (Optional[AnyDict], optional): Additional arguments for message consumption. + no_ack (bool): Whether not to ack/nack/reject messages. title (Optional[str]): Title for AsyncAPI docs. description (Optional[str]): Description for AsyncAPI docs. @@ -370,6 +382,7 @@ def consumer_wrapper( handler_call, dependant = self._wrap_handler( func, extra_dependencies=dependencies, + no_ack=no_ack, **original_kwargs, ) @@ -477,7 +490,8 @@ def _process_message( func: Callable[ [StreamMessage[aio_pika.IncomingMessage]], Awaitable[T_HandlerReturn] ], - watcher: BaseWatcher, + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, ) -> Callable[ [StreamMessage[aio_pika.IncomingMessage]], Awaitable[WrappedReturn[T_HandlerReturn]], @@ -488,6 +502,7 @@ def _process_message( Args: func (Callable): The handler function for processing the message. watcher (BaseWatcher): The message watcher for tracking message processing. + disable_watcher: Whether to use watcher context. Returns: Callable: A wrapper function for processing messages. @@ -511,7 +526,7 @@ async def process_wrapper( The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai) """ - async with WatcherContext(watcher, message): + async with watcher(message): r = await func(message) pub_response: Optional[AsyncPublisherProtocol] diff --git a/faststream/rabbit/broker.pyi b/faststream/rabbit/broker.pyi index 40eb571e88..6cadbaf453 100644 --- a/faststream/rabbit/broker.pyi +++ b/faststream/rabbit/broker.pyi @@ -1,7 +1,17 @@ import logging from ssl import SSLContext from types import TracebackType -from typing import Any, Awaitable, Callable, Dict, Optional, Sequence, Type, Union +from typing import ( + Any, + AsyncContextManager, + Awaitable, + Callable, + Dict, + Optional, + Sequence, + Type, + Union, +) import aio_pika import aiormq @@ -14,7 +24,6 @@ from faststream.asyncapi import schema as asyncapi from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware -from faststream.broker.push_back_watcher import BaseWatcher from faststream.broker.types import ( CustomDecoder, CustomParser, @@ -65,6 +74,7 @@ class RabbitBroker( max_consumers: Optional[int] = None, # broker args apply_types: bool = True, + validate: bool = True, dependencies: Sequence[Depends] = (), decoder: Optional[CustomDecoder[RabbitMessage]] = None, parser: Optional[CustomParser[aio_pika.IncomingMessage, RabbitMessage]] = None, @@ -140,6 +150,7 @@ class RabbitBroker( Sequence[Callable[[aio_pika.IncomingMessage], BaseMiddleware]] ] = None, retry: Union[bool, int] = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, @@ -214,7 +225,8 @@ class RabbitBroker( func: Callable[ [StreamMessage[aio_pika.IncomingMessage]], Awaitable[T_HandlerReturn] ], - watcher: BaseWatcher, + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, ) -> Callable[ [StreamMessage[aio_pika.IncomingMessage]], Awaitable[WrappedReturn[T_HandlerReturn]], diff --git a/faststream/rabbit/fastapi.pyi b/faststream/rabbit/fastapi.pyi index 5d6068c0f3..34092e6a26 100644 --- a/faststream/rabbit/fastapi.pyi +++ b/faststream/rabbit/fastapi.pyi @@ -137,6 +137,7 @@ class RabbitRouter(StreamRouter[IncomingMessage]): Sequence[Callable[[aio_pika.IncomingMessage], BaseMiddleware]] ] = None, retry: Union[bool, int] = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/rabbit/router.pyi b/faststream/rabbit/router.pyi index c41512706d..6441d68152 100644 --- a/faststream/rabbit/router.pyi +++ b/faststream/rabbit/router.pyi @@ -66,6 +66,7 @@ class RabbitRouter(BrokerRouter[int, aio_pika.IncomingMessage]): ] ] = None, retry: Union[bool, int] = False, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/redis/broker.py b/faststream/redis/broker.py index a778125405..a42eef4e7f 100644 --- a/faststream/redis/broker.py +++ b/faststream/redis/broker.py @@ -2,6 +2,7 @@ from types import TracebackType from typing import ( Any, + AsyncContextManager, Awaitable, Callable, Dict, @@ -21,7 +22,6 @@ from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware -from faststream.broker.push_back_watcher import BaseWatcher, WatcherContext from faststream.broker.types import ( AsyncPublisherProtocol, CustomDecoder, @@ -141,18 +141,15 @@ async def start(self) -> None: def _process_message( self, - func: Callable[ - [StreamMessage[Any]], - Awaitable[T_HandlerReturn], - ], - watcher: BaseWatcher, + func: Callable[[StreamMessage[Any]], Awaitable[T_HandlerReturn]], + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, ) -> Callable[[StreamMessage[Any]], Awaitable[WrappedReturn[T_HandlerReturn]],]: @wraps(func) async def process_wrapper( message: StreamMessage[Any], ) -> WrappedReturn[T_HandlerReturn]: - async with WatcherContext( - watcher, + async with watcher( message, redis=self._connection, ): diff --git a/faststream/redis/broker.pyi b/faststream/redis/broker.pyi index b0bcfcc14e..1d62be341a 100644 --- a/faststream/redis/broker.pyi +++ b/faststream/redis/broker.pyi @@ -2,6 +2,7 @@ import logging from types import TracebackType from typing import ( Any, + AsyncContextManager, Awaitable, Callable, Dict, @@ -21,7 +22,6 @@ from faststream.asyncapi import schema as asyncapi from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware -from faststream.broker.push_back_watcher import BaseWatcher from faststream.broker.types import ( CustomDecoder, CustomParser, @@ -77,6 +77,15 @@ class RedisBroker( parser_class: Type[BaseParser] = DefaultParser, connection_class: Type[Connection] = Connection, encoder_class: Type[Encoder] = Encoder, + # broker args + apply_types: bool = True, + validate: bool = True, + dependencies: Sequence[Depends] = (), + parser: Optional[CustomParser[AnyRedisDict, RedisMessage]] = None, + decoder: Optional[CustomDecoder[RedisMessage]] = None, + middlewares: Optional[ + Sequence[Callable[[AnyRedisDict], BaseMiddleware]] + ] = None, # AsyncAPI args asyncapi_url: Optional[str] = None, protocol: Optional[str] = None, @@ -148,11 +157,9 @@ class RedisBroker( async def start(self) -> None: ... def _process_message( self, - func: Callable[ - [StreamMessage[Any]], - Awaitable[T_HandlerReturn], - ], - watcher: BaseWatcher, + func: Callable[[StreamMessage[Any]], Awaitable[T_HandlerReturn]], + watcher: Callable[..., AsyncContextManager[None]], + **kwargs: Any, ) -> Callable[[StreamMessage[Any]], Awaitable[WrappedReturn[T_HandlerReturn]],]: ... @override def subscriber( # type: ignore[override] @@ -169,6 +176,7 @@ class RedisBroker( Sequence[Callable[[AnyRedisDict], BaseMiddleware]] ] = None, filter: Filter[RedisMessage] = default_filter, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/redis/fastapi.pyi b/faststream/redis/fastapi.pyi index d1db3b967f..1f2e5ccafe 100644 --- a/faststream/redis/fastapi.pyi +++ b/faststream/redis/fastapi.pyi @@ -73,6 +73,12 @@ class RedisRouter(StreamRouter[AnyRedisDict]): parser_class: Type[BaseParser] = DefaultParser, connection_class: Type[Connection] = Connection, encoder_class: Type[Encoder] = Encoder, + # broker args + parser: Optional[CustomParser[AnyRedisDict, RedisMessage]] = None, + decoder: Optional[CustomDecoder[RedisMessage]] = None, + middlewares: Optional[ + Sequence[Callable[[AnyRedisDict], BaseMiddleware]] + ] = None, # AsyncAPI args asyncapi_url: Optional[str] = None, protocol: Optional[str] = None, @@ -127,6 +133,7 @@ class RedisRouter(StreamRouter[AnyRedisDict]): Sequence[Callable[[AnyRedisDict], BaseMiddleware]] ] = None, filter: Filter[RedisMessage] = default_filter, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/faststream/redis/router.pyi b/faststream/redis/router.pyi index 3cd71e3243..5af410ff91 100644 --- a/faststream/redis/router.pyi +++ b/faststream/redis/router.pyi @@ -68,6 +68,7 @@ class RedisRouter(BrokerRouter[int, AnyRedisDict]): Sequence[Callable[[AnyRedisDict], BaseMiddleware]] ] = None, filter: Filter[RedisMessage] = default_filter, + no_ack: bool = False, # AsyncAPI information title: Optional[str] = None, description: Optional[str] = None, diff --git a/tests/brokers/base/consume.py b/tests/brokers/base/consume.py index c41301b22f..074c0677da 100644 --- a/tests/brokers/base/consume.py +++ b/tests/brokers/base/consume.py @@ -2,7 +2,9 @@ from unittest.mock import MagicMock import pytest +from pydantic import BaseModel +from faststream import Context, Depends from faststream.broker.core.abc import BrokerUsecase from faststream.exceptions import StopConsume @@ -180,6 +182,39 @@ async def handler2(m): mock.handler.assert_called_once_with({"msg": "hello"}) mock.handler2.assert_called_once_with("hello") + async def test_consume_validate_false( + self, + queue: str, + consume_broker: BrokerUsecase, + event: asyncio.Event, + mock: MagicMock, + ): + consume_broker._is_apply_types = True + consume_broker._is_validate = False + + class Foo(BaseModel): + x: int + + def dependency() -> str: + return "100" + + @consume_broker.subscriber(queue) + async def handler(m: Foo, dep: int = Depends(dependency), broker=Context()): + mock(m, dep, broker) + event.set() + + await consume_broker.start() + await asyncio.wait( + ( + asyncio.create_task(consume_broker.publish({"x": 1}, queue)), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + mock.assert_called_once_with({"x": 1}, "100", consume_broker) + @pytest.mark.asyncio class BrokerRealConsumeTestcase(BrokerConsumeTestcase): diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py index 1a06faa451..2ce3f2b972 100644 --- a/tests/brokers/kafka/test_consume.py +++ b/tests/brokers/kafka/test_consume.py @@ -171,3 +171,35 @@ async def handler(msg: KafkaMessage): assert not m.mock.called assert event.is_set() + + @pytest.mark.asyncio + @pytest.mark.slow + async def test_consume_no_ack( + self, + queue: str, + full_broker: KafkaBroker, + event: asyncio.Event, + ): + @full_broker.subscriber(queue, group_id="test", no_ack=True) + async def handler(msg: KafkaMessage): + event.set() + + await full_broker.start() + with patch.object( + AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) + ) as m: + await asyncio.wait( + ( + asyncio.create_task( + full_broker.publish( + "hello", + queue, + ) + ), + asyncio.create_task(event.wait()), + ), + timeout=10, + ) + m.mock.assert_not_called() + + assert event.is_set() diff --git a/tests/brokers/nats/test_consume.py b/tests/brokers/nats/test_consume.py index 62584ca815..9fc8deb810 100644 --- a/tests/brokers/nats/test_consume.py +++ b/tests/brokers/nats/test_consume.py @@ -191,3 +191,29 @@ async def handler(msg: NatsMessage): m.mock.assert_called_once() assert event.is_set() + + @pytest.mark.asyncio + async def test_consume_no_ack( + self, queue: str, full_broker: NatsBroker, event: asyncio.Event + ): + @full_broker.subscriber(queue, no_ack=True) + async def handler(msg: NatsMessage): + event.set() + + await full_broker.start() + with patch.object(Msg, "ack", spy_decorator(Msg.ack)) as m: + await asyncio.wait( + ( + asyncio.create_task( + full_broker.publish( + "hello", + queue, + ) + ), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + m.mock.assert_not_called() + + assert event.is_set() diff --git a/tests/brokers/rabbit/test_consume.py b/tests/brokers/rabbit/test_consume.py index 7d0fe0d94d..3f4725099a 100644 --- a/tests/brokers/rabbit/test_consume.py +++ b/tests/brokers/rabbit/test_consume.py @@ -331,3 +331,32 @@ async def handler(msg: RabbitMessage): assert not m2.mock.called assert event.is_set() + + @pytest.mark.asyncio + async def test_consume_no_ack( + self, + queue: str, + exchange: RabbitExchange, + full_broker: RabbitBroker, + event: asyncio.Event, + ): + @full_broker.subscriber(queue, exchange=exchange, retry=1, no_ack=True) + async def handler(msg: RabbitMessage): + event.set() + + await full_broker.start() + with patch.object( + IncomingMessage, "ack", spy_decorator(IncomingMessage.ack) + ) as m: + await asyncio.wait( + ( + asyncio.create_task( + full_broker.publish("hello", queue=queue, exchange=exchange) + ), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + m.mock.assert_not_called() + + assert event.is_set() diff --git a/tests/brokers/test_pushback.py b/tests/brokers/test_pushback.py index 1195d79be9..07f7a51e4d 100644 --- a/tests/brokers/test_pushback.py +++ b/tests/brokers/test_pushback.py @@ -20,8 +20,8 @@ async def test_push_back_correct(async_mock: AsyncMock, message): watcher = CounterWatcher(3) context = WatcherContext( - watcher, - message, + message=message, + watcher=watcher, ) async with context: @@ -37,8 +37,8 @@ async def test_push_back_endless_correct(async_mock: AsyncMock, message): watcher = EndlessWatcher() context = WatcherContext( - watcher, - message, + message=message, + watcher=watcher, ) async with context: @@ -53,8 +53,8 @@ async def test_push_back_watcher(async_mock: AsyncMock, message): watcher = CounterWatcher(3) context = WatcherContext( - watcher, - message, + message=message, + watcher=watcher, ) async_mock.side_effect = ValueError("Ooops!") @@ -74,8 +74,8 @@ async def test_push_endless_back_watcher(async_mock: AsyncMock, message): watcher = EndlessWatcher() context = WatcherContext( - watcher, - message, + message=message, + watcher=watcher, ) async_mock.side_effect = ValueError("Ooops!") @@ -95,8 +95,8 @@ async def test_ignore_skip(async_mock: AsyncMock, message): watcher = CounterWatcher(3) context = WatcherContext( - watcher, - message, + message=message, + watcher=watcher, ) async_mock.side_effect = SkipMessage()