Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slow task submission from Prefect DaskExecutor prevents cluster scaling. #1

Open
sharkinsspatial opened this issue Sep 17, 2021 · 8 comments

Comments

@sharkinsspatial
Copy link
Contributor

We are experiencing extremely slow task submission via the DaskExecutor for very large mapped tasks. With previous flow tests where a task was mapped over roughly 20K items, task submission was sufficiently fast that our Dask cluster scaled workers up to the worker limit. But with a task mapped over 400K items, the DaskExecutor task submission to the scheduler appears rate limited and there are never sufficient tasks on the scheduler it to create more workers and scale so we are stuck with the cluster crawling along with the minimum number of workers.
Screen Shot 2021-09-17 at 11 13 47 AM

And note the relatively small number of task which the scheduler has received. Normally the number of cache_inputs tasks should be growing very rapidly and the workers should be saturated forcing the cluster to scale but as you can see in the dashboard image below, the task submission to the scheduler is slow for some reason.
Screen Shot 2021-09-17 at 11 12 23 AM

Prefect Slack discussion is https://prefect-community.slack.com/archives/C01TK7XSLT0/p1631896033088800 @rabernat This might be another topic of technical discussion for when you meet with the Prefect team.

@rabernat
Copy link

Can you reproduce this without Pangeo Forge? Like, if you just create a really simple flow with 400k mapped tasked, do you see the same behavior?

@sharkinsspatial
Copy link
Contributor Author

☝️ @rabernat I will run a test to verify.

@rabernat
Copy link

Can you also explain why there are so many tasks? Is that basically just how many files there are?

@sharkinsspatial
Copy link
Contributor Author

Correct. That is the total number of input files.

@rabernat
Copy link

I wonder if we should be batching together some of these into larger tasks. That is possible at the store_chunk stage with inputs_per_chunk, but not at the cache_inputs stage.

@sharkinsspatial
Copy link
Contributor Author

@rabernat I tested with the following example flow

from prefect import Flow, task
from time import sleep

    @task
    def map_fn(x):
        sleep(0.25)
    task_range = list(range(1, 400000))
    with Flow('map_testing') as flow:
        map_fn.map(task_range)

And we did not see the same slow task submission issue

Screen Shot 2021-09-17 at 3 48 10 PM

However this revealed another, deeper issue as my scheduler pod (which is configured with 10GB of memory) was killed almost instantly with an OOM error. Additionally we see worker unmanaged memory growing rapidly. As shown in the image above, unmanaged memory has inexplicably grown to 11GB after only 311 tasks have been processed.

I'm flummoxed that this simple example map is not manageable with Prefect on instances with this resource capacity.

@sharkinsspatial
Copy link
Contributor Author

@rabernat I'm quickly putting together a pure Dask demonstration to try and narrow this down to a Prefect issue.

@sharkinsspatial
Copy link
Contributor Author

sharkinsspatial commented Sep 17, 2021

As a quick experiment to verify the issues we are experiencing are Prefect specific (as there have been some related Dask issue discussions previously dask/distributed#2757). I ran a small experiment to approximate the task submission behavior used by Prefect's DaskExecutor.

from dask_kubernetes import KubeCluster, make_pod_spec
from distributed import Client
from time import sleep
import weakref

futures = weakref.WeakSet()

pod_spec = make_pod_spec(
    image=worker_image,
    memory_limit='4G',
    memory_request='4G',
    cpu_limit=1,
    cpu_request=1,
    env={}
)
cluster = KubeCluster(pod_spec)
cluster.adapt(minimum=4, maximum=10)

client = Client(cluster)


def map_fn(x):
    sleep(0.25)


value_range = list(range(1, 400000))

for value in value_range:
    future = client.submit(map_fn, value)
    futures.add(future)

client.gather(futures)

I did not encounter task submission delays or the memory spikes demonstrated in the equivalent Prefect operation using the DaskExecutor. This operation completed in approx 60s on 4 workers with no memory pressure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants