Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using dask-yarn on EMR: InvalidStateError when calling cluster.close() #127

Open
amyzheng1995 opened this issue Oct 23, 2020 · 0 comments

Comments

@amyzheng1995
Copy link

Thank you for reporting an issue.

When reporting an issue, please include the following:

  • A description of the bug

    I created an EMR cluster using the provided bootstrap action and then SSH-tunneled into the master node. From a local Jupyter notebook, I created a Dask cluster. Creating the cluster works, but running `cluster.close()` raises an `InvalidStateError`.

  • Steps to reproduce

# Minimal reproduction: start a YarnCluster, attach a client, scale it,
# then shut it down. The final cluster.close() is the call that raises
# InvalidStateError in the traceback below.
from dask.distributed import Client  # was missing: Client(...) raised NameError
from dask_yarn import YarnCluster

cluster = YarnCluster()
client = Client(cluster)

# Scale up to 3 workers
cluster.scale(3)

# Shutting down the cluster triggers the reported error.
cluster.close()
  • Relevant logs/tracebacks
CancelledError                            Traceback (most recent call last)
/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _watch_worker_status(self, comm)
    902                 try:
--> 903                     msgs = await comm.read()
    904                 except OSError:

/home/hadoop/miniconda/lib/python3.7/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    185         try:
--> 186             n_frames = await stream.read_bytes(8)
    187             n_frames = struct.unpack("Q", n_frames)[0]

CancelledError: 

During handling of the above exception, another exception occurred:

InvalidStateError                         Traceback (most recent call last)
<ipython-input-8-f7593bfa8275> in <module>()
----> 1 cluster.close()

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in close(self, **kwargs)
    740         shutdown
    741         """
--> 742         return self.shutdown(**kwargs)
    743 
    744     def __del__(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in shutdown(self, status, diagnostics)
    727             return self._stop_internal(status=status, diagnostics=diagnostics)
    728         if self.loop.asyncio_loop.is_running() and not sys.is_finalizing():
--> 729             self._sync(self._stop_internal(status=status, diagnostics=diagnostics))
    730         else:
    731             # Always run this!

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _sync(self, task)
    699         future = asyncio.run_coroutine_threadsafe(task, self.loop.asyncio_loop)
    700         try:
--> 701             return future.result()
    702         except BaseException:
    703             future.cancel()

/home/hadoop/miniconda/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

/home/hadoop/miniconda/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _stop_internal(self, status, diagnostics)
    630                 self._stop_async(status=status, diagnostics=diagnostics)
    631             )
--> 632         await self._stop_task
    633 
    634     async def _stop_async(self, status="SUCCEEDED", diagnostics=None):

/home/hadoop/miniconda/lib/python3.7/site-packages/IPython/core/interactiveshell.py in run_code(self, code_obj, result)
   2876                 self.hooks.pre_run_code_hook()
   2877                 #rprint('Running code', repr(code_obj)) # dbg
-> 2878                 exec(code_obj, self.user_global_ns, self.user_ns)
   2879             finally:
   2880                 # Reset our crash handler in place

<ipython-input-5-f7593bfa8275> in <module>()
----> 1 cluster.close()

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in close(self, **kwargs)
    740         shutdown
    741         """
--> 742         return self.shutdown(**kwargs)
    743 
    744     def __del__(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in shutdown(self, status, diagnostics)
    727             return self._stop_internal(status=status, diagnostics=diagnostics)
    728         if self.loop.asyncio_loop.is_running() and not sys.is_finalizing():
--> 729             self._sync(self._stop_internal(status=status, diagnostics=diagnostics))
    730         else:
    731             # Always run this!

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _sync(self, task)
    699         future = asyncio.run_coroutine_threadsafe(task, self.loop.asyncio_loop)
    700         try:
--> 701             return future.result()
    702         except BaseException:
    703             future.cancel()

/home/hadoop/miniconda/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

/home/hadoop/miniconda/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _stop_internal(self, status, diagnostics)
    630                 self._stop_async(status=status, diagnostics=diagnostics)
    631             )
--> 632         await self._stop_task
    633 
    634     async def _stop_async(self, status="SUCCEEDED", diagnostics=None):

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _stop_async(self, status, diagnostics)
    643 
    644         if self._watch_worker_status_task is not None:
--> 645             await cancel_task(self._watch_worker_status_task)
    646             self._watch_worker_status_task = None
    647 

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in cancel_task(task)
     63     task.cancel()
     64     try:
---> 65         await task
     66     except asyncio.CancelledError:
     67         pass

/home/hadoop/miniconda/lib/python3.7/site-packages/dask_yarn/core.py in _watch_worker_status(self, comm)
    922                     self = None
    923         finally:
--> 924             await comm.close()
    925 
    926     def _widget_status(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/tornado/gen.py in wrapper(*args, **kwargs)
    324                 try:
    325                     orig_stack_contexts = stack_context._state.contexts
--> 326                     yielded = next(result)
    327                     if stack_context._state.contexts is not orig_stack_contexts:
    328                         yielded = _create_future()

/home/hadoop/miniconda/lib/python3.7/site-packages/distributed/comm/tcp.py in close(self)
    279             finally:
    280                 self._finalizer.detach()
--> 281                 stream.close()
    282 
    283     def abort(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/tornado/iostream.py in close(self, exc_info)
    634             self.close_fd()
    635             self._closed = True
--> 636         self._maybe_run_close_callback()
    637 
    638     def _maybe_run_close_callback(self):

/home/hadoop/miniconda/lib/python3.7/site-packages/tornado/iostream.py in _maybe_run_close_callback(self)
    653                 self._ssl_connect_future = None
    654             for future in futures:
--> 655                 future.set_exception(StreamClosedError(real_error=self.error))
    656                 future.exception()
    657             if self._close_callback is not None:

InvalidStateError: invalid state

  • Version information
    - Python version 3.7.7
    - Dask-Yarn version 0.8.1
    - EMR bootstrap action: https://github.com/dask/dask-yarn/blob/master/deployment_resources/aws-emr/bootstrap-dask
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant