diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index c220597872..8f63974667 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -376,6 +376,7 @@ search: - [StreamRouter](api/faststream/broker/fastapi/router/StreamRouter.md) - message - [AckStatus](api/faststream/broker/message/AckStatus.md) + - [SourceType](api/faststream/broker/message/SourceType.md) - [StreamMessage](api/faststream/broker/message/StreamMessage.md) - [decode_message](api/faststream/broker/message/decode_message.md) - [encode_message](api/faststream/broker/message/encode_message.md) diff --git a/docs/docs/en/api/faststream/broker/message/SourceType.md b/docs/docs/en/api/faststream/broker/message/SourceType.md new file mode 100644 index 0000000000..fd242902f9 --- /dev/null +++ b/docs/docs/en/api/faststream/broker/message/SourceType.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.broker.message.SourceType diff --git a/faststream/broker/core/usecase.py b/faststream/broker/core/usecase.py index 7069dd2652..7bcf35f708 100644 --- a/faststream/broker/core/usecase.py +++ b/faststream/broker/core/usecase.py @@ -20,6 +20,7 @@ from faststream._compat import is_test_env from faststream.broker.core.logging import LoggingBroker +from faststream.broker.message import SourceType from faststream.broker.middlewares.logging import CriticalLogMiddleware from faststream.broker.proto import SetupAble from faststream.broker.subscriber.proto import SubscriberProto @@ -376,6 +377,7 @@ async def request( parsed_msg: StreamMessage[Any] = await producer._parser(published_msg) parsed_msg._decoded_body = await producer._decoder(parsed_msg) + parsed_msg._source_type = SourceType.Response return await return_msg(parsed_msg) @abstractmethod diff --git a/faststream/broker/message.py b/faststream/broker/message.py index e06e912593..7c1dcae73a 100644 --- a/faststream/broker/message.py +++ b/faststream/broker/message.py @@ -35,6 +35,14 @@ class AckStatus(str, Enum): rejected = "rejected" +class SourceType(str, Enum): + Consume = "Consume" + """Message consumed by basic subscriber flow.""" + + Response = "Response" + """RPC response consumed.""" + + def gen_cor_id() -> str: """Generate random string to use as ID.""" return str(uuid4()) @@ -60,6 +68,7 @@ class StreamMessage(Generic[MsgType]): processed: bool = field(default=False, init=False) committed: Optional[AckStatus] = field(default=None, init=False) + _source_type: SourceType = field(default=SourceType.Consume) _decoded_body: Optional["DecodedMessage"] = field(default=None, init=False) async def ack(self) -> None: diff --git a/faststream/confluent/publisher/usecase.py b/faststream/confluent/publisher/usecase.py index 60fc8329df..0f7712139f 100644 --- a/faststream/confluent/publisher/usecase.py +++ b/faststream/confluent/publisher/usecase.py @@ -17,7 +17,7 @@ from confluent_kafka import Message from typing_extensions import override -from faststream.broker.message import gen_cor_id +from faststream.broker.message import SourceType, gen_cor_id from faststream.broker.publisher.usecase import PublisherUsecase from faststream.broker.types import MsgType from faststream.exceptions import NOT_CONNECTED_YET @@ -124,6 +124,7 @@ async def request( parsed_msg = await self._producer._parser(published_msg) parsed_msg._decoded_body = await self._producer._decoder(parsed_msg) + parsed_msg._source_type = SourceType.Response return await return_msg(parsed_msg) raise AssertionError("unreachable") diff --git a/faststream/kafka/publisher/usecase.py b/faststream/kafka/publisher/usecase.py index 709aea898b..aa95525254 100644 --- a/faststream/kafka/publisher/usecase.py +++ b/faststream/kafka/publisher/usecase.py @@ -17,7 +17,7 @@ from aiokafka import ConsumerRecord from typing_extensions import Annotated, Doc, override -from faststream.broker.message import gen_cor_id +from faststream.broker.message import SourceType, gen_cor_id from faststream.broker.publisher.usecase import PublisherUsecase from faststream.broker.types import MsgType from faststream.exceptions import NOT_CONNECTED_YET @@ -177,6 +177,7 @@ async def request( parsed_msg = await self._producer._parser(published_msg) parsed_msg._decoded_body = await self._producer._decoder(parsed_msg) + parsed_msg._source_type = SourceType.Response return await return_msg(parsed_msg) raise AssertionError("unreachable") diff --git a/faststream/nats/publisher/usecase.py b/faststream/nats/publisher/usecase.py index 83f9a7c0e4..8d74bbbe4b 100644 --- a/faststream/nats/publisher/usecase.py +++ b/faststream/nats/publisher/usecase.py @@ -15,7 +15,7 @@ from nats.aio.msg import Msg from typing_extensions import Annotated, Doc, override -from faststream.broker.message import gen_cor_id +from faststream.broker.message import SourceType, gen_cor_id from faststream.broker.publisher.usecase import PublisherUsecase from faststream.exceptions import NOT_CONNECTED_YET from faststream.utils.functions import return_input @@ -212,6 +212,7 @@ async def request( parsed_msg = await self._producer._parser(published_msg) parsed_msg._decoded_body = await self._producer._decoder(parsed_msg) + parsed_msg._source_type = SourceType.Response return await return_msg(parsed_msg) raise AssertionError("unreachable") diff --git a/faststream/rabbit/publisher/usecase.py b/faststream/rabbit/publisher/usecase.py index f03b3b4a72..041991542a 100644 --- a/faststream/rabbit/publisher/usecase.py +++ b/faststream/rabbit/publisher/usecase.py @@ -15,7 +15,7 @@ from aio_pika import IncomingMessage from typing_extensions import Annotated, Doc, TypedDict, Unpack, deprecated, override -from faststream.broker.message import gen_cor_id +from faststream.broker.message import SourceType, gen_cor_id from faststream.broker.publisher.usecase import PublisherUsecase from faststream.exceptions import NOT_CONNECTED_YET from faststream.rabbit.schemas import BaseRMQInformation, RabbitQueue @@ -373,6 +373,7 @@ async def request( parsed_msg = await self._producer._parser(published_msg) parsed_msg._decoded_body = await self._producer._decoder(parsed_msg) + parsed_msg._source_type = SourceType.Response return await return_msg(parsed_msg) raise AssertionError("unreachable") diff --git a/faststream/redis/publisher/usecase.py b/faststream/redis/publisher/usecase.py index cc9a523439..f517dbee5f 100644 --- a/faststream/redis/publisher/usecase.py +++ b/faststream/redis/publisher/usecase.py @@ -7,7 +7,7 @@ from typing_extensions import Annotated, Doc, deprecated, override -from faststream.broker.message import gen_cor_id +from faststream.broker.message import SourceType, gen_cor_id from faststream.broker.publisher.usecase import PublisherUsecase from faststream.exceptions import NOT_CONNECTED_YET from faststream.redis.message import UnifyRedisDict @@ -268,6 +268,7 @@ async def request( parsed_msg = await self._producer._parser(published_msg) parsed_msg._decoded_body = await self._producer._decoder(parsed_msg) + parsed_msg._source_type = SourceType.Response return await return_msg(parsed_msg) raise AssertionError("unreachable") @@ -481,6 +482,7 @@ async def request( parsed_msg = await self._producer._parser(published_msg) parsed_msg._decoded_body = await self._producer._decoder(parsed_msg) + parsed_msg._source_type = SourceType.Response return await return_msg(parsed_msg) raise AssertionError("unreachable") @@ -762,6 +764,7 @@ async def request( parsed_msg = await self._producer._parser(published_msg) parsed_msg._decoded_body = await self._producer._decoder(parsed_msg) + parsed_msg._source_type = SourceType.Response return await return_msg(parsed_msg) raise AssertionError("unreachable")