
Cluster and Client creation error when using EMR #124

Open
nmerket opened this issue Aug 5, 2020 · 4 comments

@nmerket commented Aug 5, 2020

What happened:

Running dask-yarn on EMR causes a repeating error in tornado on client creation.

What you expected to happen:

No error; the client should be created and be usable.

Minimal Complete Verifiable Example:

I am using dask-yarn on EMR. I followed the directions outlined here and used the unaltered bootstrap-dask script from this repo. I used the emr-5.29.0 release to avoid the other bootstrap issue #122. I connected to the master node, opened Jupyter Notebook, and started a new notebook.

from dask_yarn import YarnCluster
from dask.distributed import Client

# Create a cluster
cluster = YarnCluster()

# Connect to the cluster
client = Client(cluster)
client

I then get this error reported back to the notebook every few seconds:

distributed.scheduler - INFO - Receive client connection: Client-d994493a-d748-11ea-afbf-025317f23bc7
distributed.core - INFO - Starting established connection
Exception in callback with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968
handle: <Handle with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968>
Traceback (most recent call last):
  File "/home/hadoop/miniconda/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py", line 970, in error_callback
    future.result()
asyncio.exceptions.CancelledError
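
For context, a minimal, self-contained sketch of the failure mode the traceback suggests (illustration only, not dask-yarn or tornado code; the Exception-only handler is an assumption based on older tornado releases): on Python 3.8, asyncio.CancelledError derives from BaseException instead of Exception, so a done-callback that calls result() on a cancelled future and only catches Exception lets the error escape to the event loop, which logs it as "Exception in callback".

# Illustration only: reproduces the "Exception in callback ... CancelledError"
# log pattern on Python 3.8+, assuming the callback only catches Exception.
import asyncio

def error_callback(future):
    try:
        future.result()  # raises asyncio.CancelledError for a cancelled future
    except Exception:
        # Swallowed on Python 3.7, where CancelledError was an Exception;
        # on Python 3.8+ it is a BaseException and escapes this handler.
        pass

async def main():
    fut = asyncio.get_running_loop().create_future()
    fut.add_done_callback(error_callback)
    fut.cancel()
    await asyncio.sleep(0)  # let the event loop run the done-callback

asyncio.run(main())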

Anything else we need to know?:

Environment:

  • Python version: 3.8 (Miniconda, per the traceback paths)
  • EMR release: emr-5.29.0
  • Install method: the bootstrap-dask script from this repo

@aimran-adroll

I had to downgrade to Python 3.7 and then it started working again.

Here is the relevant bit from bootstrap.sh:

curl https://repo.anaconda.com/miniconda/Miniconda3-py37_4.8.3-Linux-x86_64.sh -o /tmp/miniconda.sh

Notice I pinned the Miniconda installer to Python 3.7.x.

But it'd be good to get to the bottom of it.
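
If it helps, a quick sanity check from the notebook on the master node to confirm the pin took effect (just a sketch, assuming the bootstrapped conda environment is the active one):

# Confirm the interpreter dask-yarn runs under is 3.7.x
import sys
print(sys.version_info[:3])  # expect (3, 7, x) if the pinned installer was used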

@timothymugayi

I seem to be having the same issue running on Python 3.8.

Python 3.8.3 (default, May 19 2020, 18:47:26)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask_yarn import YarnCluster
>>> cluster = YarnCluster(environment="/home/hadoop/environment.tar.gz")
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.0.10.191:46623
20/10/21 02:46:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/10/21 02:46:41 INFO client.RMProxy: Connecting to ResourceManager at ip-10-0-10-191.ec2.internal/10.0.10.191:8032
20/10/21 02:46:41 INFO client.AHSProxy: Connecting to Application History server at ip-10-0-10-191.ec2.internal/10.0.10.191:10200
20/10/21 02:46:42 INFO skein.Driver: Driver started, listening on 40913
20/10/21 02:46:42 INFO conf.Configuration: resource-types.xml not found
20/10/21 02:46:42 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/10/21 02:46:42 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/10/21 02:46:42 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/10/21 02:46:42 INFO skein.Driver: Uploading application resources to hdfs://ip-10-0-10-191.ec2.internal:8020/user/hadoop/.skein/application_1603177429164_0002
20/10/21 02:46:43 INFO skein.Driver: Submitting application...
20/10/21 02:46:43 INFO impl.YarnClientImpl: Submitted application application_1603177429164_0002
>>>
>>>
>>> cluster.scale(1)
>>>
>>> distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:38381', name: dask.worker_0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:38381
distributed.core - INFO - Starting established connection
Exception in callback with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968
handle: <Handle with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968>
Traceback (most recent call last):
  File "/home/hadoop/miniconda/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py", line 970, in error_callback
    future.result()
asyncio.exceptions.CancelledError

@timothymugayi commented Oct 21, 2020

Downgraded to Python 3.7.9 and all seems well.

Steps taken:

  • Updated the YARN cluster bootstrap script for EMR: swapped out Upstart for systemd, since the EMR EC2 instances no longer include Upstart and it can't be used to launch the Jupyter notebook, so that part of the script had to change.
  • Changed the Python version to 3.7.9; you can do this by changing the Miniconda download link.
Python 3.7.9 (default, Aug 31 2020, 12:42:55)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
>>>
>>> from dask_yarn import YarnCluster
>>>
>>> cluster = YarnCluster(environment="/home/hadoop/environment.tar.gz")
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.0.10.191:35369
distributed.scheduler - INFO -   dashboard at:                    :41787
20/10/21 05:09:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/10/21 05:09:26 INFO client.RMProxy: Connecting to ResourceManager at ip-10-0-10-191.ec2.internal/10.0.10.191:8032
20/10/21 05:09:26 INFO client.AHSProxy: Connecting to Application History server at ip-10-0-10-191.ec2.internal/10.0.10.191:10200
20/10/21 05:09:27 INFO skein.Driver: Driver started, listening on 41837
20/10/21 05:09:27 INFO conf.Configuration: resource-types.xml not found
20/10/21 05:09:27 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/10/21 05:09:27 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/10/21 05:09:27 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/10/21 05:09:27 INFO skein.Driver: Uploading application resources to hdfs://ip-10-0-10-191.ec2.internal:8020/user/hadoop/.skein/application_1603177429164_0003
20/10/21 05:09:28 INFO skein.Driver: Submitting application...
20/10/21 05:09:28 INFO impl.YarnClientImpl: Submitted application application_1603177429164_0003
>>>
>>>
>>> cluster.scale(1)
>>> distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:40657', name: dask.worker_0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:40657
distributed.core - INFO - Starting established connection


>>>
>>>
>>>
>>> cluster.scale(3)
>>> distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:44253', name: dask.worker_1, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:44253
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:37565', name: dask.worker_2, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:37565
distributed.core - INFO - Starting established connection

>>>
>>>
>>> from dask.distributed import Client
>>>
>>> import dask.array as da
>>> client = Client(cluster)
distributed.scheduler - INFO - Receive client connection: Client-a0c24102-135d-11eb-a1a6-0a89473033af
distributed.core - INFO - Starting established connection
>>>
>>>
>>> array = da.ones((10000, 10000, 10000))
>>> print(array.mean().compute())
distributed.core - INFO - Event loop was unresponsive in Scheduler for 8.97s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

1.0
>>>
>>> len(client.scheduler_info()['workers'])
3
>>>

@quasiben
Member

Can you try again with the latest dask/distributed? It should be 2.30.0. I don't think we want to force users to Python 3.7.
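
A quick way to report what is actually installed before retrying (just a sketch, assuming it runs in the same environment the cluster uses):

# Print the versions relevant to this issue
import sys
import dask
import distributed
import tornado

print("python     ", sys.version.split()[0])
print("dask       ", dask.__version__)
print("distributed", distributed.__version__)
print("tornado    ", tornado.version)  # the traceback points into tornado/gen.py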
