Skip to content

Commit

Permalink
doc: add return type link reference
Browse files Browse the repository at this point in the history
  • Loading branch information
jason810496 committed Jul 21, 2024
1 parent 4192ff0 commit 6c01871
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 18 deletions.
49 changes: 33 additions & 16 deletions pgmq_sqlalchemy/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(
.. note::
| ``PGMQueue`` will **auto create** the ``pgmq`` extension ( and ``pg_partman`` extension if the method is related with **partitioned_queue** ) if it does not exist in the Postgres.
| But you must make sure that the ``pgmq`` extension ( or ``pg_partman`` extension )already **installed** in the Postgres.
| But you must make sure that the ``pgmq`` extension ( or ``pg_partman`` extension ) already **installed** in the Postgres.
"""
if not dsn and not engine and not session_maker:
raise ValueError("Must provide either dsn, engine, or session_maker")
Expand Down Expand Up @@ -334,7 +334,8 @@ async def _drop_queue_async(self, queue: str, partitioned: bool = False) -> bool
def drop_queue(self, queue: str, partitioned: bool = False) -> bool:
"""Drop a queue.
.. _drop_queue_anchor:
.. _drop_queue_method: ref:`pgmq_sqlalchemy.PGMQueue.drop_queue`
.. |drop_queue_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.drop_queue`
.. code-block:: python
Expand All @@ -345,6 +346,8 @@ def drop_queue(self, queue: str, partitioned: bool = False) -> bool:
.. warning::
| All messages and queue itself will be deleted. (``pgmq.q_<queue_name>`` table)
| **Archived tables** (``pgmq.a_<queue_name>`` table **will be dropped as well. )**
|
| See |archive_method|_ for more details.
"""
# check if the pg_partman extension exists before dropping a partitioned queue at runtime
if partitioned:
Expand Down Expand Up @@ -521,11 +524,14 @@ def read(self, queue_name: str, vt: Optional[int] = None) -> Optional[Message]:
Read a message from the queue.
Returns:
|schema_message_class|_ or ``None`` if the queue is empty.
.. note::
| ``PGMQ`` use |for_update_skip_locked|_ lock to make sure **a message is only read by one consumer**.
| See the `pgmq.read <https://github.com/tembo-io/pgmq/blob/main/pgmq-extension/sql/pgmq.sql?plain=1#L44-L75>`_ function for more details.
|
| For **consumer retries mechanism** (e.g. mark a message as failed after a certain number of retries) can be implemented by using the ``read_ct`` field in the ``Message`` object.
| For **consumer retries mechanism** (e.g. mark a message as failed after a certain number of retries) can be implemented by using the ``read_ct`` field in the |schema_message_class|_ object.
.. important::
Expand Down Expand Up @@ -642,6 +648,9 @@ def read_batch(
| Read a batch of messages from the queue.
| Usage:
Returns:
List of |schema_message_class|_ or ``None`` if the queue is empty.
.. code-block:: python
from pgmq_sqlalchemy.schema import Message
Expand Down Expand Up @@ -752,6 +761,9 @@ def read_with_poll(
max_poll_seconds (int): The maximum number of seconds to poll.
poll_interval_ms (int): The interval in milliseconds to poll.
Returns:
List of |schema_message_class|_ or ``None`` if the queue is empty.
Usage:
.. code-block:: python
Expand Down Expand Up @@ -865,6 +877,9 @@ def delete(self, queue_name: str, msg_id: int) -> bool:
"""
Delete a message from the queue.
.. _delete_method: ref:`pgmq_sqlalchemy.PGMQueue.delete`
.. |delete_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.delete`
* Raises an error if the ``queue_name`` does not exist.
* Returns ``True`` if the message is deleted successfully.
* If the message does not exist, returns ``False``.
Expand Down Expand Up @@ -912,9 +927,12 @@ def delete_batch(self, queue_name: str, msg_ids: List[int]) -> List[int]:
"""
Delete a batch of messages from the queue.
.. _delete_batch_method: ref:`pgmq_sqlalchemy.PGMQueue.delete_batch`
.. |delete_batch_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.delete_batch`
.. note::
| Instead of return `bool` like ``delete``,
| ``delete_batch`` will return a list of ``msg_ids`` that are successfully deleted.
| Instead of return `bool` like |delete_method|_,
| |delete_batch_method|_ will return a list of ``msg_id`` that are successfully deleted.
.. code-block:: python
Expand Down Expand Up @@ -952,6 +970,9 @@ def archive(self, queue_name: str, msg_id: int) -> bool:
"""
Archive a message from a queue.
.. _archive_method: ref:`pgmq_sqlalchemy.PGMQueue.archive`
.. |archive_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.archive`
* Message will be deleted from the queue and moved to the archive table.
* Will be deleted from ``pgmq.q_<queue_name>`` and be inserted into the ``pgmq.a_<queue_name>`` table.
Expand Down Expand Up @@ -997,7 +1018,7 @@ def archive_batch(self, queue_name: str, msg_ids: List[int]) -> List[int]:
Archive multiple messages from a queue.
* Messages will be deleted from the queue and moved to the archive table.
* Returns a list of ``msg_ids`` that are successfully archived.
* Returns a list of ``msg_id`` that are successfully archived.
.. code-block:: python
Expand Down Expand Up @@ -1091,11 +1112,8 @@ def metrics(self, queue_name: str) -> Optional[QueueMetrics]:
"""
Get metrics for a queue.
.. _schema_message_class: `schema.Message`_
.. |schema_message_class| replace:: :py:class:`.~pgmq_sqlalchemy.schema.QueueMetrics`
Returns:
|schema_message_class|_
|schema_queue_metrics_class|_ or ``None`` if the queue does not exist.
Usage:
Expand Down Expand Up @@ -1155,15 +1173,13 @@ def metrics_all(self) -> Optional[List[QueueMetrics]]:
.. _read_committed_isolation_level: https://www.postgresql.org/docs/current/transaction-iso.html#XACT-READ-COMMITTED
.. |read_committed_isolation_level| replace:: **READ COMMITTED**
.. _drop_queue_method: ref:`pgmq_sqlalchemy.PGMQueue.drop_queue`
.. |drop_queue_method| replace:: :py:class:`~pgmq_sqlalchemy.PGMQueue.drop_queue`
.. _metrics_all_method: ref:`pgmq_sqlalchemy.PGMQueue.metrics_all`
.. |metrics_all_method| replace:: :py:meth:`~pgmq_sqlalchemy.PGMQueue.metrics_all`
Get metrics for all queues.
Returns:
List of |schema_message_class|_ objects.
:py:class:`~pgmq_sqlalchemy.schema.QueueMetrics`
List of |schema_queue_metrics_class|_ or ``None`` if there are no queues.
Usage:
Expand All @@ -1178,9 +1194,10 @@ def metrics_all(self) -> Optional[List[QueueMetrics]]:
print(m.queue_length)
.. warning::
| You should use a **distributed lock** to avoid **race conditions** when calling :py:meth:`~pgmq_sqlalchemy.metrics_call` in **concurrent** |drop_queue_method|_ **scenarios**.
| You should use a **distributed lock** to avoid **race conditions** when calling |metrics_all_method|_ in **concurrent** |drop_queue_method|_ **scenarios**.
|
| Since the default PostgreSQL isolation level is |read_committed_isolation_level|_, the queue metrics to be fetched **may not exist** if there are **concurrent** |drop_queue_method|_ **operations**.
| Check the `pgmq.metrics_all <https://github.com/tembo-io/pgmq/blob/main/pgmq-extension/sql/pgmq.sql?plain=1#L334-L346>`_ function for more details.
"""
Expand Down
15 changes: 13 additions & 2 deletions pgmq_sqlalchemy/schema.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Message:
"""
.. _schema_message_class: `schema.Message`_
.. |schema_message_class| replace:: :py:class:`.~pgmq_sqlalchemy.schema.Message`
"""

msg_id: int
read_ct: int
enqueued_at: datetime
Expand All @@ -13,8 +19,13 @@ class Message:

@dataclass
class QueueMetrics:
"""
.. _schema_queue_metrics_class: `schema.QueueMetrics`_
.. |schema_queue_metrics_class| replace:: :py:class:`.~pgmq_sqlalchemy.schema.QueueMetrics`
"""

queue_name: str
queue_length: int
newest_msg_age_sec: int
oldest_msg_age_sec: int
newest_msg_age_sec: Optional[int]
oldest_msg_age_sec: Optional[int]
total_messages: int

0 comments on commit 6c01871

Please sign in to comment.