Prefect / Dask interaction tracking issue #208

Closed

sharkinsspatial opened this issue Sep 21, 2021 · 2 comments

@sharkinsspatial (Contributor)

These ongoing issues have been discussed in several disparate threads, including pangeo-forge/pangeo-forge-azure-bakery#10 and pangeo-forge/gpm-imerge-hhr-feedstock#1, as well as several ongoing threads on the Prefect Slack channels, but I am opening this issue to centralize tracking of these problems and improve visibility for the pangeo-forge team. To summarize, there appears to be an underlying issue with the Prefect DaskExecutor (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/executors/dask.py) that causes Prefect tasks to consume a large amount of memory after serialization and submission, leading to Dask scheduler OOM failures. To illustrate with a basic example:

from prefect import Flow, task
from time import sleep


@task
def map_fn(x):
    sleep(0.25)


task_range = list(range(1, 400000))

with Flow('map_testing') as flow:
    map_fn.map(task_range)
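
For reference, running a flow like this on a Dask cluster with the DaskExecutor follows the pattern below. This is only a sketch: the scheduler address is a placeholder, and the actual executor configuration used for the runs described here is not reproduced in this issue.

from prefect.executors import DaskExecutor

# Placeholder address; point this at the Dask scheduler running in the cluster.
executor = DaskExecutor(address="tcp://dask-scheduler:8786")
flow.run(executor=executor)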

[Screenshot (2021-09-17, 3:48 PM): Dask dashboard showing rapid unmanaged worker memory growth]
Running this flow results in the scheduler pod (which is configured with 10GB of memory) being killed almost instantly with an OOM error. Additionally, we see worker unmanaged memory growing rapidly. As shown in the screenshot above, unmanaged memory has inexplicably grown to 11GB after only 311 tasks have been processed.

In an attempt to verify that this is a Prefect-specific issue and not an underlying problem with rapid Dask task submission, I ran a small experiment to approximate the task submission behavior used by Prefect's DaskExecutor:

from dask_kubernetes import KubeCluster, make_pod_spec
from distributed import Client
from time import sleep
import weakref

# Hold futures in a WeakSet to approximate how Prefect's DaskExecutor
# tracks submitted futures without keeping strong references to results.
futures = weakref.WeakSet()

# worker_image is the Docker image used for the Dask workers (defined elsewhere).
pod_spec = make_pod_spec(
    image=worker_image,
    memory_limit='4G',
    memory_request='4G',
    cpu_limit=1,
    cpu_request=1,
    env={}
)

# Adaptive cluster scaling between 4 and 10 workers.
cluster = KubeCluster(pod_spec)
cluster.adapt(minimum=4, maximum=10)

client = Client(cluster)


def map_fn(x):
    sleep(0.25)


# Submit ~400,000 short tasks as quickly as possible, mirroring the
# submission pattern of the mapped Prefect task above.
value_range = list(range(1, 400000))

for value in value_range:
    future = client.submit(map_fn, value)
    futures.add(future)

client.gather(futures)

I did not encounter the memory spikes observed in the equivalent Prefect operation using the DaskExecutor. This operation completed in approximately 60 seconds on 4 workers with no memory pressure.
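
As a side note, a quick way to spot-check per-worker memory outside of the dashboard is to run something like the snippet below. This is a generic Dask/distributed pattern, not part of the original experiment.

import psutil


def worker_rss():
    # Resident set size of the current worker process, in bytes.
    return psutil.Process().memory_info().rss


# Maps each worker address to its current RSS; useful for watching
# unmanaged memory while the submission loop above is running.
print(client.run(worker_rss))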

Currently, @kvnkho from the Prefect team is investigating this and has reproduced these issues on their own cluster infrastructure. I will try to post updates here as more information becomes available.

@sharkinsspatial (Contributor, Author)

@cisaacstern @rabernat I got a message on Slack from @kvnkho that he is hoping to have a PR addressing this prepped later this week.

@cisaacstern (Member)

We no longer use Prefect, so closing.
