Skip to content

Commit

Permalink
tests (#570): cover docs (#1077)
Browse files Browse the repository at this point in the history
* tests (#570): cover docs

* tests: fix 3.8 compatibility
  • Loading branch information
Lancetnik authored Dec 18, 2023
1 parent cee7f97 commit f6807c3
Show file tree
Hide file tree
Showing 71 changed files with 1,035 additions and 98 deletions.
4 changes: 2 additions & 2 deletions docs/docs/en/nats/examples/direct.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ async def handler():
Full example:

```python linenums="1"
{! docs_src/nats/direct.py !}
{! docs_src/nats/direct.py [ln:1-12.42,13-] !}
```

### Consumer Announcement

To begin with, we have declared several consumers for two `subjects`: `#!python "test-subj-1"` and `#!python "test-subj-2"`:

```python linenums="7" hl_lines="1 5 9"
{! docs_src/nats/direct.py [ln:7-17] !}
{! docs_src/nats/direct.py [ln:7-12.42,13-17] !}
```

!!! note
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/en/nats/examples/pattern.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ Thus, *NATS* can independently balance the load on queue consumers. You can incr
## Example

```python linenums="1"
{! docs_src/nats/pattern.py !}
{! docs_src/nats/pattern.py [ln:1-12.42,13-] !}
```

### Consumer Announcement

To begin with, we have announced several consumers for two `subjects`: `#!python "*.info"` and `#!python "*.error"`:

```python linenums="7" hl_lines="1 5 9"
{! docs_src/nats/pattern.py [ln:7-17] !}
{! docs_src/nats/pattern.py [ln:7-12.42,13-17] !}
```

At the same time, in the `subject` of our consumers, we specify the *pattern* that will be processed by these consumers.
Expand Down
2 changes: 1 addition & 1 deletion docs/docs_src/nats/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async def base_handler1(logger: Logger):
logger.info("base_handler1")

@broker.subscriber("test-subj-1", "workers")
async def base_handler2(logger: Logger):
async def base_handler2(logger: Logger): # pragma: no branch
logger.info("base_handler2")

@broker.subscriber("test-subj-2", "workers")
Expand Down
2 changes: 1 addition & 1 deletion docs/docs_src/nats/pattern.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async def base_handler1(logger: Logger):
logger.info("base_handler1")

@broker.subscriber("*.info", "workers")
async def base_handler2(logger: Logger):
async def base_handler2(logger: Logger): # pragma: no branch
logger.info("base_handler2")

@broker.subscriber("*.error", "workers")
Expand Down
Empty file added docs/docs_src/redis/__init__.py
Empty file.
2 changes: 1 addition & 1 deletion docs/docs_src/redis/pub_sub/publisher_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Data(BaseModel):
)


broker = RedisBroker("localhost:6379")
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


Expand Down
2 changes: 1 addition & 1 deletion docs/docs_src/redis/pub_sub/publisher_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from faststream import FastStream, Logger
from faststream.redis import RedisBroker, TestRedisBroker

broker = RedisBroker("localhost:6379")
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


Expand Down
4 changes: 2 additions & 2 deletions docs/docs_src/redis/pub_sub/raw_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ class Data(BaseModel):
)


broker = RedisBroker("localhost:6379")
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("input_data")
async def on_input_data(msg: Data, logger: Logger) -> Data:
async def on_input_data(msg: Data, logger: Logger):
logger.info(f"on_input_data({msg=})")


Expand Down
2 changes: 1 addition & 1 deletion docs/docs_src/redis/rpc/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker("localhost:6379")
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


Expand Down
4 changes: 2 additions & 2 deletions docs/docs_src/redis/stream/ack_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from faststream.exceptions import AckMessage
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker("localhost:6379")
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


Expand All @@ -18,4 +18,4 @@ def processing_logic(body):

@app.after_startup
async def test_publishing():
await broker.publish("Hello World!", "test-stream")
await broker.publish("Hello World!", stream="test-stream")
2 changes: 1 addition & 1 deletion docs/docs_src/redis/stream/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Data(BaseModel):
)


broker = RedisBroker("localhost:6379")
broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


