Skip to content

Commit

Permalink
Add initial test for celery tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
xmnlab committed May 30, 2024
1 parent 60a159f commit 02640a6
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 14 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,10 @@ jobs:
poetry install
- name: start up services
run: |
sugar build
sugar ext restart --options -d
run: makim tests.setup

- name: Run tests
run: |
makim tests.unit
run: makim tests.unit

- name: Run style checks
if: success() || failure()
Expand Down
12 changes: 12 additions & 0 deletions .makim.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ groups:
celery -A tasks.app worker --loglevel=debug &
python app.py
setup:
help: Run the setup for the unit tests
shell: bash
run: |
set -ex
sugar build
sugar ext restart --options -d
sleep 5
pushd tests/
celery -A celery_tasks worker --loglevel=debug &
popd
ci:
help: run the sames tests executed on CI
dependencies:
Expand Down
25 changes: 16 additions & 9 deletions example/tasks/config.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
"""Configuration for Celery app."""

import os

import redis

from celery import Celery

redis_host: str = os.getenv("RETSU_REDIS_HOST", "localhost")
redis_port: int = int(os.getenv("RETSU_REDIS_PORT", 6379))
redis_db: int = int(os.getenv("RETSU_REDIS_DB", 0))

redis_uri = f"redis://{redis_host}:{redis_port}/{redis_db}"

app = Celery(
"retsu",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0",
"app",
broker=redis_uri,
backend=redis_uri,
)

LOG_FORMAT_PREFIX = "[%(asctime)s: %(levelname)s/%(processName)s]"

app.conf.update(
broker_url="redis://localhost:6379/0",
result_backend="redis://localhost:6379/0",
broker_url=redis_uri,
result_backend=redis_uri,
worker_log_format=f"{LOG_FORMAT_PREFIX} %(message)s",
worker_task_log_format=(
f"{LOG_FORMAT_PREFIX} %(task_name)s[%(task_id)s]: %(message)s"
Expand All @@ -27,13 +35,12 @@
)

redis_client = redis.Redis(
host="localhost",
port=6379,
db=0,
host=redis_host,
port=redis_port,
db=redis_db,
ssl=False,
)


try:
print("Pinging Redis...")
redis_client.ping()
Expand Down
69 changes: 69 additions & 0 deletions tests/celery_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Celery Tasks."""

from __future__ import annotations

import os

from datetime import datetime
from time import sleep

import redis

from celery import Celery

redis_host: str = os.getenv("RETSU_REDIS_HOST", "localhost")
redis_port: int = int(os.getenv("RETSU_REDIS_PORT", 6379))
redis_db: int = int(os.getenv("RETSU_REDIS_DB", 0))

redis_uri = f"redis://{redis_host}:{redis_port}/{redis_db}"

app = Celery(
"celery_tasks",
broker=redis_uri,
backend=redis_uri,
)

LOG_FORMAT_PREFIX = "[%(asctime)s: %(levelname)s/%(processName)s]"

app.conf.update(
broker_url=redis_uri,
result_backend=redis_uri,
worker_log_format=f"{LOG_FORMAT_PREFIX} %(message)s",
worker_task_log_format=(
f"{LOG_FORMAT_PREFIX} %(task_name)s[%(task_id)s]: %(message)s"
),
task_annotations={"*": {"rate_limit": "10/s"}},
task_track_started=True,
task_time_limit=30 * 60,
task_soft_time_limit=30 * 60,
worker_redirect_stdouts_level="DEBUG",
)

redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
ssl=False,
)

try:
print("Pinging Redis...")
redis_client.ping()
print("Redis connection is working.")
except redis.ConnectionError as e:
print(f"Failed to connect to Redis: {e}")
exit(1)


@app.task # type: ignore
def task_sum(x: int, y: int, task_id: str) -> int:
"""Sum two numbers, x and y."""
result = x + y
return result


@app.task # type: ignore
def task_sleep(seconds: int, task_id: str) -> int:
"""Sum two numbers, x and y, and sleep the same amount of the sum."""
sleep(seconds)
return int(datetime.now().timestamp())
46 changes: 46 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Configuration used by pytest."""

from __future__ import annotations

import os
import subprocess
import time

from typing import Generator

import pytest


@pytest.fixture(autouse=True, scope="session")
def setup() -> Generator[None, None, None]:
"""Set up the services needed by the tests."""
try:
# Run the `sugar build` command
subprocess.run(["sugar", "build"], check=True)

# Run the `sugar ext restart --options -d` command
subprocess.run(
["sugar", "ext", "restart", "--options", "-d"], check=True
)

# Sleep for 5 seconds
time.sleep(5)

# Change directory to `tests/`
os.chdir("tests/")

# Start the Celery worker
celery_process = subprocess.Popen(
["celery", "-A", "celery_tasks", "worker", "--loglevel=debug"]
)

# Change directory back to the original
os.chdir("..")

yield

finally:
# Teardown: Terminate the Celery worker
celery_process.terminate()
celery_process.wait()
subprocess.run(["sugar", "ext", "stop"], check=True)
93 changes: 93 additions & 0 deletions tests/test_task_celery_serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Tests for retsu package."""

from __future__ import annotations

from datetime import datetime
from time import sleep
from typing import Any, Generator

import pytest

from retsu import SerialTask, Task


class MyResultTask(SerialTask):
"""Task for the test."""

def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore
"""Return the sum of the given 2 numbers."""
a = kwargs.pop("a", 0)
b = kwargs.pop("b", 0)
result = a + b
return result


class MyTimestampTask(SerialTask):
"""Task for the test."""

def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore
"""Sleep the given seconds, and return the current timestamp."""
sleep_time = kwargs.pop("sleep", 0)
sleep(sleep_time)
return datetime.now().timestamp()


@pytest.fixture
def task_result() -> Generator[Task, None, None]:
"""Create a fixture for MyResultTask."""
task = MyResultTask()
task.start()
yield task
task.stop()


@pytest.fixture
def task_timestamp() -> Generator[Task, None, None]:
"""Create a fixture for MyResultTask."""
task = MyTimestampTask()
task.start()
yield task
task.stop()


class TestSerialTask:
"""TestSerialTask."""

def test_serial_result(self, task_result: Task) -> None:
"""Run simple test for a serial task."""
results: dict[str, int] = {}

task = task_result

for i in range(10):
task_id = task.request(a=i, b=i)
results[task_id] = i + i

for task_id, expected in results.items():
result = task.result.get(task_id, timeout=2)
assert (
result == expected
), f"Expected Result: {expected}, Actual Result: {result}"

def test_serial_timestamp(self, task_timestamp: Task) -> None:
"""Run simple test for a serial task."""
results: list[tuple[str, int]] = []

task = task_timestamp

for sleep_time in range(5, 1, -1):
task_id = task.request(sleep=sleep_time)
results.append((task_id, 0))

# gather results
for i, (task_id, _) in enumerate(results):
results[i] = (task_id, task.result.get(task_id, timeout=10))

# check results
previous_timestamp = results[0][1]
for _, current_timestamp in results[1:]:
assert current_timestamp > previous_timestamp, (
f"Previous timestamp: {previous_timestamp}, "
f"Current timestamp: {current_timestamp}"
)
previous_timestamp = current_timestamp

0 comments on commit 02640a6

Please sign in to comment.