diff --git a/docs/docs/en/getting-started/context/extra.md b/docs/docs/en/getting-started/context/extra.md index 703906d269..963a251dbf 100644 --- a/docs/docs/en/getting-started/context/extra.md +++ b/docs/docs/en/getting-started/context/extra.md @@ -29,3 +29,9 @@ By default, context fields are **NOT CAST** to the type specified in their annot If you require this functionality, you can enable the appropriate flag. {! includes/getting_started/context/cast.md !} + +## Initial Value + +Also, `Context` provides you with a `initial` option to setup base context value without previous `set_global` call. + +{! includes/getting_started/context/initial.md !} diff --git a/docs/docs/en/getting-started/subscription/annotation.md b/docs/docs/en/getting-started/subscription/annotation.md index e7ca77912a..5d9e59e9e1 100644 --- a/docs/docs/en/getting-started/subscription/annotation.md +++ b/docs/docs/en/getting-started/subscription/annotation.md @@ -38,3 +38,6 @@ But it doesn't looks like a correct message validation, does it? For this reason, **FastStream** supports per-argument message serialization: you can declare multiple arguments with various types and your message will unpack to them: {! includes/getting_started/subscription/annotation/3.md !} + +!!! tip + By default **FastStream** uses `json.loads` to decode and `json.dumps` to encode your messages. But if you prefer [**orjson**](https://github.com/ijl/orjson){.external-link target="_blank"}, just install it and framework will use it automatically. diff --git a/docs/docs_src/getting_started/context/kafka/initial.py b/docs/docs_src/getting_started/context/kafka/initial.py new file mode 100644 index 0000000000..577f6a00d9 --- /dev/null +++ b/docs/docs_src/getting_started/context/kafka/initial.py @@ -0,0 +1,11 @@ +from faststream import Context +from faststream.kafka import KafkaBroker + +broker = KafkaBroker() + +@broker.subscriber("test-topic") +async def handle( + msg: str, + collector: list[str] = Context(initial=list), +): + collector.append(msg) diff --git a/docs/docs_src/getting_started/context/nats/initial.py b/docs/docs_src/getting_started/context/nats/initial.py new file mode 100644 index 0000000000..227dcaf075 --- /dev/null +++ b/docs/docs_src/getting_started/context/nats/initial.py @@ -0,0 +1,11 @@ +from faststream import Context +from faststream.nats import NatsBroker + +broker = NatsBroker() + +@broker.subscriber("test-subject") +async def handle( + msg: str, + collector: list[str] = Context(initial=list), +): + collector.append(msg) diff --git a/docs/docs_src/getting_started/context/rabbit/initial.py b/docs/docs_src/getting_started/context/rabbit/initial.py new file mode 100644 index 0000000000..ad358992d9 --- /dev/null +++ b/docs/docs_src/getting_started/context/rabbit/initial.py @@ -0,0 +1,11 @@ +from faststream import Context +from faststream.rabbit import RabbitBroker + +broker = RabbitBroker() + +@broker.subscriber("test-queue") +async def handle( + msg: str, + collector: list[str] = Context(initial=list), +): + collector.append(msg) diff --git a/docs/docs_src/getting_started/context/redis/initial.py b/docs/docs_src/getting_started/context/redis/initial.py new file mode 100644 index 0000000000..0d0254b1ce --- /dev/null +++ b/docs/docs_src/getting_started/context/redis/initial.py @@ -0,0 +1,11 @@ +from faststream import Context +from faststream.redis import RedisBroker + +broker = RedisBroker() + +@broker.subscriber("test-channel") +async def handle( + msg: str, + collector: list[str] = Context(initial=list), +): + collector.append(msg) diff --git a/docs/includes/getting_started/context/initial.md b/docs/includes/getting_started/context/initial.md new file mode 100644 index 0000000000..7fdedf8948 --- /dev/null +++ b/docs/includes/getting_started/context/initial.md @@ -0,0 +1,19 @@ +=== "Kafka" + ```python linenums="1" hl_lines="4 6" + {!> docs_src/getting_started/context/kafka/initial.py [ln:6-11] !} + ``` + +=== "RabbitMQ" + ```python linenums="1" hl_lines="4 6" + {!> docs_src/getting_started/context/rabbit/initial.py [ln:6-11] !} + ``` + +=== "NATS" + ```python linenums="1" hl_lines="4 6" + {!> docs_src/getting_started/context/nats/initial.py [ln:6-11] !} + ``` + +=== "Redis" + ```python linenums="1" hl_lines="4 6" + {!> docs_src/getting_started/context/redis/initial.py [ln:6-11] !} + ``` diff --git a/faststream/__about__.py b/faststream/__about__.py index bb1195d71c..e78fb870aa 100644 --- a/faststream/__about__.py +++ b/faststream/__about__.py @@ -1,5 +1,5 @@ """Simple and fast framework to create message brokers based microservices.""" -__version__ = "0.3.9" +__version__ = "0.3.10" INSTALL_YAML = """ diff --git a/faststream/broker/core/asyncronous.py b/faststream/broker/core/asyncronous.py index 40e186b3ca..cba05279c1 100644 --- a/faststream/broker/core/asyncronous.py +++ b/faststream/broker/core/asyncronous.py @@ -160,10 +160,7 @@ def _process_message( func: Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]], watcher: Callable[..., AsyncContextManager[None]], **kwargs: Any, - ) -> Callable[ - [StreamMessage[MsgType]], - Awaitable[WrappedReturn[T_HandlerReturn]], - ]: + ) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]],]: """Process a message. Args: diff --git a/faststream/broker/handler.py b/faststream/broker/handler.py index d69fb88759..a7dfdc6991 100644 --- a/faststream/broker/handler.py +++ b/faststream/broker/handler.py @@ -359,7 +359,9 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri if IS_OPTIMIZED: # pragma: no cover break - assert not self.running or processed, "You have to consume message" # nosec B101 + assert ( + not self.running or processed + ), "You have to consume message" # nosec B101 context.reset_local("log_context", log_context_tag) diff --git a/faststream/kafka/broker.py b/faststream/kafka/broker.py index 6f3baaf036..bba10a43e7 100644 --- a/faststream/kafka/broker.py +++ b/faststream/kafka/broker.py @@ -202,10 +202,7 @@ def _process_message( func: Callable[[KafkaMessage], Awaitable[T_HandlerReturn]], watcher: Callable[..., AsyncContextManager[None]], **kwargs: Any, - ) -> Callable[ - [KafkaMessage], - Awaitable[WrappedReturn[T_HandlerReturn]], - ]: + ) -> Callable[[KafkaMessage], Awaitable[WrappedReturn[T_HandlerReturn]],]: """Wrap a message processing function with a watcher and publisher. Args: diff --git a/faststream/log/formatter.py b/faststream/log/formatter.py index a14116c3e1..68981c46e3 100644 --- a/faststream/log/formatter.py +++ b/faststream/log/formatter.py @@ -24,7 +24,9 @@ class ColourizedFormatter(logging.Formatter): """ - level_name_colors: DefaultDict[str, Callable[[str], str]] = defaultdict( # noqa: RUF012 + level_name_colors: DefaultDict[ + str, Callable[[str], str] + ] = defaultdict( # noqa: RUF012 lambda: str, **{ str(logging.DEBUG): lambda level_name: click.style( diff --git a/faststream/nats/broker.py b/faststream/nats/broker.py index ae39138a81..602644fa9a 100644 --- a/faststream/nats/broker.py +++ b/faststream/nats/broker.py @@ -232,10 +232,7 @@ def _process_message( func: Callable[[StreamMessage[Msg]], Awaitable[T_HandlerReturn]], watcher: Callable[..., AsyncContextManager[None]], **kwargs: Any, - ) -> Callable[ - [StreamMessage[Msg]], - Awaitable[WrappedReturn[T_HandlerReturn]], - ]: + ) -> Callable[[StreamMessage[Msg]], Awaitable[WrappedReturn[T_HandlerReturn]],]: @wraps(func) async def process_wrapper( message: StreamMessage[Msg], @@ -411,11 +408,7 @@ def subscriber( # type: ignore[override] def consumer_wrapper( func: Callable[P_HandlerParams, T_HandlerReturn], - ) -> HandlerCallWrapper[ - Msg, - P_HandlerParams, - T_HandlerReturn, - ]: + ) -> HandlerCallWrapper[Msg, P_HandlerParams, T_HandlerReturn,]: handler_call, dependant = self._wrap_handler( func, extra_dependencies=dependencies, diff --git a/faststream/nats/broker.pyi b/faststream/nats/broker.pyi index cff88a96aa..cbf6f4cdc3 100644 --- a/faststream/nats/broker.pyi +++ b/faststream/nats/broker.pyi @@ -215,10 +215,7 @@ class NatsBroker( func: Callable[[StreamMessage[Msg]], Awaitable[T_HandlerReturn]], watcher: Callable[..., AsyncContextManager[None]], **kwargs: Any, - ) -> Callable[ - [StreamMessage[Msg]], - Awaitable[WrappedReturn[T_HandlerReturn]], - ]: ... + ) -> Callable[[StreamMessage[Msg]], Awaitable[WrappedReturn[T_HandlerReturn]],]: ... def _log_connection_broken( self, error_cb: Optional[ErrorCallback] = None, diff --git a/faststream/nats/parser.py b/faststream/nats/parser.py index b872f342c9..e994777af1 100644 --- a/faststream/nats/parser.py +++ b/faststream/nats/parser.py @@ -38,10 +38,7 @@ async def parse_message( async def parse_message( self, message: Union[Msg, List[Msg]], *, path: Optional[AnyDict] = None - ) -> Union[ - StreamMessage[Msg], - StreamMessage[List[Msg]], - ]: + ) -> Union[StreamMessage[Msg], StreamMessage[List[Msg]],]: if isinstance(message, list): return NatsMessage( is_js=self.is_js, diff --git a/faststream/redis/broker.py b/faststream/redis/broker.py index 6c28798bf1..baa2d6c509 100644 --- a/faststream/redis/broker.py +++ b/faststream/redis/broker.py @@ -167,10 +167,7 @@ def _process_message( func: Callable[[StreamMessage[Any]], Awaitable[T_HandlerReturn]], watcher: Callable[..., AsyncContextManager[None]], **kwargs: Any, - ) -> Callable[ - [StreamMessage[Any]], - Awaitable[WrappedReturn[T_HandlerReturn]], - ]: + ) -> Callable[[StreamMessage[Any]], Awaitable[WrappedReturn[T_HandlerReturn]],]: @wraps(func) async def process_wrapper( message: StreamMessage[Any], @@ -256,11 +253,7 @@ def subscriber( # type: ignore[override] def consumer_wrapper( func: Callable[P_HandlerParams, T_HandlerReturn], - ) -> HandlerCallWrapper[ - AnyRedisDict, - P_HandlerParams, - T_HandlerReturn, - ]: + ) -> HandlerCallWrapper[AnyRedisDict, P_HandlerParams, T_HandlerReturn,]: handler_call, dependant = self._wrap_handler( func, extra_dependencies=dependencies, diff --git a/faststream/redis/broker.pyi b/faststream/redis/broker.pyi index 4685447286..71633837ca 100644 --- a/faststream/redis/broker.pyi +++ b/faststream/redis/broker.pyi @@ -159,10 +159,7 @@ class RedisBroker( func: Callable[[StreamMessage[Any]], Awaitable[T_HandlerReturn]], watcher: Callable[..., AsyncContextManager[None]], **kwargs: Any, - ) -> Callable[ - [StreamMessage[Any]], - Awaitable[WrappedReturn[T_HandlerReturn]], - ]: ... + ) -> Callable[[StreamMessage[Any]], Awaitable[WrappedReturn[T_HandlerReturn]],]: ... @override def subscriber( # type: ignore[override] self, diff --git a/faststream/redis/publisher.py b/faststream/redis/publisher.py index e4f6c5f69b..d789553d0d 100644 --- a/faststream/redis/publisher.py +++ b/faststream/redis/publisher.py @@ -43,7 +43,9 @@ async def publish( # type: ignore[override] list = ListSub.validate(list or self.list) stream = StreamSub.validate(stream or self.stream) - assert any((channel, list, stream)), "You have to specify outgoing channel" # nosec B101 + assert any( + (channel, list, stream) + ), "You have to specify outgoing channel" # nosec B101 headers_to_send = (self.headers or {}).copy() if headers is not None: diff --git a/pyproject.toml b/pyproject.toml index bd4e62e250..2494ee6906 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -318,3 +318,13 @@ omit = [ ] [tool.bandit] + +[tool.black] +line-length = 88 + +extend-exclude = """ +/( + docs/docs_src + | some_other_dir +)/ +""" diff --git a/scripts/lint.sh b/scripts/lint.sh index d607cfb956..b2f83c2872 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -6,5 +6,8 @@ pyup_dirs --py38-plus --recursive faststream examples tests docs echo "Running ruff linter (isort, flake, pyupgrade, etc. replacement)..." ruff check -echo "Running ruff formater (black replacement)..." -ruff format +# echo "Running ruff formater (black replacement)..." +# ruff format + +echo "Running black..." +black faststream examples tests docs diff --git a/tests/asyncapi/base/arguments.py b/tests/asyncapi/base/arguments.py index 7df9a45cdd..677273ac93 100644 --- a/tests/asyncapi/base/arguments.py +++ b/tests/asyncapi/base/arguments.py @@ -70,7 +70,7 @@ async def handle(msg: int): schema = get_app_schema(self.build_app(broker)).to_jsonable() payload = schema["components"]["schemas"] - assert tuple(schema["channels"].values())[0].get("description") is None # noqa: RUF015 + assert next(iter(schema["channels"].values())).get("description") is None for key, v in payload.items(): assert key == "Handle:Message:Payload" @@ -333,7 +333,7 @@ class User(pydantic.BaseModel): else: class Config: - schema_extra = {"examples": [{"name": "john", "id": 1}]} # noqa: RUF012 + schema_extra = {"examples": [{"name": "john", "id": 1}]} broker = self.broker_class() @@ -379,7 +379,11 @@ async def handle_default(msg): schema = get_app_schema(self.build_app(broker)).to_jsonable() assert ( - len(list(schema["components"]["messages"].values())[0]["payload"]["oneOf"]) # noqa: RUF015 + len( + next(iter(schema["components"]["messages"].values()))["payload"][ + "oneOf" + ] + ) == 2 ) diff --git a/tests/asyncapi/rabbit/test_connection.py b/tests/asyncapi/rabbit/test_connection.py index 4362e8ac48..9ab1837f6e 100644 --- a/tests/asyncapi/rabbit/test_connection.py +++ b/tests/asyncapi/rabbit/test_connection.py @@ -56,65 +56,58 @@ def test_custom(): broker.publisher("test") schema = get_app_schema(FastStream(broker)).to_jsonable() - assert ( - schema - == { - "asyncapi": "2.6.0", - "channels": { - "test:_:Publisher": { + assert schema == { + "asyncapi": "2.6.0", + "channels": { + "test:_:Publisher": { + "bindings": { + "amqp": { + "bindingVersion": "0.2.0", + "exchange": {"type": "default", "vhost": "/vh"}, + "is": "routingKey", + "queue": { + "autoDelete": False, + "durable": False, + "exclusive": False, + "name": "test", + "vhost": "/vh", + }, + } + }, + "publish": { "bindings": { "amqp": { + "ack": True, "bindingVersion": "0.2.0", - "exchange": {"type": "default", "vhost": "/vh"}, - "is": "routingKey", - "queue": { - "autoDelete": False, - "durable": False, - "exclusive": False, - "name": "test", - "vhost": "/vh", - }, + "cc": "test", + "deliveryMode": 1, + "mandatory": True, } }, - "publish": { - "bindings": { - "amqp": { - "ack": True, - "bindingVersion": "0.2.0", - "cc": "test", - "deliveryMode": 1, - "mandatory": True, - } - }, - "message": { - "$ref": "#/components/messages/test:_:Publisher:Message" - }, + "message": { + "$ref": "#/components/messages/test:_:Publisher:Message" }, - "servers": ["development"], - } - }, - "components": { - "messages": { - "test:_:Publisher:Message": { - "correlationId": { - "location": "$message.header#/correlation_id" - }, - "payload": { - "$ref": "#/components/schemas/test:_:PublisherPayload" - }, - "title": "test:_:Publisher:Message", - } }, - "schemas": {"test:_:PublisherPayload": {}}, - }, - "defaultContentType": "application/json", - "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, - "servers": { - "development": { - "protocol": "amqp", - "protocolVersion": "0.9.1", - "url": "amqp://guest:guest@127.0.0.1:5672/vh", # pragma: allowlist secret + "servers": ["development"], + } + }, + "components": { + "messages": { + "test:_:Publisher:Message": { + "correlationId": {"location": "$message.header#/correlation_id"}, + "payload": {"$ref": "#/components/schemas/test:_:PublisherPayload"}, + "title": "test:_:Publisher:Message", } }, - } - ) + "schemas": {"test:_:PublisherPayload": {}}, + }, + "defaultContentType": "application/json", + "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, + "servers": { + "development": { + "protocol": "amqp", + "protocolVersion": "0.9.1", + "url": "amqp://guest:guest@127.0.0.1:5672/vh", # pragma: allowlist secret + } + }, + } diff --git a/tests/asyncapi/rabbit/test_naming.py b/tests/asyncapi/rabbit/test_naming.py index 4948b0886d..128a08a419 100644 --- a/tests/asyncapi/rabbit/test_naming.py +++ b/tests/asyncapi/rabbit/test_naming.py @@ -48,63 +48,58 @@ async def handle(): schema = get_app_schema(FastStream(broker)).to_jsonable() - assert ( - schema - == { - "asyncapi": "2.6.0", - "defaultContentType": "application/json", - "info": {"title": "FastStream", "version": "0.1.0", "description": ""}, - "servers": { - "development": { - "url": "amqp://guest:guest@localhost:5672/", # pragma: allowlist secret - "protocol": "amqp", - "protocolVersion": "0.9.1", - } - }, - "channels": { - "test:_:Handle": { - "servers": ["development"], + assert schema == { + "asyncapi": "2.6.0", + "defaultContentType": "application/json", + "info": {"title": "FastStream", "version": "0.1.0", "description": ""}, + "servers": { + "development": { + "url": "amqp://guest:guest@localhost:5672/", # pragma: allowlist secret + "protocol": "amqp", + "protocolVersion": "0.9.1", + } + }, + "channels": { + "test:_:Handle": { + "servers": ["development"], + "bindings": { + "amqp": { + "is": "routingKey", + "bindingVersion": "0.2.0", + "queue": { + "name": "test", + "durable": False, + "exclusive": False, + "autoDelete": False, + "vhost": "/", + }, + "exchange": {"type": "default", "vhost": "/"}, + } + }, + "subscribe": { "bindings": { "amqp": { - "is": "routingKey", + "cc": "test", + "ack": True, "bindingVersion": "0.2.0", - "queue": { - "name": "test", - "durable": False, - "exclusive": False, - "autoDelete": False, - "vhost": "/", - }, - "exchange": {"type": "default", "vhost": "/"}, } }, - "subscribe": { - "bindings": { - "amqp": { - "cc": "test", - "ack": True, - "bindingVersion": "0.2.0", - } - }, - "message": { - "$ref": "#/components/messages/test:_:Handle:Message" - }, + "message": { + "$ref": "#/components/messages/test:_:Handle:Message" }, - } - }, - "components": { - "messages": { - "test:_:Handle:Message": { - "title": "test:_:Handle:Message", - "correlationId": { - "location": "$message.header#/correlation_id" - }, - "payload": {"$ref": "#/components/schemas/EmptyPayload"}, - } - }, - "schemas": { - "EmptyPayload": {"title": "EmptyPayload", "type": "null"} }, + } + }, + "components": { + "messages": { + "test:_:Handle:Message": { + "title": "test:_:Handle:Message", + "correlationId": { + "location": "$message.header#/correlation_id" + }, + "payload": {"$ref": "#/components/schemas/EmptyPayload"}, + } }, - } - ) + "schemas": {"EmptyPayload": {"title": "EmptyPayload", "type": "null"}}, + }, + } diff --git a/tests/asyncapi/rabbit/test_router.py b/tests/asyncapi/rabbit/test_router.py index 145951e4fc..8dfe4e5567 100644 --- a/tests/asyncapi/rabbit/test_router.py +++ b/tests/asyncapi/rabbit/test_router.py @@ -24,68 +24,65 @@ async def handle(msg): schema = get_app_schema(FastStream(broker)).to_jsonable() - assert ( - schema - == { - "asyncapi": "2.6.0", - "defaultContentType": "application/json", - "info": {"title": "FastStream", "version": "0.1.0", "description": ""}, - "servers": { - "development": { - "url": "amqp://guest:guest@localhost:5672/", # pragma: allowlist secret - "protocol": "amqp", - "protocolVersion": "0.9.1", - } - }, - "channels": { - "test_test:_:Handle": { - "servers": ["development"], + assert schema == { + "asyncapi": "2.6.0", + "defaultContentType": "application/json", + "info": {"title": "FastStream", "version": "0.1.0", "description": ""}, + "servers": { + "development": { + "url": "amqp://guest:guest@localhost:5672/", # pragma: allowlist secret + "protocol": "amqp", + "protocolVersion": "0.9.1", + } + }, + "channels": { + "test_test:_:Handle": { + "servers": ["development"], + "bindings": { + "amqp": { + "is": "routingKey", + "bindingVersion": "0.2.0", + "queue": { + "name": "test_test", + "durable": False, + "exclusive": False, + "autoDelete": False, + "vhost": "/", + }, + "exchange": {"type": "default", "vhost": "/"}, + } + }, + "subscribe": { "bindings": { "amqp": { - "is": "routingKey", + "cc": "key", + "ack": True, "bindingVersion": "0.2.0", - "queue": { - "name": "test_test", - "durable": False, - "exclusive": False, - "autoDelete": False, - "vhost": "/", - }, - "exchange": {"type": "default", "vhost": "/"}, } }, - "subscribe": { - "bindings": { - "amqp": { - "cc": "key", - "ack": True, - "bindingVersion": "0.2.0", - } - }, - "message": { - "$ref": "#/components/messages/test_test:_:Handle:Message" - }, + "message": { + "$ref": "#/components/messages/test_test:_:Handle:Message" + }, + }, + } + }, + "components": { + "messages": { + "test_test:_:Handle:Message": { + "title": "test_test:_:Handle:Message", + "correlationId": { + "location": "$message.header#/correlation_id" + }, + "payload": { + "$ref": "#/components/schemas/Handle:Message:Payload" }, } }, - "components": { - "messages": { - "test_test:_:Handle:Message": { - "title": "test_test:_:Handle:Message", - "correlationId": { - "location": "$message.header#/correlation_id" - }, - "payload": { - "$ref": "#/components/schemas/Handle:Message:Payload" - }, - } - }, - "schemas": { - "Handle:Message:Payload": {"title": "Handle:Message:Payload"} - }, + "schemas": { + "Handle:Message:Payload": {"title": "Handle:Message:Payload"} }, - } - ) + }, + } class TestRouterArguments(ArgumentsTestcase): # noqa: D101 diff --git a/tests/asyncapi/rabbit/test_security.py b/tests/asyncapi/rabbit/test_security.py index d635132540..1df2aee9c3 100644 --- a/tests/asyncapi/rabbit/test_security.py +++ b/tests/asyncapi/rabbit/test_security.py @@ -57,28 +57,25 @@ def test_plaintext_security_schema(): assert broker._connection_kwargs.get("ssl_context") is ssl_context schema = get_app_schema(FastStream(broker)).to_jsonable() - assert ( - schema - == { - "asyncapi": "2.6.0", - "channels": {}, - "components": { - "messages": {}, - "schemas": {}, - "securitySchemes": {"user-password": {"type": "userPassword"}}, - }, - "defaultContentType": "application/json", - "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, - "servers": { - "development": { - "protocol": "amqps", - "protocolVersion": "0.9.1", - "security": [{"user-password": []}], - "url": "amqps://admin:password@localhost:5672/", # pragma: allowlist secret - } - }, - } - ) + assert schema == { + "asyncapi": "2.6.0", + "channels": {}, + "components": { + "messages": {}, + "schemas": {}, + "securitySchemes": {"user-password": {"type": "userPassword"}}, + }, + "defaultContentType": "application/json", + "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, + "servers": { + "development": { + "protocol": "amqps", + "protocolVersion": "0.9.1", + "security": [{"user-password": []}], + "url": "amqps://admin:password@localhost:5672/", # pragma: allowlist secret + } + }, + } def test_plaintext_security_schema_without_ssl(): @@ -95,25 +92,22 @@ def test_plaintext_security_schema_without_ssl(): ) # pragma: allowlist secret schema = get_app_schema(FastStream(broker)).to_jsonable() - assert ( - schema - == { - "asyncapi": "2.6.0", - "channels": {}, - "components": { - "messages": {}, - "schemas": {}, - "securitySchemes": {"user-password": {"type": "userPassword"}}, - }, - "defaultContentType": "application/json", - "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, - "servers": { - "development": { - "protocol": "amqp", - "protocolVersion": "0.9.1", - "security": [{"user-password": []}], - "url": "amqp://admin:password@localhost:5672/", # pragma: allowlist secret - } - }, - } - ) + assert schema == { + "asyncapi": "2.6.0", + "channels": {}, + "components": { + "messages": {}, + "schemas": {}, + "securitySchemes": {"user-password": {"type": "userPassword"}}, + }, + "defaultContentType": "application/json", + "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, + "servers": { + "development": { + "protocol": "amqp", + "protocolVersion": "0.9.1", + "security": [{"user-password": []}], + "url": "amqp://admin:password@localhost:5672/", # pragma: allowlist secret + } + }, + } diff --git a/tests/cli/test_app.py b/tests/cli/test_app.py index c544ed898e..d33b389efc 100644 --- a/tests/cli/test_app.py +++ b/tests/cli/test_app.py @@ -145,11 +145,10 @@ async def test_exception_group(async_mock: AsyncMock, app: FastStream): async def raises(): await async_mock.excp() - with patch.object(app.broker, "start", async_mock.broker_run), patch.object( # noqa: SIM117 - app.broker, "close", async_mock.broker_stopped - ): - with pytest.raises(ValueError): # noqa: PT011 - await app.run() + with patch.object(app.broker, "start", async_mock.broker_run): # noqa: SIM117 + with patch.object(app.broker, "close", async_mock.broker_stopped): + with pytest.raises(ValueError): # noqa: PT011 + await app.run() @pytest.mark.asyncio() diff --git a/tests/cli/test_asyncapi_docs.py b/tests/cli/test_asyncapi_docs.py index 1c60a24bae..3dc9ba2742 100644 --- a/tests/cli/test_asyncapi_docs.py +++ b/tests/cli/test_asyncapi_docs.py @@ -21,7 +21,7 @@ def test_gen_asyncapi_json_for_kafka_app(runner: CliRunner, kafka_basic_project: Path): - r = runner.invoke(cli, GEN_JSON_CMD + ["--out", "shema.json", kafka_basic_project]) # noqa: RUF005 + r = runner.invoke(cli, GEN_JSON_CMD + ["--out", "shema.json", kafka_basic_project]) assert r.exit_code == 0 schema_path = Path.cwd() / "shema.json" diff --git a/tests/docs/getting_started/context/test_initial.py b/tests/docs/getting_started/context/test_initial.py new file mode 100644 index 0000000000..dd8aaec1cf --- /dev/null +++ b/tests/docs/getting_started/context/test_initial.py @@ -0,0 +1,60 @@ +import pytest + +from faststream import context +from tests.marks import python39 + + +@pytest.mark.asyncio() +@python39 +async def test_kafka(): + from docs.docs_src.getting_started.context.kafka.initial import broker + from faststream.kafka import TestKafkaBroker + + async with TestKafkaBroker(broker) as br: + await br.publish("", "test-topic") + await br.publish("", "test-topic") + + assert context.get("collector") == ["", ""] + context.clear() + + +@pytest.mark.asyncio() +@python39 +async def test_rabbit(): + from docs.docs_src.getting_started.context.rabbit.initial import broker + from faststream.rabbit import TestRabbitBroker + + async with TestRabbitBroker(broker) as br: + await br.publish("", "test-queue") + await br.publish("", "test-queue") + + assert context.get("collector") == ["", ""] + context.clear() + + +@pytest.mark.asyncio() +@python39 +async def test_nats(): + from docs.docs_src.getting_started.context.nats.initial import broker + from faststream.nats import TestNatsBroker + + async with TestNatsBroker(broker) as br: + await br.publish("", "test-subject") + await br.publish("", "test-subject") + + assert context.get("collector") == ["", ""] + context.clear() + + +@pytest.mark.asyncio() +@python39 +async def test_redis(): + from docs.docs_src.getting_started.context.redis.initial import broker + from faststream.redis import TestRedisBroker + + async with TestRedisBroker(broker) as br: + await br.publish("", "test-channel") + await br.publish("", "test-channel") + + assert context.get("collector") == ["", ""] + context.clear() diff --git a/tests/docs/getting_started/publishing/test_decorator.py b/tests/docs/getting_started/publishing/test_decorator.py index b866a8703d..441b4dcae6 100644 --- a/tests/docs/getting_started/publishing/test_decorator.py +++ b/tests/docs/getting_started/publishing/test_decorator.py @@ -19,7 +19,7 @@ async def test_decorator_kafka(): async with TestKafkaBroker(broker), TestApp(app): handle.mock.assert_called_once_with("") handle_next.mock.assert_called_once_with("Hi!") - list(broker._publishers.values())[0].mock.assert_called_once_with("Hi!") # noqa: RUF015 + next(iter(broker._publishers.values())).mock.assert_called_once_with("Hi!") @pytest.mark.asyncio() @@ -34,7 +34,7 @@ async def test_decorator_rabbit(): async with TestRabbitBroker(broker), TestApp(app): handle.mock.assert_called_once_with("") handle_next.mock.assert_called_once_with("Hi!") - list(broker._publishers.values())[0].mock.assert_called_once_with("Hi!") # noqa: RUF015 + next(iter(broker._publishers.values())).mock.assert_called_once_with("Hi!") @pytest.mark.asyncio() @@ -49,7 +49,7 @@ async def test_decorator_nats(): async with TestNatsBroker(broker), TestApp(app): handle.mock.assert_called_once_with("") handle_next.mock.assert_called_once_with("Hi!") - list(broker._publishers.values())[0].mock.assert_called_once_with("Hi!") # noqa: RUF015 + next(iter(broker._publishers.values())).mock.assert_called_once_with("Hi!") @pytest.mark.asyncio() @@ -64,4 +64,4 @@ async def test_decorator_redis(): async with TestRedisBroker(broker), TestApp(app): handle.mock.assert_called_once_with("") handle_next.mock.assert_called_once_with("Hi!") - list(broker._publishers.values())[0].mock.assert_called_once_with("Hi!") # noqa: RUF015 + next(iter(broker._publishers.values())).mock.assert_called_once_with("Hi!") diff --git a/tests/docs/getting_started/routers/test_delay.py b/tests/docs/getting_started/routers/test_delay.py index 4600ef3c9f..5888e0103a 100644 --- a/tests/docs/getting_started/routers/test_delay.py +++ b/tests/docs/getting_started/routers/test_delay.py @@ -15,7 +15,7 @@ async def test_delay_router_kafka(): ) async with TestKafkaBroker(broker) as br, TestApp(app): - list(br.handlers.values())[0].calls[0][0].mock.assert_called_once_with( # noqa: RUF015 + next(iter(br.handlers.values())).calls[0][0].mock.assert_called_once_with( {"name": "John", "user_id": 1} ) @@ -28,7 +28,7 @@ async def test_delay_router_rabbit(): ) async with TestRabbitBroker(broker) as br, TestApp(app): - list(br.handlers.values())[0].calls[0][0].mock.assert_called_once_with( # noqa: RUF015 + next(iter(br.handlers.values())).calls[0][0].mock.assert_called_once_with( {"name": "John", "user_id": 1} ) @@ -41,7 +41,7 @@ async def test_delay_router_nats(): ) async with TestNatsBroker(broker) as br, TestApp(app): - list(br.handlers.values())[0].calls[0][0].mock.assert_called_once_with( # noqa: RUF015 + next(iter(br.handlers.values())).calls[0][0].mock.assert_called_once_with( {"name": "John", "user_id": 1} ) @@ -54,6 +54,6 @@ async def test_delay_router_redis(): ) async with TestRedisBroker(broker) as br, TestApp(app): - list(br.handlers.values())[0].calls[0][0].mock.assert_called_once_with( # noqa: RUF015 + next(iter(br.handlers.values())).calls[0][0].mock.assert_called_once_with( {"name": "John", "user_id": 1} ) diff --git a/tests/docs/rabbit/test_security.py b/tests/docs/rabbit/test_security.py index c6fea53c0b..a486152a7e 100644 --- a/tests/docs/rabbit/test_security.py +++ b/tests/docs/rabbit/test_security.py @@ -35,24 +35,21 @@ async def test_base_security(): assert connection.mock.call_args.kwargs["ssl_context"] schema = get_app_schema(FastStream(broker)).to_jsonable() - assert ( - schema - == { - "asyncapi": "2.6.0", - "channels": {}, - "components": {"messages": {}, "schemas": {}, "securitySchemes": {}}, - "defaultContentType": "application/json", - "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, - "servers": { - "development": { - "protocol": "amqps", - "protocolVersion": "0.9.1", - "security": [], - "url": "amqps://guest:guest@localhost:5672/", # pragma: allowlist secret - } - }, - } - ) + assert schema == { + "asyncapi": "2.6.0", + "channels": {}, + "components": {"messages": {}, "schemas": {}, "securitySchemes": {}}, + "defaultContentType": "application/json", + "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, + "servers": { + "development": { + "protocol": "amqps", + "protocolVersion": "0.9.1", + "security": [], + "url": "amqps://guest:guest@localhost:5672/", # pragma: allowlist secret + } + }, + } @pytest.mark.asyncio() @@ -68,25 +65,22 @@ async def test_plaintext_security(): assert connection.mock.call_args.kwargs["ssl_context"] schema = get_app_schema(FastStream(broker)).to_jsonable() - assert ( - schema - == { - "asyncapi": "2.6.0", - "channels": {}, - "components": { - "messages": {}, - "schemas": {}, - "securitySchemes": {"user-password": {"type": "userPassword"}}, - }, - "defaultContentType": "application/json", - "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, - "servers": { - "development": { - "protocol": "amqps", - "protocolVersion": "0.9.1", - "security": [{"user-password": []}], - "url": "amqps://admin:password@localhost:5672/", # pragma: allowlist secret - } - }, - } - ) + assert schema == { + "asyncapi": "2.6.0", + "channels": {}, + "components": { + "messages": {}, + "schemas": {}, + "securitySchemes": {"user-password": {"type": "userPassword"}}, + }, + "defaultContentType": "application/json", + "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, + "servers": { + "development": { + "protocol": "amqps", + "protocolVersion": "0.9.1", + "security": [{"user-password": []}], + "url": "amqps://admin:password@localhost:5672/", # pragma: allowlist secret + } + }, + } diff --git a/tests/marks.py b/tests/marks.py index b1e7f0e4c7..911467508d 100644 --- a/tests/marks.py +++ b/tests/marks.py @@ -12,4 +12,4 @@ pydanticV1 = pytest.mark.skipif(PYDANTIC_V2, reason="requires PydanticV2") # noqa: N816 -pydanticV2 = pytest.mark.skipif(not PYDANTIC_V2, reason="requires PydanticV1") # noqa: N816 +pydanticV2 = pytest.mark.skipif(not PYDANTIC_V2, reason="requires PydanticV1")