Skip to content

Commit

Permalink
Added test project
Browse files Browse the repository at this point in the history
  • Loading branch information
danyi1212 committed Jan 31, 2024
1 parent c7b4b4e commit 300a640
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 1 deletion.
4 changes: 3 additions & 1 deletion .idea/celery-insights.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions .idea/runConfigurations/test___producer.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions test_project/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
BROKER_URL='amqp://guest:guest@localhost/'
RESULT_BACKEND="redis://localhost"
Empty file added test_project/__init__.py
Empty file.
58 changes: 58 additions & 0 deletions test_project/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging.config
import random
import time

from celery import Celery

from logging_config import LOGGING_CONFIG
from settings import Settings

logging.config.dictConfig(LOGGING_CONFIG)

settings = Settings()
app = Celery(
"tests",
broker=settings.broker_url,
backend=settings.result_backend,
)
app.conf.broker_connection_retry_on_startup = True
app.conf.worker_send_task_events = True
app.conf.task_send_sent_event = True
app.conf.task_track_started = True
app.conf.result_extended = True
app.conf.enable_utc = True


@app.task()
def order_workflow():
time.sleep(random.randrange(1, 5))
update_inventory.apply_async()
create_invoice.apply_async()


@app.task()
def create_invoice():
time.sleep(random.randrange(1, 5))


@app.task()
def update_inventory():
time.sleep(random.randrange(1, 5))
create_shipment.apply_async(countdown=10)


@app.task()
def create_shipment():
time.sleep(random.randrange(1, 5))
generate_sales_report.apply_async()
notify_user.apply_async()


@app.task()
def generate_sales_report():
time.sleep(random.randrange(1, 5))


@app.task()
def notify_user():
time.sleep(random.randrange(1, 5))
45 changes: 45 additions & 0 deletions test_project/logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": True,
"formatters": {
"color": {
"()": "colorlog.ColoredFormatter",
"format": "%(log_color)s[%(asctime)s.%(msecs)03d] %(levelname)s - %(name)s:%(lineno)d | %(message)s",
"datefmt": "%H:%M:%S",
"log_colors": {
"DEBUG": "cyan",
"INFO": "green",
"WARNING": "yellow",
"ERROR": "red",
"CRITICAL": "red,bg_white",
},
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "color",
"stream": "ext://sys.stdout",
},
},
"root": {
"handlers": ["console"],
"level": "INFO",
"propagate": False,
},
"loggers": {
# Internal loggers
"app": {"level": "INFO"},
"producer": {"level": "INFO"},
# Third-party loggers
"asyncio": {"level": "INFO"},
"concurrent": {"level": "INFO"},
"faker": {"level": "INFO"},
"amqp": {"level": "INFO"},
"celery": {"level": "INFO"},
"dotenv": {"level": "INFO"},
"fastapi": {"level": "INFO"},
"fastapi_cache": {"level": "INFO"},
"kombu": {"level": "INFO"},
},
}
39 changes: 39 additions & 0 deletions test_project/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
import inspect
import logging
from typing import Callable, Coroutine, Never, Union

from celery.canvas import Signature

from app import order_workflow

logger = logging.getLogger(__name__)
Callback = Union[Callable[..., Coroutine], Callable[..., None]]


async def timer(interval: float, callback: Callback, stop_signal: asyncio.Event) -> Never:
"""An asynchronous timer that triggers a callback at a given interval and stops when signalled."""
while not stop_signal.is_set():
if inspect.iscoroutinefunction(callback):
await callback()
else:
callback()

await asyncio.sleep(interval)


def publish(signature: Signature) -> None:
logger.info(f"Published {signature.name}")
signature.apply_async()


async def main():
stop_signal = asyncio.Event()

logger.info("Starting producer...")
async with asyncio.TaskGroup() as tg:
tg.create_task(timer(10, lambda: publish(order_workflow.si()), stop_signal))


if __name__ == "__main__":
asyncio.run(main())
13 changes: 13 additions & 0 deletions test_project/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
model_config = SettingsConfigDict(
frozen=True,
case_sensitive=False,
env_file=".env",
env_file_encoding="utf-8",
)

broker_url: str = "amqp://guest:[email protected]/"
result_backend: str = "redis://host.docker.internal:6379/0"

0 comments on commit 300a640

Please sign in to comment.