Skip to content

Commit

Permalink
Implement validate and no_ack subscriber options (#926) (#988)
Browse files Browse the repository at this point in the history
* Implement validate and no_ack subscriber options (#926)

* Fix errors from static-analysis.sh

* Fix nullcontext class with python version < 3.10

* Fix docstrings, move validate option testcase to the base

* fix tests

* Update consume.py

* Move validate option to broker init

* Fix test

* refactor: new process_message logic

* docs: add no_ack instruction

---------

Co-authored-by: Davor Runje <[email protected]>
Co-authored-by: Pastukhov Nikita <[email protected]>
  • Loading branch information
3 people authored Dec 2, 2023
1 parent 78e665b commit bff46c2
Show file tree
Hide file tree
Showing 30 changed files with 298 additions and 70 deletions.
1 change: 1 addition & 0 deletions docs/docs/en/getting-started/dependencies/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ By default, it applies to all event handlers, unless you disabled the same optio

!!! warning
Setting the `apply_types=False` flag not only disables type casting but also `Depends` and `Context`.
If you want to disable only type casting, use `validate=False` instead.

This flag can be useful if you are using **FastStream** within another framework and you need to use its native dependency system.

Expand Down
2 changes: 2 additions & 0 deletions docs/docs/en/kafka/ack.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@ If you wish to interrupt the processing of a message at any call stack level and
```

This way, **FastStream** interrupts the current message processing and acknowledges it immediately. Similarly, you can raise `NackMessage` as well to prevent the message from being committed.

{!> includes/en/no_ack.md !}
2 changes: 2 additions & 0 deletions docs/docs/en/nats/jetstream/ack.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ If you want to interrupt message processing at any call stack, you can raise `fa
```

This way, **FastStream** interrupts the current message proccessing and acknowledges it immediately. Also, you can raise `NackMessage` and `RejectMessage` too.

{!> includes/en/no_ack.md !}
2 changes: 2 additions & 0 deletions docs/docs/en/rabbit/ack.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,5 @@ If you want to interrupt message processing at any call stack, you can raise `fa
```

This way, **FastStream** interrupts the current message proccessing and acknowledges it immediately. Also, you can raise `NackMessage` and `RejectMessage` too.

{!> includes/en/no_ack.md !}
2 changes: 2 additions & 0 deletions docs/docs/en/redis/streams/ack.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ If the need arises to instantly interrupt message processing at any point in the
```

By raising `AckMessage`, **FastStream** will halt the current message processing routine and immediately acknowledge it. Analogously, raising `NackMessage` would prevent the message from being acknowledged and could lead to its subsequent reprocessing by the same or a different consumer.

{!> includes/en/no_ack.md !}
3 changes: 3 additions & 0 deletions docs/includes/en/no_ack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
!!! tip
If you want to disable **FastStream** Acknowledgement logic at all, you can use
`#!python @broker.subscriber(..., no_ack=True)` option. This way you should always process a message (ack/nack/terminate/etc) by yourself.
38 changes: 29 additions & 9 deletions faststream/broker/core/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
import os
import warnings
from abc import ABC, abstractmethod
from functools import partial
from itertools import chain
from types import TracebackType
from typing import (
Any,
AsyncContextManager,
Awaitable,
Callable,
Generic,
Expand All @@ -31,7 +33,7 @@
from faststream.broker.message import StreamMessage
from faststream.broker.middlewares import BaseMiddleware, CriticalLogMiddleware
from faststream.broker.publisher import BasePublisher
from faststream.broker.push_back_watcher import BaseWatcher
from faststream.broker.push_back_watcher import WatcherContext
from faststream.broker.router import BrokerRouter
from faststream.broker.types import (
ConnectionType,
Expand All @@ -53,7 +55,11 @@
from faststream.security import BaseSecurity
from faststream.types import AnyDict, F_Return, F_Spec
from faststream.utils import apply_types, context
from faststream.utils.functions import get_function_positional_arguments, to_async
from faststream.utils.functions import (
fake_context,
get_function_positional_arguments,
to_async,
)


class BrokerUsecase(
Expand Down Expand Up @@ -119,6 +125,7 @@ def __init__(
asyncapi_url: Union[str, List[str], None] = None,
# broker kwargs
apply_types: bool = True,
validate: bool = True,
logger: Optional[logging.Logger] = access_logger,
log_level: int = logging.INFO,
log_fmt: Optional[str] = "%(asctime)s %(levelname)s - %(message)s",
Expand All @@ -139,6 +146,7 @@ def __init__(
description: A description of the broker.
tags: Tags associated with the broker.
apply_types: Whether to apply types to messages.
validate: Whether to cast types using Pydantic validation.
logger: The logger to use.
log_level: The log level to use.
log_fmt: The log format to use.
Expand All @@ -159,6 +167,7 @@ def __init__(

self._connection = None
self._is_apply_types = apply_types
self._is_validate = validate
self.handlers = {}
self._publishers = {}
empty_middleware: Sequence[Callable[[MsgType], BaseMiddleware]] = ()
Expand Down Expand Up @@ -253,8 +262,10 @@ def _wrap_handler(
*,
retry: Union[bool, int] = False,
extra_dependencies: Sequence[Depends] = (),
no_ack: bool = False,
_raw: bool = False,
_get_dependant: Optional[Any] = None,
_process_kwargs: Optional[AnyDict] = None,
) -> Tuple[
HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
CallModel[Any, Any],
Expand All @@ -265,6 +276,7 @@ def _wrap_handler(
func: The handler function to wrap.
retry: Whether to retry the handler function if it fails. Can be a boolean or an integer specifying the number of retries.
extra_dependencies: Additional dependencies for the handler function.
no_ack: Whether not to ack/nack/reject messages.
_raw: Whether to use the raw handler function.
_get_dependant: The dependant function to use.
**broker_log_context_kwargs: Additional keyword arguments for the broker log context.
Expand All @@ -280,7 +292,7 @@ def _wrap_handler(
"""
build_dep = cast(
Callable[[Callable[F_Spec, F_Return]], CallModel[F_Spec, F_Return]],
_get_dependant or build_call_model,
_get_dependant or partial(build_call_model, cast=self._is_validate),
)

if isinstance(func, HandlerCallWrapper):
Expand All @@ -304,8 +316,8 @@ def _wrap_handler(
if getattr(dependant, "flat_params", None) is None: # handle FastAPI Dependant
dependant = _patch_fastapi_dependant(dependant)

if self._is_apply_types is True and not _raw:
f = apply_types(None)(f, dependant) # type: ignore[arg-type]
if self._is_apply_types and not _raw:
f = apply_types(None, cast=self._is_validate)(f, dependant) # type: ignore[arg-type]

decode_f = self._wrap_decode_message(
func=f,
Expand All @@ -319,10 +331,16 @@ def _wrap_handler(

process_f = self._process_message(
func=decode_f,
watcher=get_watcher(self.logger, retry),
watcher=(
partial(WatcherContext, watcher=get_watcher(self.logger, retry)) # type: ignore[arg-type]
if not no_ack
else fake_context
),
**(_process_kwargs or {}),
)

process_f = set_message_context(process_f)
if self._is_apply_types:
process_f = set_message_context(process_f)

handler_call.set_wrapped(process_f)
return handler_call, dependant
Expand Down Expand Up @@ -386,13 +404,15 @@ def _abc__close(
def _process_message(
self,
func: Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]],
watcher: BaseWatcher,
) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]],]:
watcher: Callable[..., AsyncContextManager[None]],
**kwargs: Any,
) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]]]:
"""Processes a message using a given function and watcher.
Args:
func: A callable that takes a StreamMessage of type MsgType and returns an Awaitable of type T_HandlerReturn.
watcher: An instance of BaseWatcher.
disable_watcher: Whether to use watcher context.
Returns:
A callable that takes a StreamMessage of type MsgType and returns an Awaitable of type WrappedReturn[T_HandlerReturn].
Expand Down
23 changes: 16 additions & 7 deletions faststream/broker/core/asyncronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from types import TracebackType
from typing import (
Any,
AsyncContextManager,
Awaitable,
Callable,
Mapping,
Expand All @@ -24,7 +25,6 @@
from faststream.broker.handler import AsyncHandler
from faststream.broker.message import StreamMessage
from faststream.broker.middlewares import BaseMiddleware
from faststream.broker.push_back_watcher import BaseWatcher
from faststream.broker.types import (
AsyncCustomDecoder,
AsyncCustomParser,
Expand All @@ -39,7 +39,7 @@
)
from faststream.broker.wrapper import HandlerCallWrapper
from faststream.log import access_logger
from faststream.types import SendableMessage
from faststream.types import AnyDict, SendableMessage
from faststream.utils.functions import to_async


Expand Down Expand Up @@ -167,19 +167,21 @@ async def close(
def _process_message(
self,
func: Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]],
watcher: BaseWatcher,
watcher: Callable[..., AsyncContextManager[None]],
**kwargs: Any,
) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]],]:
"""Process a message.
Args:
func: A callable function that takes a StreamMessage and returns an Awaitable
watcher: An instance of BaseWatcher
func: A callable function that takes a StreamMessage and returns an Awaitable.
watcher: An instance of BaseWatcher.
disable_watcher: Whether to use watcher context.
Returns:
A callable function that takes a StreamMessage and returns an Awaitable
A callable function that takes a StreamMessage and returns an Awaitable.
Raises:
NotImplementedError: If the method is not implemented
NotImplementedError: If the method is not implemented.
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Expand Down Expand Up @@ -270,6 +272,7 @@ def __init__(
self,
*args: Any,
apply_types: bool = True,
validate: bool = True,
logger: Optional[logging.Logger] = access_logger,
log_level: int = logging.INFO,
log_fmt: Optional[str] = "%(asctime)s %(levelname)s - %(message)s",
Expand All @@ -284,6 +287,7 @@ def __init__(
Args:
*args: Variable length arguments
apply_types: Whether to apply types or not
validate: Whether to cast types using Pydantic validation.
logger: Logger object for logging
log_level: Log level for logging
log_fmt: Log format for logging
Expand All @@ -299,6 +303,7 @@ def __init__(
super().__init__(
*args,
apply_types=apply_types,
validate=validate,
logger=logger,
log_level=log_level,
log_fmt=log_fmt,
Expand Down Expand Up @@ -425,8 +430,10 @@ def _wrap_handler(
*,
retry: Union[bool, int] = False,
extra_dependencies: Sequence[Depends] = (),
no_ack: bool = False,
_raw: bool = False,
_get_dependant: Optional[Any] = None,
_process_kwargs: Optional[AnyDict] = None,
) -> Tuple[
HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
CallModel[P_HandlerParams, T_HandlerReturn],
Expand All @@ -437,6 +444,7 @@ def _wrap_handler(
func: The handler function to wrap.
retry: Whether to retry the handler function if it fails. Can be a boolean or an integer specifying the number of retries.
extra_dependencies: Additional dependencies to inject into the handler function.
no_ack: Whether not to ack/nack/reject messages.
_raw: Whether to return the raw response from the handler function.
_get_dependant: An optional object to use as the dependant for the handler function.
**broker_log_context_kwargs: Additional keyword arguments to pass to the broker log context.
Expand All @@ -451,6 +459,7 @@ def _wrap_handler(
func,
retry=retry,
extra_dependencies=extra_dependencies,
no_ack=no_ack,
_raw=_raw,
_get_dependant=_get_dependant,
)
Expand Down
2 changes: 1 addition & 1 deletion faststream/broker/push_back_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ class WatcherContext:

def __init__(
self,
watcher: BaseWatcher,
message: Union[SyncStreamMessage[MsgType], StreamMessage[MsgType]],
watcher: BaseWatcher,
**extra_ack_args: Any,
):
"""Initialize a new instance of the class.
Expand Down
11 changes: 8 additions & 3 deletions faststream/kafka/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from types import TracebackType
from typing import (
Any,
AsyncContextManager,
Awaitable,
Callable,
Dict,
Expand All @@ -25,7 +26,6 @@
from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter
from faststream.broker.message import StreamMessage
from faststream.broker.middlewares import BaseMiddleware
from faststream.broker.push_back_watcher import BaseWatcher, WatcherContext
from faststream.broker.types import (
AsyncPublisherProtocol,
CustomDecoder,
Expand Down Expand Up @@ -206,14 +206,16 @@ async def start(self) -> None:
def _process_message(
self,
func: Callable[[KafkaMessage], Awaitable[T_HandlerReturn]],
watcher: BaseWatcher,
watcher: Callable[..., AsyncContextManager[None]],
**kwargs: Any,
) -> Callable[[KafkaMessage], Awaitable[WrappedReturn[T_HandlerReturn]],]:
"""
Wrap a message processing function with a watcher and publisher.
Args:
func (Callable[[KafkaMessage], Awaitable[T_HandlerReturn]]): The message processing function.
watcher (BaseWatcher): The message watcher.
disable_watcher: Whether to use watcher context.
Returns:
Callable[[KafkaMessage], Awaitable[WrappedReturn[T_HandlerReturn]]]: The wrapped message processing function.
Expand All @@ -237,7 +239,7 @@ async def process_wrapper(
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
async with WatcherContext(watcher, message):
async with watcher(message):
r = await func(message)

pub_response: Optional[AsyncPublisherProtocol]
Expand Down Expand Up @@ -309,6 +311,7 @@ def subscriber( # type: ignore[override]
batch: bool = False,
max_records: Optional[int] = None,
batch_timeout_ms: int = 200,
no_ack: bool = False,
# AsyncAPI information
title: Optional[str] = None,
description: Optional[str] = None,
Expand Down Expand Up @@ -358,6 +361,7 @@ def subscriber( # type: ignore[override]
batch (bool): Whether to process messages in batches.
max_records (Optional[int]): Maximum number of records to process in each batch.
batch_timeout_ms (int): Batch timeout in milliseconds.
no_ack (bool): Whether not to ack/nack/reject messages.
title (Optional[str]): AsyncAPI title.
description (Optional[str]): AsyncAPI description.
**original_kwargs: Additional keyword arguments.
Expand Down Expand Up @@ -441,6 +445,7 @@ def consumer_wrapper(
handler_call, dependant = self._wrap_handler(
func=func,
extra_dependencies=dependencies,
no_ack=no_ack,
**original_kwargs,
)

Expand Down
Loading

0 comments on commit bff46c2

Please sign in to comment.