Skip to content

Commit

Permalink
fix (#1082): correct NatsTestClient stream publisher (#1083)
Browse files Browse the repository at this point in the history
* fix (#1082): correct NatsTestClient stream publisher

* chore: remove anyio restriction

* test: add test for ExceptionGroup

* test: fix compatibility
  • Loading branch information
Lancetnik authored Dec 20, 2023
1 parent 2c6a524 commit f573af6
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 20 deletions.
36 changes: 21 additions & 15 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,14 @@ jobs:
- name: Test
run: bash scripts/test.sh -m "(slow and (not nats and not kafka and not rabbit and not redis)) or (not nats and not kafka and not rabbit and not redis)"
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
- name: Store coverage files
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: coverage
name: .coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
path: coverage
if-no-files-found: error

test-macos-latest:
if: github.event.pull_request.draft == false
Expand Down Expand Up @@ -149,10 +150,11 @@ jobs:
COVERAGE_FILE: coverage/.coverage.kafka-py
CONTEXT: kafka-py
- name: Store coverage files
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: coverage
name: .coverage.kafka-py
path: coverage
if-no-files-found: error

test-kafka-smoke:
if: github.event.pull_request.draft == false
Expand Down Expand Up @@ -197,10 +199,11 @@ jobs:
COVERAGE_FILE: coverage/.coverage.rabbit-py
CONTEXT: rabbit-py
- name: Store coverage files
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: coverage
name: .coverage.rabbit-py
path: coverage
if-no-files-found: error

test-rabbit-smoke:
if: github.event.pull_request.draft == false
Expand Down Expand Up @@ -245,10 +248,11 @@ jobs:
COVERAGE_FILE: coverage/.coverage.nats-py
CONTEXT: nats-py
- name: Store coverage files
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: coverage
name: .coverage.nats-py
path: coverage
if-no-files-found: error

test-nats-smoke:
if: github.event.pull_request.draft == false
Expand Down Expand Up @@ -293,10 +297,11 @@ jobs:
COVERAGE_FILE: coverage/.coverage.redis-py
CONTEXT: redis-py
- name: Store coverage files
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: coverage
name: .coverage.redis-py
path: coverage
if-no-files-found: error

test-redis-smoke:
if: github.event.pull_request.draft == false
Expand Down Expand Up @@ -335,10 +340,11 @@ jobs:
cache-dependency-path: pyproject.toml

- name: Get coverage files
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: coverage
pattern: .coverage*
path: coverage
merge-multiple: true

- run: pip install coverage[toml]

Expand All @@ -348,7 +354,7 @@ jobs:
- run: coverage html --show-contexts --title "FastStream coverage for ${{ github.sha }}"

- name: Store coverage html
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: coverage-html
path: htmlcov
Expand Down
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices"""
__version__ = "0.3.8"
__version__ = "0.3.9"


INSTALL_YAML = """
Expand Down
7 changes: 6 additions & 1 deletion faststream/nats/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ async def publish( # type: ignore[override]
subject: str,
reply_to: str = "",
headers: Optional[Dict[str, str]] = None,
stream: Optional[str] = None,
correlation_id: Optional[str] = None,
# NatsJSFastProducer compatibility
timeout: Optional[float] = None,
stream: Optional[str] = None,
*,
rpc: bool = False,
rpc_timeout: Optional[float] = None,
Expand All @@ -72,6 +74,9 @@ async def publish( # type: ignore[override]
for handler in self.broker.handlers.values(): # pragma: no branch
call = False

if stream and getattr(handler.stream, "name", None) != stream:
continue

if subject == handler.subject:
call = True

Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ classifiers = [
dynamic = ["version"]

dependencies = [
"anyio>=3.7.1,<4; python_version < '3.11'",
"anyio>=3.7.1,<5; python_version >= '3.11'",
"anyio>=3.7.1,<5",
"fast-depends>=2.2.6,<3",
"typer>=0.9,<1",
"typing-extensions>=4.8.0",
"uvloop>=0.18.0; sys_platform != 'win32' and (sys_platform != 'cygwin' and platform_python_implementation != 'PyPy')",
]

Expand Down
28 changes: 27 additions & 1 deletion tests/brokers/nats/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,30 @@

@pytest.mark.nats
class TestPublish(BrokerPublishTestcase):
pass
@pytest.mark.asyncio
async def test_stream_publish(
self,
queue: str,
test_broker,
):
@test_broker.subscriber(queue, stream="test")
async def m():
...

await test_broker.start()
await test_broker.publish("Hi!", queue, stream="test")
m.mock.assert_called_once_with("Hi!")

@pytest.mark.asyncio
async def test_wrong_stream_publish(
self,
queue: str,
test_broker,
):
@test_broker.subscriber(queue)
async def m():
...

await test_broker.start()
await test_broker.publish("Hi!", queue, stream="test")
assert not m.mock.called
17 changes: 17 additions & 0 deletions tests/cli/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ async def test_running(async_mock, app: FastStream):
async_mock.broker_stopped.assert_called_once()


@pytest.mark.asyncio
async def test_exception_group(async_mock: AsyncMock, app: FastStream):
app._init_async_cycle()
app._stop_event.set()

async_mock.excp.side_effect = ValueError("Ooops!")

@app.on_startup
async def raises():
await async_mock.excp()

with pytest.raises(ValueError):
with patch.object(app.broker, "start", async_mock.broker_run):
with patch.object(app.broker, "close", async_mock.broker_stopped):
await app.run()


@pytest.mark.asyncio
async def test_running_lifespan_contextmanager(async_mock, mock: Mock, app: FastStream):
@asynccontextmanager
Expand Down

0 comments on commit f573af6

Please sign in to comment.