-
Notifications
You must be signed in to change notification settings - Fork 28
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
41 changed files
with
1,184 additions
and
111 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# NATS JetStream | ||
|
||
The default **NATS** usage is suitable for scenarios where: | ||
|
||
* `publisher` and `consumer` are always online; | ||
* a system tolerate to the messages loss. | ||
|
||
If you need stricter restrictions, like: | ||
|
||
* an availability of a message processing confirmation mechanism (`ack`/`nack'); | ||
* message persistence (will accumulate in the queue when `consumer` is offline). | ||
|
||
You should use the **NATS JetStream** extension. | ||
|
||
In fact, the **JetStream** extension is the same **NATS** with the addition a persistent layer above the file system. Therefore, all interfaces for publishing and consuming messages are similar to the refular **NATS** usage. | ||
|
||
However, the **JetStream** layer has many possibilities for configuration: from the deleting old messages policy to the maximum stored messages number limit. You can find out more about all **JetStream** features in the official [documentation](https://docs.nats.io/using-nats/developer/develop_jetstream ){.external-link target="_blank"}. | ||
|
||
!!! tip "" | ||
If you have worked with other message brokers, then you should know that the logic of **JS** is closer to **Kafka** than to **RabbitMQ**: messages after confirmation are not deleted from the queue, but remain there until the queue is full and it will start deleting old messages (or in accordance with other logic that you can configure yourself). | ||
|
||
When connecting a `consumer` (and, especially, when reconnecting), you must determine for yourself, according to what logic it will consume messages: from the subject beginning, starting with some message, starting from some time, only new ones, etc. Don't be surprised if a connection is restored, your `consumer` starts to process all messages received earlier again - you haven't defined the rule. | ||
|
||
Also **NATS JetStream** has built-in `key-value` (close to **Redis**) and `object` (close to **Minio**) storages, which, in addition to interface *put/get* have the ability to subscribe to events, which can be extremely useful in vary scenarios. | ||
|
||
**Propan** does not provide access to this functionality directly, but it is covered by the [nats-py] library used({{urls.nats_py }}){target="_blank"}. You can access the **JS** object from the application context: | ||
|
||
```python linenums="1" hl_lines="2 8" | ||
{!> docs_src/nats/js.py !} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# NATS JetStream | ||
|
||
Обычное использование **NATS** идеально подходит для сценариев, в которых: | ||
|
||
* `publisher` и `consumer` всегда находятся онлайн; | ||
* система допускает потерю сообщений. | ||
|
||
Если вам нужны более строгие ограничения, а именно: | ||
|
||
* наличие механизма подтверждения обработки сообщений (`ack`/`nack`); | ||
* персистентность сообщений (при отсутствии `consumer`'а сообщения будут накапливаться в очереди). | ||
|
||
Вам следует использование расширение **NATS JetStream**. | ||
|
||
На самом деле расширение **JetStream** - это тот же самый **NATS** с добавлением | ||
персистентного слоя над файловой системой, который обеспечивает хранение сообщений в очереди. Поэтому все интерфейсы публикации и потребления сообщений аналогичны обычному использованию **NATS**. | ||
|
||
Однако, сама логика работы слоя **JetStream** имеет множество возможностей для конфигурации: от политики удаления старых сообщений до ограничения на максимальное число хранимых сообщений. Подробно со всеми возможностями **JetStream** вы можете ознакомиться в официальной [документации](https://docs.nats.io/using-nats/developer/develop_jetstream){.external-link target="_blank"}. | ||
|
||
!!! tip "" | ||
Если вы работали с другими брокерами сообщений, то вам следует знать, что логика работы **JS** ближе к **Kafka**, нежели к **RabbitMQ**: сообщения после подтверждения их обработки не удаляются из очереди, а остаются там до тех пор, пока очередь не наполнится и не начнет удалять старые сообщения (либо в соответсвии с другой логикой, которую вы можете сконфигурировать сами). | ||
|
||
При подключении `consumer`'а (и, особенно, при переподключении) вы должны сами определить, в соотвествии с какой логикой он будет потреблять сообщения: с самого начала, начиная с какого-то сообщения, начиная с какого-то времени, только новые и т.д. Не удивляйтесь, если при восстановлении соединения ваш `consumer` начнет заново обрабатывать все сообщения, полученные ранее - вы просто не определили это правило. | ||
|
||
Также **NATS JetStream** имеет встроенное `key-value`(cхоже с **Redis**) и `object`(схоже с **Minio**) хранилища, которые, помимо своего базового интерфейса *положить/прочитать* имеют возможность подписки на события, что может быть крайне полезно во многих сценариях. | ||
|
||
**Propan** не предоставляет доступ к этому функционалу напрямую, однако он покрывается используемой библиотекой [nats-py]({{ urls.nats_py }}){target="_blank"}. Доступ к объекту **JS** вы можете получить из контекста приложения: | ||
|
||
```python linenums="1" hl_lines="2 8" | ||
{!> docs_src/nats/js.py !} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from propan import PropanApp, NatsJSBroker | ||
from propan.annotations import NatsJS | ||
|
||
broker = NatsJSBroker() | ||
app = PropanApp(broker) | ||
|
||
@app.after_startup | ||
async def example(js: NatsJS): | ||
# JS Key-Value Storage | ||
storage = await js.create_key_value(bucket="propan_kv") | ||
|
||
await storage.put("hello", b"propan!") | ||
assert (await storage.get("hello")) == b"propan!" | ||
|
||
# JS Object Storage | ||
storage = await js.create_object_sotre("propan-obs") | ||
|
||
obj_name = "file.mp4" | ||
with open(obj_name) as f: | ||
await storage.put(obj_name, f) | ||
|
||
with open(f"copy-{obj_name}") as f: | ||
await storage.get(obj_name, f) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
|
||
from unittest.mock import Mock | ||
|
||
__version__ = "0.1.4.6" | ||
__version__ = "0.1.5.0" | ||
|
||
|
||
INSTALL_MESSAGE = ( | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,13 @@ | ||
from nats.js.api import DeliverPolicy | ||
|
||
from propan.brokers.nats.nats_broker import NatsBroker, NatsMessage | ||
from propan.brokers.nats.nats_js_broker import NatsJSBroker | ||
from propan.brokers.nats.routing import NatsRouter | ||
|
||
__all__ = ( | ||
"NatsBroker", | ||
"NatsMessage", | ||
"DeliverPolicy", | ||
"NatsJSBroker", | ||
"NatsRouter", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from nats.js.api import ( | ||
ConsumerConfig, | ||
DeliverPolicy, | ||
DiscardPolicy, | ||
Placement, | ||
RePublish, | ||
RetentionPolicy, | ||
StorageType, | ||
StreamConfig, | ||
StreamSource, | ||
) | ||
|
||
__all__ = ( | ||
"StreamConfig", | ||
"RetentionPolicy", | ||
"DiscardPolicy", | ||
"Placement", | ||
"StorageType", | ||
"StreamSource", | ||
"RePublish", | ||
"ConsumerConfig", | ||
"DeliverPolicy", | ||
) |
Oops, something went wrong.