Expand Down
Empty file added examples/redis/__init__.py
Empty file.
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices"""
__version__ = "0.3.7"
__version__ = "0.3.8"


INSTALL_YAML = """
Expand Down
22 changes: 0 additions & 22 deletions faststream/broker/core/asyncronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,25 +440,3 @@ def _wrap_handler(
_get_dependant=_get_dependant,
_process_kwargs=_process_kwargs,
)

async def _execute_handler(
self,
func: Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]],
message: StreamMessage[MsgType],
) -> T_HandlerReturn:
"""Executes a handler function asynchronously.
Args:
func: The handler function to be executed.
message: The message to be passed to the handler function.
Returns:
The return value of the handler function.
Raises:
AckMessage: If the handler function raises an AckMessage exception.
NackMessage: If the handler function raises a NackMessage exception.
RejectMessage: If the handler function raises a RejectMessage exception.
"""
return await func(message)
6 changes: 3 additions & 3 deletions faststream/broker/fastapi/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class StreamRoute(BaseRoute, Generic[MsgType, P_HandlerParams, T_HandlerReturn])

def __init__(
self,
path: Union[NameRequired, str],
path: Union[NameRequired, str, None],
*extra: Union[NameRequired, str],
endpoint: Union[
Callable[P_HandlerParams, T_HandlerReturn],
Expand All @@ -77,10 +77,10 @@ def __init__(
None.
"""
self.path = path
self.path = path or ""
self.broker = broker

path_name = (path if isinstance(path, str) else path.name) or ""
path_name = self.path if isinstance(self.path, str) else self.path.name

if isinstance(endpoint, HandlerCallWrapper):
orig_call = endpoint._original_call
Expand Down
3 changes: 0 additions & 3 deletions faststream/broker/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ def __init__(self, name: str, **kwargs: Any) -> None:
"""
super().__init__(name=name, **kwargs)

def __hash__(self) -> int:
return hash(self.name)

@overload
@classmethod
def validate(
Expand Down
3 changes: 2 additions & 1 deletion faststream/broker/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ def __init__(
TestApp.__name__,
)

except Exception as e:
except Exception as e: # pragma: no cover
# TODO: remove with 0.5.0
warnings.warn(
(
f"\nError `{repr(e)}` occured at `{self.__class__.__name__}` AST parsing"
Expand Down
2 changes: 1 addition & 1 deletion faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def run(
app, extra = parse_cli_args(app, *ctx.args)
casted_log_level = get_log_level(log_level)

if app_dir:
if app_dir: # pragma: no branch
sys.path.insert(0, app_dir)

args = (app, extra, casted_log_level)
Expand Down
4 changes: 3 additions & 1 deletion faststream/kafka/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ async def _consume(self) -> None:
timeout_ms=self.batch_timeout_ms,
max_records=self.max_records,
)
if not messages:

if not messages: # pragma: no cover
await anyio.sleep(self.batch_timeout_ms / 1000)
continue

msg = tuple(chain(*messages.values()))
else:
msg = await self.consumer.getone()
Expand Down
4 changes: 3 additions & 1 deletion faststream/nats/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ async def publish( # type: ignore[override]
if call:
r = await call_handler(
handler=handler,
message=incoming,
message=[incoming]
if getattr(handler.pull_sub, "batch", False)
else incoming,
rpc=rpc,
rpc_timeout=rpc_timeout,
raise_timeout=raise_timeout,
Expand Down
3 changes: 2 additions & 1 deletion faststream/rabbit/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ def __init__(
protocol_version=protocol_version,
security=security,
ssl_context=security_args.get(
"ssl_context", kwargs.pop("ssl_context", None)
"ssl_context",
kwargs.pop("ssl_context", None),
),
**kwargs,
)
Expand Down
2 changes: 1 addition & 1 deletion faststream/rabbit/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def parse_message(
handler = context.get_local("handler_")
path: AnyDict = {}
path_re: Optional[Pattern[str]]
if ( # pragma: no branch
if (
handler
and (path_re := handler.queue.path_regex) is not None
and (match := path_re.match(message.routing_key or "")) is not None
Expand Down
5 changes: 5 additions & 0 deletions faststream/redis/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def schema(self) -> Dict[str, Channel]:
else:
method = "xread"

else:
raise AssertionError("unreachable")

return {
self.name: Channel(
description=self.description,
Expand Down Expand Up @@ -78,6 +81,8 @@ def schema(self) -> Dict[str, Channel]:
method = "publish"
elif self.stream is not None:
method = "xadd"
else:
raise AssertionError("unreachable")

return {
self.name: Channel(
Expand Down
22 changes: 22 additions & 0 deletions faststream/redis/fastapi.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from typing import Any, Callable, Optional, Sequence, Union

from fastapi import Depends
from redis.asyncio.client import Redis as RedisClient

from faststream._compat import Annotated, override
from faststream.broker.fastapi.context import Context, ContextRepo, Logger
from faststream.broker.fastapi.router import StreamRouter
from faststream.broker.types import P_HandlerParams, T_HandlerReturn
from faststream.broker.wrapper import HandlerCallWrapper
from faststream.redis.broker import RedisBroker as RB
from faststream.redis.message import AnyRedisDict
from faststream.redis.message import RedisMessage as RM
from faststream.redis.schemas import PubSub

__all__ = (
"Context",
Expand All @@ -25,6 +31,22 @@
class RedisRouter(StreamRouter[AnyRedisDict]):
broker_class = RB

def subscriber(
self,
channel: Union[str, PubSub, None] = None,
*,
dependencies: Optional[Sequence[Depends]] = None,
**broker_kwargs: Any,
) -> Callable[
[Callable[P_HandlerParams, T_HandlerReturn]],
HandlerCallWrapper[AnyRedisDict, P_HandlerParams, T_HandlerReturn],
]:
return super().subscriber(
path=channel,
dependencies=dependencies,
**broker_kwargs,
)

@override
@staticmethod
def _setup_log_context( # type: ignore[override]
Expand Down
6 changes: 2 additions & 4 deletions faststream/redis/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,9 @@ async def parse_message(
handler
and handler.channel is not None
and (path_re := handler.channel.path_regex) is not None
and (match := path_re.match(channel)) is not None
):
if path_re is not None:
match = path_re.match(channel)
if match:
path = match.groupdict()
path = match.groupdict()

return OneRedisMessage(
raw_message=message,
Expand Down
2 changes: 1 addition & 1 deletion faststream/redis/shared/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ def _setup_log_context(
self,
channel: Optional[str] = None,
) -> None:
if channel is not None:
if channel is not None: # pragma: no branch
self._max_channel_name = max((self._max_channel_name, len(channel)))
2 changes: 1 addition & 1 deletion faststream/utils/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def read_source_ast(filename: str) -> ast.Module:


def find_ast_node(module: ast.Module, lineno: Optional[int]) -> Optional[ast.AST]:
if lineno is not None:
if lineno is not None: # pragma: no branch
for i in getattr(module, "body", ()):
if i.lineno == lineno:
return cast(ast.AST, i)
Expand Down
12 changes: 11 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ omit = [
[tool.coverage.report]
show_missing = true
skip_empty = true
exclude_lines = [
exclude_also = [
"if __name__ == .__main__.:",
"self.logger",
"def __repr__",
Expand All @@ -276,6 +276,16 @@ omit = [
'*/__main__.py',
'*/__init__.py',
'*/annotations.py',
'docs/docs_src/getting_started/serialization/avro.py',
'docs/docs_src/getting_started/serialization/msgpack_ex.py',
'docs/docs_src/getting_started/serialization/protobuf.py',
'docs/docs_src/integrations/http_frameworks_integrations/aiohttp.py',
'docs/docs_src/integrations/http_frameworks_integrations/blacksheep.py',
'docs/docs_src/integrations/http_frameworks_integrations/falcon.py',
'docs/docs_src/integrations/http_frameworks_integrations/litestar.py',
'docs/docs_src/integrations/http_frameworks_integrations/quart.py',
'docs/docs_src/integrations/http_frameworks_integrations/sanic.py',
'docs/docs_src/integrations/http_frameworks_integrations/tornado.py',
]

[tool.bandit]
4 changes: 2 additions & 2 deletions tests/brokers/base/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ async def test_invalid(self, queue: str):
app = FastAPI(lifespan=router.lifespan_context)

@router.subscriber(queue)
async def hello(msg: int): # pragma: no cover
return msg
async def hello(msg: int):
...

app.include_router(router)

Expand Down
Loading

0 comments on commit f6807c3

Please sign in to comment.