Skip to content

Commit

Permalink
performance notes
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Sep 15, 2023
1 parent a426a48 commit 77bcc53
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ There is an optional if_not_exists flag. If it is set, request will be registere
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
await queue.register(SycBucket, Bucket(7756527), if_not_exists=True)
```
## Performance
Performance depends on many factors; we can only measure the clean library overhead with in-memory storages. You can run the performance test on your hardware with `pytest -s` — with this option the performance test will print results for different cases. A performance test on an Intel i5-4670K, Ubuntu 23.04 LTS, using Python 3.11.4 gives about `200_000` requests per second for batch request registration with sharding, and about `600_000` requests per second for request handling in concurrent mode.

## Advanced queue configuration
You can configure sharded queue using env
- `QUEUE_BACKLOG_RETRY_DELAY = 1`\
Expand Down
81 changes: 81 additions & 0 deletions tests/test_performance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from asyncio import gather
from contextlib import asynccontextmanager
from datetime import datetime
from typing import AsyncGenerator, NamedTuple

from pydantic_settings import BaseSettings
from pytest import mark

from sharded_queue import Handler, Queue, Route, Tube, Worker
from sharded_queue.drivers import RuntimeLock, RuntimeStorage
from sharded_queue.settings import settings


class BenchmarkSettings(BaseSettings):
    """Tunable benchmark parameters, overridable via environment variables."""

    # Number of worker coroutines / routing shards used by the benchmark.
    threads: int = 1
    # Number of requests registered per benchmark round.
    requests: int = 100_000


# Shared settings instance read by DummyHandler.route and measure().
benchmark = BenchmarkSettings()


class DummyRequest(NamedTuple):
    """Minimal queue payload: a single integer, also used for shard routing."""

    # Sequence number of the request; shard is derived as n % threads.
    n: int


class DummyHandler(Handler):
    """No-op handler used to measure pure framework overhead.

    Routes each request to shard ``n % benchmark.threads`` and performs
    no work when handling, so timings reflect only library costs.
    """

    @classmethod
    async def route(cls, *requests: DummyRequest) -> list[Route]:
        # Read the shard count once; spread requests evenly by number.
        shard_total = benchmark.threads
        return [Route(request.n % shard_total) for request in requests]

    async def handle(self, *requests: DummyRequest) -> None:
        # Intentionally empty: the benchmark measures dispatch, not work.
        pass


@mark.asyncio
async def test_performance() -> None:
    """Benchmark clean library overhead with in-memory storage and lock.

    For each (request count, thread count) combination, measures:
    batch registration, single-worker handling, and concurrent handling
    with one worker per shard. Run with ``pytest -s`` to see the output.
    """
    print('start benchmark')
    settings.worker_batch_size = 512
    settings.worker_empty_limit = 0
    settings.worker_empty_pause = 0
    queue: Queue = Queue(RuntimeStorage())
    worker: Worker = Worker(RuntimeLock(), queue)
    for requests in [1000, 10_000, 100_000]:
        benchmark.requests = requests
        for threads in [1, 5, 10, 25]:
            benchmark.threads = threads
            # range(1, requests) registers exactly requests - 1 messages.
            total = requests - 1
            print(f'test {requests} requests using {threads} thread(s)')
            async with measure('registration'):
                await queue.register(
                    DummyHandler,
                    *[DummyRequest(n) for n in range(1, requests)]
                )
            async with measure('one worker'):
                await worker.loop(total)

            # Every shard (0..threads-1) must be drained after the
            # single-worker pass. The original checked range(1, threads),
            # which skipped shard 0 and checked nothing for threads == 1.
            for thread in range(threads):
                pipe = Tube(DummyRequest, Route(thread)).pipe
                assert await queue.storage.length(pipe) == 0

            await queue.register(
                DummyHandler,
                *[DummyRequest(n) for n in range(1, requests)]
            )

            async with measure('worker threads'):
                # One worker per shard, with per-worker limits that sum to
                # the registered total. The original spawned a single
                # worker with limit (requests - 1/threads) - 1 — an
                # operator-precedence bug — and never looped over threads.
                share, remainder = divmod(total, threads)
                await gather(*[
                    Worker(RuntimeLock(), queue).loop(
                        share + (1 if index < remainder else 0)
                    )
                    for index in range(threads)
                ])


@asynccontextmanager
async def measure(label: str) -> AsyncGenerator:
    """Time the wrapped block and print its label, duration, and rps.

    Reads the module-level ``benchmark.requests`` to compute requests
    per second for the code executed inside the ``async with`` body.
    """
    start = datetime.now().timestamp()
    yield
    elapsed = datetime.now().timestamp() - start
    # Guard against a zero-duration interval on very fast runs, which
    # would raise ZeroDivisionError in the original.
    rps = round(benchmark.requests / elapsed) if elapsed else 0
    # The original computed round(time, 3) but never printed it — a dead
    # store; include the elapsed time in the output as intended.
    print(f'{label} [time: {round(elapsed, 3)}s, rps: {rps}]')

0 comments on commit 77bcc53

Please sign in to comment.