diff --git a/README.md b/README.md
index ae6fd79..0f47ad0 100644
--- a/README.md
+++ b/README.md
@@ -163,6 +163,9 @@ There is an optional if_not_exists flag. If it is set, request will be registere
 await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
 await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
 ```
+## Performance
+Performance depends on many factors; with in-memory storages we can only measure the clean library overhead. You can run the performance test on your hardware with `pytest -s`: with this option the test prints results for the different cases. A performance test on an Intel Core i5-4670K running Ubuntu 23.04 with Python 3.11.4 gives about `200_000` rps for batch request registration with sharding and about `600_000` rps for request handling in concurrent mode.
+
 ## Advanced queue configuration
 You can configure sharded queue using env
 - `QUEUE_BACKLOG_RETRY_DELAY = 1`\
diff --git a/tests/test_performance.py b/tests/test_performance.py
new file mode 100644
index 0000000..7247db5
--- /dev/null
+++ b/tests/test_performance.py
@@ -0,0 +1,81 @@
+from asyncio import gather
+from contextlib import asynccontextmanager
+from datetime import datetime
+from typing import AsyncGenerator, NamedTuple
+
+from pydantic_settings import BaseSettings
+from pytest import mark
+
+from sharded_queue import Handler, Queue, Route, Tube, Worker
+from sharded_queue.drivers import RuntimeLock, RuntimeStorage
+from sharded_queue.settings import settings
+
+
+class BenchmarkSettings(BaseSettings):
+    threads: int = 1
+    requests: int = 100_000
+
+
+benchmark = BenchmarkSettings()
+
+
+class DummyRequest(NamedTuple):
+    n: int
+
+
+class DummyHandler(Handler):
+    @classmethod
+    async def route(cls, *requests: DummyRequest) -> list[Route]:
+        return [Route(r.n % benchmark.threads) for r in requests]
+
+    async def handle(self, *requests: DummyRequest) -> None:
+        pass
+
+
+@mark.asyncio
+async def test_performance() -> None:
+    print('start benchmark')
+    settings.worker_batch_size = 512
+    settings.worker_empty_limit = 0
+    settings.worker_empty_pause = 0
+    queue: Queue = Queue(RuntimeStorage())
+    worker: Worker = Worker(RuntimeLock(), queue)
+    for requests in [1000, 10_000, 100_000]:
+        benchmark.requests = requests
+        for threads in [1, 5, 10, 25]:
+            benchmark.threads = threads
+            print(f'test {requests} requests using {threads} thread(s)')
+            async with measure('registration'):
+                await queue.register(
+                    DummyHandler,
+                    *[DummyRequest(n) for n in range(1, requests)]
+                )
+            async with measure('one worker'):
+                await worker.loop(requests - 1)
+
+            for thread in range(1, threads):
+                pipe = Tube(DummyHandler, Route(thread)).pipe
+                assert await queue.storage.length(pipe) == 0
+
+            await queue.register(
+                DummyHandler,
+                *[DummyRequest(n) for n in range(1, requests)]
+            )
+
+            async with measure('worker threads'):
+                await gather(*[
+                    Worker(RuntimeLock(), queue).loop(
+                        (requests - 1) // threads
+                    ) for _ in range(threads)
+                ])
+
+
+@asynccontextmanager
+async def measure(label: str) -> AsyncGenerator[None, None]:
+    start = datetime.now().timestamp()
+    yield
+    time = datetime.now().timestamp() - start
+    rps = round(benchmark.requests / time)
+    time = round(time, 3)
+
+    print(f'{label} [time: {time}s, rps: {rps}]')
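
Note (not part of the diff): the same measurement can also be reproduced in a plain script, which may be handy when profiling outside of pytest. The sketch below is illustrative only; it reuses calls that already appear in the test above (`Queue(RuntimeStorage())`, `queue.register(...)`, `Worker(RuntimeLock(), queue).loop(...)`, the `settings` fields), while the `StandaloneRequest`/`StandaloneHandler` names, the single `Route(0)` shard and the default request count are assumptions made for the example.

```python
# Illustrative sketch (see assumptions above): measures registration and
# handling throughput with the same in-memory drivers the test uses.
from asyncio import run
from datetime import datetime
from typing import NamedTuple

from sharded_queue import Handler, Queue, Route, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
from sharded_queue.settings import settings


class StandaloneRequest(NamedTuple):
    n: int


class StandaloneHandler(Handler):
    @classmethod
    async def route(cls, *requests: StandaloneRequest) -> list[Route]:
        # single shard keeps the sketch minimal (illustrative choice)
        return [Route(0) for _ in requests]

    async def handle(self, *requests: StandaloneRequest) -> None:
        pass


async def main(count: int = 100_000) -> None:
    # mirror the settings used by tests/test_performance.py
    settings.worker_batch_size = 512
    settings.worker_empty_limit = 0
    settings.worker_empty_pause = 0

    queue = Queue(RuntimeStorage())
    worker = Worker(RuntimeLock(), queue)

    start = datetime.now().timestamp()
    await queue.register(
        StandaloneHandler,
        *[StandaloneRequest(n) for n in range(count)]
    )
    print('registration rps:', round(count / (datetime.now().timestamp() - start)))

    start = datetime.now().timestamp()
    await worker.loop(count)
    print('handling rps:', round(count / (datetime.now().timestamp() - start)))


if __name__ == '__main__':
    run(main())
```

As in the test, the handler's `handle` is deliberately a no-op, so the numbers reflect pure library overhead rather than application work.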