Optimise DB worker queries #15

Open · RealOrangeOne opened this issue Jun 4, 2024 · 10 comments · May be fixed by #113
Labels: database-backend (Issues relating to the database backend)
Milestone: 1.0

@RealOrangeOne
Owner

The initial DB worker (#3) polls for tasks every second (by default; this is configurable). These queries (and, by extension, the underlying table) aren't especially optimised. It would be great if they could be. The queries themselves ought to be OK; however, adding strategic indexing should help with performance.
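
For reference, based on the filter and ordering visible in the query plans later in this thread, the polling query issued by DBTaskResult.objects.ready() looks roughly like this (an illustrative sketch, not the actual manager code):

# Illustrative sketch of the worker's polling filter, inferred from the
# EXPLAIN output further down this thread; the real method is
# DBTaskResult.objects.ready().
from django.db.models import F, Q
from django.utils import timezone

from django_tasks.backends.database.models import DBTaskResult

ready_tasks = DBTaskResult.objects.filter(
    Q(run_after__isnull=True) | Q(run_after__lte=timezone.now()),
    status="NEW",
).order_by(F("priority").desc(), F("run_after").desc(nulls_last=True))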

@RealOrangeOne RealOrangeOne added this to the 1.0 milestone Jun 4, 2024
@jribbens

jribbens commented Jun 5, 2024

You should perhaps consider making the table for active (i.e. NEW/RUNNING) tasks a separate table from that used for inactive (FAILED/COMPLETE) tasks. Indexing can only get you so far, and it's hard to beat the simplicity of keeping the short, frequently-accessed list of active tasks separate from the long, infrequently-accessed list of inactive tasks.
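
As a purely illustrative sketch of that idea (hypothetical models, not anything in django-tasks), the split might look like:

# Hypothetical sketch of the two-table idea above; model and field names are
# made up for illustration and are not part of django-tasks.
from django.db import models


class ActiveTask(models.Model):
    # small, hot table: only NEW/RUNNING rows, polled every second
    status = models.CharField(max_length=16)
    run_after = models.DateTimeField(null=True, blank=True)


class ArchivedTask(models.Model):
    # large, cold table: FAILED/COMPLETE rows, written once and rarely read
    status = models.CharField(max_length=16)
    finished_at = models.DateTimeField()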

@ddahan

ddahan commented Jun 5, 2024

Another idea could be to automatically purge failed/complete tasks after a period that the user can define in a setting.
Of course, this is not possible in all use cases and cannot be considered an all-in-one solution.
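
A rough sketch of what that could look like (the setting name and the timestamp field here are hypothetical, and result pruning itself is tracked in #16):

# Hypothetical sketch only: TASKS_RESULT_TTL_DAYS is a made-up setting name,
# and the timestamp field used for the cutoff may differ in the real model.
import datetime

from django.conf import settings
from django.utils import timezone

from django_tasks.backends.database.models import DBTaskResult


def purge_old_results():
    cutoff = timezone.now() - datetime.timedelta(
        days=getattr(settings, "TASKS_RESULT_TTL_DAYS", 30)
    )
    DBTaskResult.objects.filter(
        status__in=["COMPLETE", "FAILED"],
        enqueued_at__lt=cutoff,  # assumed field name
    ).delete()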

@jribbens

jribbens commented Jun 5, 2024

> Another idea could be to automatically purge failed/complete tasks after a period that the user can define in a setting. Of course, this is not possible in all use cases and cannot be considered an all-in-one solution.

That's discussed in #16. That has to be implemented because otherwise you're basically making a never-rotated log which will just continue to use more and more storage space forever. But it's not great to have competing requirements ("I want to keep task results for a long time" vs "I want the system to be fast") pulling you in opposite directions on the same setting.

@RealOrangeOne
Owner Author

I think "I want to keep task results for a long time" is an anti-pattern. The result should be short-lived, and persisted to somewhere else (ie your business logic) in all cases.

Reducing the size of the table absolutely helps with performance, but database engines are well optimised for scans on large tables - far larger than this table is likely to hit. Indexing will help partition said scans down quite a lot, achieving something very similar to partitioning the table. I think a hard partition is far more effort than it's worth.

@RealOrangeOne RealOrangeOne added the database-backend Issues relating to the database backend label Jun 8, 2024
@knyghty
Contributor

knyghty commented Jun 25, 2024

Just from looking at the QuerySet I'd say the filtering mostly happens on the status field, so adding an index there seems quite obvious.

run_after is also filtered when using the ready() method. It could be added as a second field to the index. I just wonder how much it's worth it. How many tasks do we expect to be new but not ready? Maybe hundreds? Even thousands would probably be fine and maybe wouldn't even use the index. On the other hand, disk space is cheap, so maybe it'd be worthwhile.

If there's some agreement I could prepare a PR.
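
For reference, the two variants being weighed up here would look something like this on the model's Meta (a sketch against a stand-in model, not the actual django-tasks source):

from django.db import models


class DBTaskResultSketch(models.Model):
    # Stand-in for the real DBTaskResult model, just to show the Meta options.
    status = models.CharField(max_length=16)
    run_after = models.DateTimeField(null=True, blank=True)

    class Meta:
        indexes = [
            models.Index(fields=["status"]),
            # or, if filtering NEW-but-not-ready rows turns out to matter:
            # models.Index(fields=["status", "run_after"]),
        ]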

@RealOrangeOne
Owner Author

That sounds like the ideal start to me. I'm not sure what the query plan would look like for combined indexes vs separate ones, but it's hard to know that without a proper scale of data.

The queue_name is also used quite often for the worker - perhaps it'd be useful to have an index there?

@knyghty
Contributor

knyghty commented Jun 25, 2024

I will try to find some time to look into it, throw some data in, and see when the indexes get used.

@knyghty
Contributor

knyghty commented Oct 7, 2024

Okay I made some time to look into this.

I mostly looked at DBTaskResult.objects.ready(), as this is, I think, what will benefit most from the indexes. The others only filter on status, so the index should also work well for those. I also checked .complete(), because I made a million of those rows and wanted to see how well the ordering works for it. There's probably not much reason to fetch all rows; I imagine it'd be paginated. But in the admin, counts are still slow, and pagination works such that looking at high page numbers is also very slow, so I thought it worth a look.

I tried at first with SQLite but the explain output wasn't useful enough, so I switched to PostgreSQL. But from my initial tests I would say the following mostly holds for both SQLite and Postgres.

I set up the test data with the following command. I'm not sure how realistic this data is, but it seemed "reasonable" to me. If we clean up this database often enough that we don't have millions of rows in it, probably none of this is necessary.

import datetime

from django.core.management import BaseCommand
from django.utils import timezone
from django_tasks.backends.database.models import DBTaskResult


class Command(BaseCommand):
    def handle(self, *args, **kwargs):
        now = timezone.now()
        one_day = datetime.timedelta(days=1)
        earlier = now - one_day
        later = now + one_day
        # 1,000,000 completed results
        for _ in range(1_000_000):
            DBTaskResult.objects.create(status="COMPLETE", args_kwargs="")
        # 100,000 failed results
        for _ in range(100_000):
            DBTaskResult.objects.create(status="FAILED", args_kwargs="")
        # 10,000 new tasks scheduled for the future (not yet ready)
        for _ in range(10_000):
            DBTaskResult.objects.create(status="NEW", run_after=later, args_kwargs="")
        # 1,000 each of: ready scheduled tasks, ready unscheduled tasks, running tasks
        for _ in range(1_000):
            DBTaskResult.objects.create(status="NEW", run_after=earlier, args_kwargs="")
            DBTaskResult.objects.create(status="NEW", args_kwargs="")
            DBTaskResult.objects.create(status="RUNNING", args_kwargs="")

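One way to reproduce plans like the ones below (not necessarily how they were generated here) is Django's QuerySet.explain(); on PostgreSQL the analyze option runs EXPLAIN ANALYZE:

from django_tasks.backends.database.models import DBTaskResult

# EXPLAIN ANALYZE for the two querysets discussed below (PostgreSQL).
print(DBTaskResult.objects.ready().explain(analyze=True))
print(DBTaskResult.objects.complete().explain(analyze=True))
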
Explain for DBTaskResult.objects.ready() without changes:

Sort  (cost=21569.78..21569.79 rows=1 width=143) (actual time=83.894..84.912 rows=2000 loops=1)
  Sort Key: priority DESC, run_after DESC NULLS LAST
  Sort Method: quicksort  Memory: 330kB
  ->  Gather  (cost=1000.00..21569.77 rows=1 width=143) (actual time=83.054..84.462 rows=2000 loops=1)
        Workers Planned: 2
        Workers Launched: 2
        ->  Parallel Seq Scan on django_tasks_database_dbtaskresult  (cost=0.00..20569.67 rows=1 width=143) (actual time=61.924..62.123 rows=667 loops=3)
              Filter: (((run_after IS NULL) OR (run_after <= '2024-10-07 10:42:49.646184+00'::timestamp with time zone)) AND ((status)::text = 'NEW'::text))
              Rows Removed by Filter: 370333
Planning Time: 0.432 ms
Execution Time: 84.986 ms

A sequential scan is not amazing to see, and it's reflected in the fairly slow execution time.

DBTaskResult.objects.complete():

Gather Merge  (cost=125734.95..229278.01 rows=887450 width=143) (actual time=188.961..507.089 rows=1000000 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  ->  Sort  (cost=124734.93..125844.24 rows=443725 width=143) (actual time=180.958..206.857 rows=333333 loops=3)
        Sort Key: priority DESC, run_after DESC NULLS LAST
        Sort Method: external merge  Disk: 28816kB
        Worker 0:  Sort Method: external merge  Disk: 26560kB
        Worker 1:  Sort Method: external merge  Disk: 27096kB
        ->  Parallel Seq Scan on django_tasks_database_dbtaskresult  (cost=0.00..19415.06 rows=443725 width=143) (actual time=0.019..60.951 rows=333333 loops=3)
              Filter: ((status)::text = 'COMPLETE'::text)
              Rows Removed by Filter: 37667
Planning Time: 0.107 ms
Execution Time: 533.226 ms

The on-disk sort here is also not wonderful, and this is very slow, as you'd expect. Putting a limit on it avoids the on-disk merge, but it's still slow (around 100 ms), which is probably what you'd see on e.g. the first page of paginated results.
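
For example, a sliced queryset like the sketch below adds a LIMIT, which is roughly what the admin's first page does:

from django_tasks.backends.database.models import DBTaskResult

# Illustrative: LIMIT 100 lets PostgreSQL use a top-N heapsort instead of an
# external on-disk merge, though it still has to scan and filter the table.
first_page = list(DBTaskResult.objects.complete()[:100])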

DBTaskResult.objects.ready() with indexes = [models.Index(fields=["status"])]:

Sort  (cost=4.46..4.46 rows=1 width=143) (actual time=4.328..4.548 rows=2000 loops=1)
  Sort Key: priority DESC, run_after DESC NULLS LAST
  Sort Method: quicksort  Memory: 330kB
  ->  Index Scan using django_task_status_9e2de0_idx on django_tasks_database_dbtaskresult  (cost=0.43..4.45 rows=1 width=143) (actual time=2.639..3.365 rows=2000 loops=1)
        Index Cond: ((status)::text = 'NEW'::text)
        Filter: ((run_after IS NULL) OR (run_after <= '2024-10-07 10:51:33.499874+00'::timestamp with time zone))
        Rows Removed by Filter: 10000
Planning Time: 0.147 ms
Execution Time: 4.664 ms

Much better. It doesn't seem to help complete() though.

With indexes = [models.Index(fields=["status", "run_after"])]:

Sort  (cost=6.21..6.21 rows=1 width=143) (actual time=5.052..5.165 rows=2000 loops=1)
  Sort Key: priority DESC, run_after DESC NULLS LAST
  Sort Method: quicksort  Memory: 330kB
  ->  Index Scan using django_task_status_845397_idx on django_tasks_database_dbtaskresult  (cost=0.43..6.20 rows=1 width=143) (actual time=0.069..4.197 rows=2000 loops=1)
        Index Cond: ((status)::text = 'NEW'::text)
        Filter: ((run_after IS NULL) OR (run_after <= '2024-10-07 10:53:13.943714+00'::timestamp with time zone))
        Rows Removed by Filter: 10000
Planning Time: 0.133 ms
Execution Time: 5.391 ms

The run_after part didn't seem to get used, and if anything it slows the query down slightly. Probably after filtering on status there are few enough rows that it isn't needed. With different data this could be faster, but I'm not sure how many ready or scheduled tasks we expect to see. complete() is still not helped.

Adding a separate index just for the ordering does work very nicely though, without impacting the other queries, so we end up with the following:

indexes = [
    models.Index(fields=["status"]),
    models.Index(F("priority").desc(), F("run_after").desc(nulls_last=True), name="idx_task_ordering"),
]

With explain:

Index Scan using idx_task_ordering on django_tasks_database_dbtaskresult  (cost=0.43..55058.56 rows=1073019 width=143) (actual time=2.926..187.273 rows=1003836 loops=1)
  Filter: ((status)::text = 'COMPLETE'::text)
  Rows Removed by Filter: 113000
Planning Time: 0.477 ms
Execution Time: 213.439 ms

So unless you have more details about how many rows you'd expect of which type, I think this is quite reasonable.

I also didn't check queue_name (yet!). I think this should be added to the manager, because part of me not checking it was just that I didn't notice it.

I suspect that it is only really used in combination with ready(), which is already indexed quite well. So as long as you don't have tens of thousands of tasks per queue, it might be unnecessary. On the other hand, adding this (either before or after, but I suspect after would be better for admin filtering, unless people would rather filter by queue than by state... 🤔) to the existing status index might be useful at a large scale. If you have some numbers in mind @RealOrangeOne, I'd be happy to test them.
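
For concreteness, that composite variant would be something like the following sketch (queue_name second, per the reasoning above):

from django.db import models
from django.db.models import F

indexes = [
    # hypothetical composite: status first (matches the worker's filter),
    # queue_name second (helps filtering by queue within a status)
    models.Index(fields=["status", "queue_name"]),
    models.Index(F("priority").desc(), F("run_after").desc(nulls_last=True), name="idx_task_ordering"),
]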

@RealOrangeOne
Owner Author

RealOrangeOne commented Oct 10, 2024

This is fantastic!

An index on status makes sense, since most of the time that's going to cut down the number of rows quite a lot. queue_name ought to help for similar reasons. run_after and priority might need indexing too, although I'm not sure what the behaviour would be like when looking through two separate indexes vs having one main "find the task to execute" index which is specially designed.

I don't think this necessarily needs to be right first time. Getting the more obvious changes in first, to get most of the benefit, then the indexes can be tuned over time. You're right that cleanup should mean the table doesn't get too big, but given it's polled, any query improvements will help.
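
One possible shape for such a dedicated "find the task to execute" index (a sketch, not something benchmarked in this thread) would be a partial index covering only NEW rows:

from django.db import models
from django.db.models import Q

indexes = [
    # Hypothetical partial index: only NEW rows, ordered roughly the way the
    # worker reads them (the fields form can't express NULLS LAST on run_after).
    models.Index(
        fields=["-priority", "-run_after"],
        name="idx_ready_tasks",  # hypothetical name
        condition=Q(status="NEW"),
    ),
]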

@knyghty
Contributor

knyghty commented Oct 13, 2024

I ran some more experiments.

I added the same amount of data again but with a different queue name, so there are now double the number of objects: half in one queue, half in the other.

  • I could never get any index using run_after to be used. The most likely reason is that the status part of the filter is already selective enough. If there are a lot of tasks in the NEW state, on the order of hundreds of thousands or more, maybe it would be useful to concatenate run_after onto the status index.
  • I tried various forms of concatenated (composite) indexes. The only useful one was for the ordering.
  • A queue_name index seems to not be used, again because the status index is good enough. It still seems useful for admin filtering though, and I suspect in some setups it could be more useful than the status index, so it seems worth keeping in case the database chooses to use it, and for the admin.

So I would suggest either:

indexes = [
    models.Index(fields=["status"]),
    models.Index(fields=["queue_name"]),
    models.Index(F("priority").desc(), F("run_after").desc(nulls_last=True), name="idx_task_ordering"),
]

Or if we think it's likely to be used in some setups:

indexes = [
    models.Index(fields=["status", "run_after"]),
    models.Index(fields=["queue_name"]),
    models.Index(F("priority").desc(), F("run_after").desc(nulls_last=True), name="idx_task_ordering"),
]

I think I will prepare a PR with the first option; we can discuss further once there's an implementation to look at.

@knyghty knyghty linked a pull request Oct 13, 2024 that will close this issue