docs: add Context initial section (#1089)
* docs: add Context initial examples

* docs: add orjson mention

* chore: bump version

* ruff format replaced with black

---------

Co-authored-by: Davor Runje <[email protected]>
Lancetnik and davorrunje authored Dec 23, 2023
1 parent 1e5f52e commit 8ab9d30
Showing 32 changed files with 402 additions and 304 deletions.
6 changes: 6 additions & 0 deletions docs/docs/en/getting-started/context/extra.md
@@ -29,3 +29,9 @@ By default, context fields are **NOT CAST** to the type specified in their annot
If you require this functionality, you can enable the appropriate flag.

{! includes/getting_started/context/cast.md !}
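As a rough illustration of the flag described above (not part of this commit's diff; the broker, topic name, and values are assumptions, and the included `cast.md` snippet remains the authoritative example), enabling casting might look like this:

```python
from faststream import Context, ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker()
app = FastStream(broker)


@app.on_startup
async def setup(context: ContextRepo):
    # Deliberately stored as a string to show the cast below.
    context.set_global("secret", "1")


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    secret: int = Context(cast=True),  # cast=True converts "1" to 1 per the annotation
):
    assert secret == 1
```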

## Initial Value

Also, `Context` provides you with an `initial` option to set up a base context value without a prior `set_global` call.

{! includes/getting_started/context/initial.md !}
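The included snippets are the new example files added below. As a minimal sketch of how the option could be exercised end to end (the test-broker driver and the accumulation across messages are assumptions based on the prose above, not part of this commit):

```python
import asyncio

from faststream import Context
from faststream.kafka import KafkaBroker, TestKafkaBroker

broker = KafkaBroker()


@broker.subscriber("test-topic")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    # On first use the missing "collector" field is created by calling
    # list(); later calls should see the same list and keep appending.
    collector.append(msg)


async def main() -> None:
    async with TestKafkaBroker(broker) as br:
        await br.publish("Hi!", "test-topic")
        await br.publish("Hello!", "test-topic")
        # If the initial value is stored globally (as the prose suggests),
        # the handler's collector now holds both messages.


asyncio.run(main())
```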
3 changes: 3 additions & 0 deletions docs/docs/en/getting-started/subscription/annotation.md
@@ -38,3 +38,6 @@ But it doesn't look like correct message validation, does it?
For this reason, **FastStream** supports per-argument message serialization: you can declare multiple arguments with various types, and your message will be unpacked into them:

{! includes/getting_started/subscription/annotation/3.md !}

!!! tip
    By default, **FastStream** uses `json.loads` to decode and `json.dumps` to encode your messages. But if you prefer [**orjson**](https://github.com/ijl/orjson){.external-link target="_blank"}, just install it and the framework will use it automatically.
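A minimal sketch of the per-argument unpacking described above (the topic name and payload shape are illustrative assumptions, not part of this commit):

```python
from faststream.kafka import KafkaBroker

broker = KafkaBroker()


@broker.subscriber("user-updates")  # illustrative topic name
async def handle(name: str, user_id: int):
    # An incoming JSON body such as {"name": "john", "user_id": 1} is
    # decoded (json.loads by default, orjson if installed) and unpacked
    # into the annotated arguments, each validated against its type.
    print(name, user_id)
```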
11 changes: 11 additions & 0 deletions docs/docs_src/getting_started/context/kafka/initial.py
@@ -0,0 +1,11 @@
from faststream import Context
from faststream.kafka import KafkaBroker

broker = KafkaBroker()

@broker.subscriber("test-topic")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    collector.append(msg)
11 changes: 11 additions & 0 deletions docs/docs_src/getting_started/context/nats/initial.py
@@ -0,0 +1,11 @@
from faststream import Context
from faststream.nats import NatsBroker

broker = NatsBroker()

@broker.subscriber("test-subject")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    collector.append(msg)
11 changes: 11 additions & 0 deletions docs/docs_src/getting_started/context/rabbit/initial.py
@@ -0,0 +1,11 @@
from faststream import Context
from faststream.rabbit import RabbitBroker

broker = RabbitBroker()

@broker.subscriber("test-queue")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    collector.append(msg)
11 changes: 11 additions & 0 deletions docs/docs_src/getting_started/context/redis/initial.py
@@ -0,0 +1,11 @@
from faststream import Context
from faststream.redis import RedisBroker

broker = RedisBroker()

@broker.subscriber("test-channel")
async def handle(
    msg: str,
    collector: list[str] = Context(initial=list),
):
    collector.append(msg)
19 changes: 19 additions & 0 deletions docs/includes/getting_started/context/initial.md
@@ -0,0 +1,19 @@
=== "Kafka"
```python linenums="1" hl_lines="4 6"
{!> docs_src/getting_started/context/kafka/initial.py [ln:6-11] !}
```

=== "RabbitMQ"
```python linenums="1" hl_lines="4 6"
{!> docs_src/getting_started/context/rabbit/initial.py [ln:6-11] !}
```

=== "NATS"
```python linenums="1" hl_lines="4 6"
{!> docs_src/getting_started/context/nats/initial.py [ln:6-11] !}
```

=== "Redis"
```python linenums="1" hl_lines="4 6"
{!> docs_src/getting_started/context/redis/initial.py [ln:6-11] !}
```
2 changes: 1 addition & 1 deletion faststream/__about__.py
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""
__version__ = "0.3.9"
__version__ = "0.3.10"


INSTALL_YAML = """
5 changes: 1 addition & 4 deletions faststream/broker/core/asyncronous.py
@@ -160,10 +160,7 @@ def _process_message(
func: Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]],
watcher: Callable[..., AsyncContextManager[None]],
**kwargs: Any,
) -> Callable[
[StreamMessage[MsgType]],
Awaitable[WrappedReturn[T_HandlerReturn]],
]:
) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]],]:
"""Process a message.
Args:
4 changes: 3 additions & 1 deletion faststream/broker/handler.py
@@ -359,7 +359,9 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri
if IS_OPTIMIZED: # pragma: no cover
break

assert not self.running or processed, "You have to consume message" # nosec B101
assert (
not self.running or processed
), "You have to consume message" # nosec B101

context.reset_local("log_context", log_context_tag)

5 changes: 1 addition & 4 deletions faststream/kafka/broker.py
@@ -202,10 +202,7 @@ def _process_message(
func: Callable[[KafkaMessage], Awaitable[T_HandlerReturn]],
watcher: Callable[..., AsyncContextManager[None]],
**kwargs: Any,
) -> Callable[
[KafkaMessage],
Awaitable[WrappedReturn[T_HandlerReturn]],
]:
) -> Callable[[KafkaMessage], Awaitable[WrappedReturn[T_HandlerReturn]],]:
"""Wrap a message processing function with a watcher and publisher.
Args:
4 changes: 3 additions & 1 deletion faststream/log/formatter.py
@@ -24,7 +24,9 @@ class ColourizedFormatter(logging.Formatter):
"""

level_name_colors: DefaultDict[str, Callable[[str], str]] = defaultdict( # noqa: RUF012
level_name_colors: DefaultDict[
str, Callable[[str], str]
] = defaultdict( # noqa: RUF012
lambda: str,
**{
str(logging.DEBUG): lambda level_name: click.style(
11 changes: 2 additions & 9 deletions faststream/nats/broker.py
@@ -232,10 +232,7 @@ def _process_message(
func: Callable[[StreamMessage[Msg]], Awaitable[T_HandlerReturn]],
watcher: Callable[..., AsyncContextManager[None]],
**kwargs: Any,
) -> Callable[
[StreamMessage[Msg]],
Awaitable[WrappedReturn[T_HandlerReturn]],
]:
) -> Callable[[StreamMessage[Msg]], Awaitable[WrappedReturn[T_HandlerReturn]],]:
@wraps(func)
async def process_wrapper(
message: StreamMessage[Msg],
@@ -411,11 +408,7 @@ def subscriber( # type: ignore[override]

def consumer_wrapper(
func: Callable[P_HandlerParams, T_HandlerReturn],
) -> HandlerCallWrapper[
Msg,
P_HandlerParams,
T_HandlerReturn,
]:
) -> HandlerCallWrapper[Msg, P_HandlerParams, T_HandlerReturn,]:
handler_call, dependant = self._wrap_handler(
func,
extra_dependencies=dependencies,
5 changes: 1 addition & 4 deletions faststream/nats/broker.pyi
@@ -215,10 +215,7 @@ class NatsBroker(
func: Callable[[StreamMessage[Msg]], Awaitable[T_HandlerReturn]],
watcher: Callable[..., AsyncContextManager[None]],
**kwargs: Any,
) -> Callable[
[StreamMessage[Msg]],
Awaitable[WrappedReturn[T_HandlerReturn]],
]: ...
) -> Callable[[StreamMessage[Msg]], Awaitable[WrappedReturn[T_HandlerReturn]],]: ...
def _log_connection_broken(
self,
error_cb: Optional[ErrorCallback] = None,
5 changes: 1 addition & 4 deletions faststream/nats/parser.py
@@ -38,10 +38,7 @@ async def parse_message(

async def parse_message(
self, message: Union[Msg, List[Msg]], *, path: Optional[AnyDict] = None
) -> Union[
StreamMessage[Msg],
StreamMessage[List[Msg]],
]:
) -> Union[StreamMessage[Msg], StreamMessage[List[Msg]],]:
if isinstance(message, list):
return NatsMessage(
is_js=self.is_js,
11 changes: 2 additions & 9 deletions faststream/redis/broker.py
@@ -167,10 +167,7 @@ def _process_message(
func: Callable[[StreamMessage[Any]], Awaitable[T_HandlerReturn]],
watcher: Callable[..., AsyncContextManager[None]],
**kwargs: Any,
) -> Callable[
[StreamMessage[Any]],
Awaitable[WrappedReturn[T_HandlerReturn]],
]:
) -> Callable[[StreamMessage[Any]], Awaitable[WrappedReturn[T_HandlerReturn]],]:
@wraps(func)
async def process_wrapper(
message: StreamMessage[Any],
@@ -256,11 +253,7 @@ def subscriber( # type: ignore[override]

def consumer_wrapper(
func: Callable[P_HandlerParams, T_HandlerReturn],
) -> HandlerCallWrapper[
AnyRedisDict,
P_HandlerParams,
T_HandlerReturn,
]:
) -> HandlerCallWrapper[AnyRedisDict, P_HandlerParams, T_HandlerReturn,]:
handler_call, dependant = self._wrap_handler(
func,
extra_dependencies=dependencies,
5 changes: 1 addition & 4 deletions faststream/redis/broker.pyi
@@ -159,10 +159,7 @@ class RedisBroker(
func: Callable[[StreamMessage[Any]], Awaitable[T_HandlerReturn]],
watcher: Callable[..., AsyncContextManager[None]],
**kwargs: Any,
) -> Callable[
[StreamMessage[Any]],
Awaitable[WrappedReturn[T_HandlerReturn]],
]: ...
) -> Callable[[StreamMessage[Any]], Awaitable[WrappedReturn[T_HandlerReturn]],]: ...
@override
def subscriber( # type: ignore[override]
self,
4 changes: 3 additions & 1 deletion faststream/redis/publisher.py
@@ -43,7 +43,9 @@ async def publish( # type: ignore[override]
list = ListSub.validate(list or self.list)
stream = StreamSub.validate(stream or self.stream)

assert any((channel, list, stream)), "You have to specify outgoing channel" # nosec B101
assert any(
(channel, list, stream)
), "You have to specify outgoing channel" # nosec B101

headers_to_send = (self.headers or {}).copy()
if headers is not None:
10 changes: 10 additions & 0 deletions pyproject.toml
@@ -318,3 +318,13 @@ omit = [
]

[tool.bandit]

[tool.black]
line-length = 88

extend-exclude = """
/(
docs/docs_src
| some_other_dir
)/
"""
7 changes: 5 additions & 2 deletions scripts/lint.sh
@@ -6,5 +6,8 @@ pyup_dirs --py38-plus --recursive faststream examples tests docs
echo "Running ruff linter (isort, flake, pyupgrade, etc. replacement)..."
ruff check

echo "Running ruff formater (black replacement)..."
ruff format
# echo "Running ruff formater (black replacement)..."
# ruff format

echo "Running black..."
black faststream examples tests docs
10 changes: 7 additions & 3 deletions tests/asyncapi/base/arguments.py
@@ -70,7 +70,7 @@ async def handle(msg: int):
schema = get_app_schema(self.build_app(broker)).to_jsonable()

payload = schema["components"]["schemas"]
assert tuple(schema["channels"].values())[0].get("description") is None # noqa: RUF015
assert next(iter(schema["channels"].values())).get("description") is None

for key, v in payload.items():
assert key == "Handle:Message:Payload"
@@ -333,7 +333,7 @@ class User(pydantic.BaseModel):
else:

class Config:
schema_extra = {"examples": [{"name": "john", "id": 1}]} # noqa: RUF012
schema_extra = {"examples": [{"name": "john", "id": 1}]}

broker = self.broker_class()

@@ -379,7 +379,11 @@ async def handle_default(msg):
schema = get_app_schema(self.build_app(broker)).to_jsonable()

assert (
len(list(schema["components"]["messages"].values())[0]["payload"]["oneOf"]) # noqa: RUF015
len(
next(iter(schema["components"]["messages"].values()))["payload"][
"oneOf"
]
)
== 2
)
