Releases: airtai/faststream

0.5.0

15 Apr 09:12
7d757dc

What's Changed

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you should hardly notice it: all public APIs are preserved without breaking changes. The only feature incompatible with previous code is the new middleware.

New features:

  1. The await FastStream.stop() method and the StopApplication exception have been added to stop a FastStream worker.

  2. The broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

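from typing import Any

subscriber = broker.subscriber("test")

# Handles only messages whose content type is JSON.
@subscriber(filter=lambda msg: msg.content_type == "application/json")
async def handle_json(msg: dict[str, Any]):
    ...

# Default handler for messages that did not match the filter above.
@subscriber()
async def handle_default(msg: dict[str, Any]):
    ...

This is now the preferred syntax for filtering (the old one will be removed in 0.6.0).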
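To make the filter semantics concrete, here is a minimal, library-free sketch of one plausible model: a subscriber object keeps a list of (filter, handler) pairs and routes each message to the first handler whose filter accepts it. `ToySubscriber` and `Message` are hypothetical names invented for this illustration; this is not FastStream's implementation.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

@dataclass
class Message:
    body: Any
    content_type: str

Handler = Callable[[Message], Awaitable[Any]]
Filter = Callable[[Message], bool]


class ToySubscriber:
    """Hypothetical stand-in for a subscriber holding several filtered handlers."""

    def __init__(self) -> None:
        self._calls: list[tuple[Filter, Handler]] = []

    def __call__(self, filter: Filter = lambda msg: True) -> Callable[[Handler], Handler]:
        # Calling the subscriber returns a decorator that registers a handler.
        def decorator(func: Handler) -> Handler:
            self._calls.append((filter, func))
            return func
        return decorator

    async def consume(self, msg: Message) -> Any:
        # Dispatch to the first handler whose filter accepts the message.
        for accepts, func in self._calls:
            if accepts(msg):
                return await func(msg)
        raise LookupError("no handler accepted the message")


subscriber = ToySubscriber()

@subscriber(filter=lambda msg: msg.content_type == "application/json")
async def handle_json(msg: Message) -> str:
    return f"json:{msg.body}"

@subscriber()
async def handle_default(msg: Message) -> str:
    return f"default:{msg.body}"


json_result = asyncio.run(subscriber.consume(Message({"a": 1}, "application/json")))
text_result = asyncio.run(subscriber.consume(Message("hi", "text/plain")))
```

In this toy model the unfiltered handler acts as a catch-all only because it is registered last; the real library may order or validate handlers differently.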

  3. The router.publisher() function now returns the correct Publisher object you can use later (after broker startup).

publisher = router.publisher("test")

@router.subscriber("in")
async def handler():
    await publisher.publish("msg")

(Before 0.5.0, this worked only with broker.publisher.)

  4. A list of middlewares can also be passed to broker.publisher:

broker = Broker(..., middlewares=())

@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=())  # new feature
async def handler():
    ...

  5. Broker-level middlewares now affect all ways to publish a message, so you can encode the application's outgoing messages there.

  6. ⚠️ BREAKING CHANGE ⚠️ : both subscriber and publisher middlewares must now be async functions that take call_next as their first argument and await it:

async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

These changes enable two previously unavailable features:

  • suppress any exceptions and pass a fall-back message body to publishers, and
  • patch any outgoing message headers and other parameters.

Without these features we could not implement Observability Middleware or any similar tool, so this work had to be done.
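The two features can be sketched without the library, using plain coroutines that follow the same call_next signatures as the middlewares above. `failing_handler`, `fake_publish`, and the `x-trace-id` header are hypothetical stand-ins invented for this example; how FastStream wires middlewares internally is not shown here.

```python
import asyncio
from typing import Any, Awaitable, Callable

CallNext = Callable[..., Awaitable[Any]]

# Records what the stand-in publisher would have sent.
published: list[tuple[Any, dict]] = []


async def failing_handler(msg: Any) -> Any:
    # Hypothetical handler that always fails.
    raise ValueError("cannot process")


async def fake_publish(msg: Any, **kwargs: Any) -> None:
    # Hypothetical stand-in for the real publish call.
    published.append((msg, kwargs))


async def fallback_middleware(call_next: CallNext, msg: Any) -> Any:
    # Feature 1: suppress the exception and return a fall-back body instead.
    try:
        return await call_next(msg)
    except ValueError:
        return {"status": "failed", "original": msg}


async def header_middleware(call_next: CallNext, msg: Any, **kwargs: Any) -> Any:
    # Feature 2: patch outgoing headers before delegating to the next step.
    headers = {**kwargs.pop("headers", {}), "x-trace-id": "demo"}
    return await call_next(msg, headers=headers, **kwargs)


async def main() -> None:
    body = await fallback_middleware(failing_handler, "ping")
    await header_middleware(
        fake_publish, body, headers={"content-type": "application/json"}
    )


asyncio.run(main())
```

Because each middleware owns the `await call_next(...)` expression, it can both catch what the next step raises and change what the next step receives.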

  7. Better FastAPI compatibility: fastapi.BackgroundTasks and the response_class subscriber option are supported.

  8. All .pyi files have been removed, and explicit docstrings and method options have been added.

  9. New subscribers can be registered at runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()

  10. The faststream[docs] distribution has been removed.

New Contributors

Full Changelog: 0.4.7...0.5.0

v0.5.0rc2

09 Apr 13:08
9656f4b
Pre-release

What's Changed

This is the final API change before the stable 0.5.0 release.

⚠️ CONTAINS A BREAKING CHANGE

In it, we stabilize the behavior of publisher and subscriber middlewares:

async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

These changes enable two previously unavailable features:

  • suppress any exceptions and pass a fall-back message body to publishers
  • patch any outgoing message headers and other parameters

Without these features we could not implement Observability Middleware or any similar tool, so this work had to be done.

Now you can hook into any message processing stage, and we are one step closer to the framework we would like to create!
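As an illustration of hooking into several stages at once, the sketch below composes two call_next-style middlewares around a handler and records the order in which each stage runs. `compose` and `make_middleware` are hypothetical helpers written for this example, and the outermost-first ordering is an assumption of the sketch, not a statement about FastStream's internals.

```python
import asyncio
from functools import partial
from typing import Any, Awaitable, Callable, Sequence

Middleware = Callable[..., Awaitable[Any]]

# Collects the stages in the order they execute.
trace: list[str] = []


def make_middleware(name: str) -> Middleware:
    async def middleware(call_next, msg):
        trace.append(f"{name}:before")
        result = await call_next(msg)
        trace.append(f"{name}:after")
        return result
    return middleware


async def handler(msg: str) -> str:
    trace.append("handler")
    return msg.upper()


def compose(handler, middlewares: Sequence[Middleware]):
    # Wrap from the inside out so the first middleware listed runs outermost.
    call = handler
    for mw in reversed(middlewares):
        call = partial(mw, call)
    return call


pipeline = compose(handler, (make_middleware("outer"), make_middleware("inner")))
result = asyncio.run(pipeline("msg"))
```

Each middleware sees the message on the way in and the handler's result on the way out, which is what makes observability-style tooling possible.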

Full Changelog: 0.5.0rc0...0.5.0rc2

v0.5.0rc0

06 Apr 16:40
7a3a6f9
Pre-release

What's Changed

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you should hardly notice it: all public APIs are preserved without breaking changes. The only feature incompatible with previous code is the new middleware.

This is still an RC (Release Candidate) for you to test before the stable release. You can manually install it in your project:

pip install faststream==0.5.0rc0

We look forward to your feedback!

New features:

  1. The await FastStream.stop() method and the StopApplication exception have been added to stop a FastStream worker.

  2. The broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

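from typing import Any

subscriber = broker.subscriber("test")

# Handles only messages whose content type is JSON.
@subscriber(filter=lambda msg: msg.content_type == "application/json")
async def handle_json(msg: dict[str, Any]):
    ...

# Default handler for messages that did not match the filter above.
@subscriber()
async def handle_default(msg: dict[str, Any]):
    ...

This is now the preferred syntax for filtering (the old one will be removed in 0.6.0).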

  3. The router.publisher() function now returns the correct Publisher object you can use later (after broker startup).

publisher = router.publisher("test")

@router.subscriber("in")
async def handler():
    await publisher.publish("msg")

(Before 0.5.0, this worked only with broker.publisher.)

  4. A list of middlewares can also be passed to broker.publisher:

broker = Broker(..., middlewares=())

@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=())  # new feature
async def handler():
    ...

  5. Broker-level middlewares now affect all ways to publish a message, so you can encode the application's outgoing messages there.

  6. ⚠️ BREAKING CHANGE ⚠️ : both subscriber and publisher middlewares must now be async context managers:

from contextlib import asynccontextmanager

@asynccontextmanager
async def subscriber_middleware(msg_body):
    yield msg_body

@asynccontextmanager
async def publisher_middleware(
    msg_to_publish,
    **publish_arguments,
):
    yield msg_to_publish

@broker.subscriber("in", middlewares=(subscriber_middleware,))
@broker.publisher("out", middlewares=(publisher_middleware,))
async def handler():
    ...
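To show what the context-manager style implies, the following library-free sketch drives a middleware of the same shape around a handler. `run_with_middleware` is a hypothetical driver written for this example; it only approximates how a framework might enter the middleware and is not FastStream's code.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

# Collects the stages in the order they execute.
events: list[str] = []


@asynccontextmanager
async def subscriber_middleware(msg_body: Any) -> AsyncIterator[Any]:
    events.append("before")
    try:
        # The yielded value is what the handler receives.
        yield msg_body
    finally:
        events.append("after")


async def handler(body: Any) -> str:
    events.append("handler")
    return f"handled {body}"


async def run_with_middleware(body: Any) -> str:
    # Hypothetical driver: enter the middleware, then pass the yielded body on.
    async with subscriber_middleware(body) as processed:
        return await handler(processed)


result = asyncio.run(run_with_middleware("ping"))
```

A middleware of this shape can run code before and after the handler and can alter the incoming body, but in this sketch it never sees or replaces the handler's return value.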

  7. Better FastAPI compatibility: fastapi.BackgroundTasks and the response_class subscriber option are supported.

  8. All .pyi files have been removed, and explicit docstrings and method options have been added.

  9. New subscribers can be registered at runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()

  10. The faststream[docs] distribution has been removed.

New Contributors

Full Changelog: 0.4.7...0.5.0rc0

v0.4.7

09 Mar 16:23
c653913

What's Changed

  • Update Release Notes for 0.4.6 by @faststream-release-notes-updater in #1286
  • fix (#1263): correct nested descriminator msg type AsyncAPI schema by @Lancetnik in #1288
  • docs: add apply_types warning notice to subscription/index.md by @Lancetnik in #1291
  • chore: fixed nats-py version by @Lancetnik in #1294

Full Changelog: 0.4.6...0.4.7

v0.4.6

04 Mar 16:08
64f4b52
Compare
Choose a tag to compare

What's Changed

Full Changelog: 0.4.5...0.4.6

v0.4.5

27 Feb 14:54
0318b97

What's Changed

  • Update Release Notes for 0.4.4 by @faststream-release-notes-updater in #1260
  • Removed unused pytest dependecy from redis/schemas.py by @ashambalev in #1261
  • chore: bumped package versions by @davorrunje in #1270
  • fix (#1263): correct AsyncAPI schema in descriminator case by @Lancetnik in #1272

New Contributors

Full Changelog: 0.4.4...0.4.5

v0.4.4

24 Feb 09:02
77d8a66

What's Changed

Added a Redis Stream batch size option:

@broker.subscriber(stream=StreamSub("input", batch=True, max_records=3))
async def on_input_data(msgs: list[str]):
    assert len(msgs) <= 3

  • Update Release Notes for 0.4.3 by @faststream-release-notes-updater in #1247
  • docs: add manual run section by @Lancetnik in #1249
  • feat (#1252): respect Redis StreamSub last_id with consumer group by @Lancetnik in #1256
  • fix: correct Redis consumer group behavior by @Lancetnik in #1258
  • feat: add Redis Stream max_records option by @Lancetnik in #1259
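The effect of max_records on batch consumption can be pictured with a small stdlib-only sketch that splits a backlog into batches of at most max_records items. `read_batches` is a hypothetical helper for illustration; real batch sizes also depend on what is available in the stream at read time.

```python
from itertools import islice
from typing import Iterable, Iterator


def read_batches(records: Iterable[str], max_records: int) -> Iterator[list[str]]:
    # Yield successive batches holding at most max_records items each.
    iterator = iter(records)
    while batch := list(islice(iterator, max_records)):
        yield batch


batches = list(read_batches(["a", "b", "c", "d", "e", "f", "g"], max_records=3))
```

Every batch satisfies the same `len(msgs) <= 3` bound asserted in the handler above, and the final batch may be shorter.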

Full Changelog: 0.4.3...0.4.4

v0.4.3

20 Feb 05:03
ffe1566

What's Changed

You can now specify the Redis Stream maxlen option in a publisher:

@broker.publisher(stream=StreamSub("Output", maxlen=10))
async def on_input_data():
    ...
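Assuming maxlen maps to Redis stream length capping, where the oldest entries are dropped once the limit is exceeded, the behavior resembles a bounded queue. The sketch below models that with `collections.deque`; it is an analogy only, and Redis may trim approximately rather than exactly.

```python
from collections import deque

# A bounded deque discards the oldest item when a new one exceeds maxlen.
stream = deque(maxlen=10)

for i in range(15):
    stream.append(f"event-{i}")

remaining = list(stream)
```

After 15 appends only the 10 most recent entries remain, starting at `event-5`.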

Full Changelog: 0.4.2...0.4.3

v0.4.2

02 Feb 06:07
f9f2c17

What's Changed

Bug fixes

Full Changelog: 0.4.1...0.4.2

v0.4.1

01 Feb 15:47
380740d

What's Changed

Bug fixes

Documentation

Full Changelog: 0.4.0...0.4.1