Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vine: Improve how errors are propagated with FuturesExecutor #3922

Merged
merged 6 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 87 additions & 76 deletions taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

from . import cvine
import hashlib
from collections import deque
from concurrent.futures import Executor
from concurrent.futures import Future
from concurrent.futures import FIRST_COMPLETED
Expand All @@ -14,7 +15,6 @@
from .task import (
PythonTask,
FunctionCall,
PythonTaskNoResult,
FunctionCallNoResult,
)
from .manager import (
Expand Down Expand Up @@ -53,44 +53,44 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
if not f._is_submitted:
f.module_manager.submit(f._task)

time_init = time.time()
if timeout is None:
time_check = float('inf')
else:
time_check = timeout
start = time.perf_counter()
time_check = float('inf') if timeout is None else timeout
result_timeout = min(timeout, 5) if timeout is not None else 5

done = False
while time.time() - time_init < time_check and not done:
done = True
while time.perf_counter() - start < time_check and not done:
for f in fs:

# skip if future is complete
if f in results.done:
continue

# check for completion
result = f.result(timeout=5)

# add to set of finished tasks and break when needed.
if result != RESULT_PENDING:
try:
# check for completion
f.result(timeout=result_timeout)
except TimeoutError:
# TimeoutErrors are expected since we are polling the
# future's status. If a timeout happens, we do nothing.
pass
except Exception:
# Future.result() re-raises the task's exception, but that is
# not relevant here; we only care that the task has finished.
results.done.add(f)
if return_when in (FIRST_EXCEPTION, FIRST_COMPLETED):
done = True
break
else:
results.done.add(f)

# if this is the first completed task, break.
if return_when == FIRST_COMPLETED:
done = True
break

if isinstance(result, Exception) and return_when == FIRST_EXCEPTION:
if len(results.done) == len(fs):
done = True
break

# set done to false to finish loop.
else:
done = False

# check for timeout
if timeout is not None:
if time.time() - time_init > timeout:
break
if time.perf_counter() - start > time_check:
break

# add incomplete futures to set
for f in fs:
Expand All @@ -101,48 +101,45 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):


def as_completed(fs, timeout=None):
fs = deque(fs)

results = set()

# submit tasks if they have not been subitted
# submit tasks if they have not been submitted
for f in fs:
if not f._is_submitted:
f.module_manager.submit(f._task)

time_init = time.time()
if timeout is None:
time_check = float('inf')
else:
time_check = timeout

done = False
while time.time() - time_init < time_check and not done:
for f in fs:
done = True
# skip if future is complete
if f in results:
continue

# check for completion
result = f.result(timeout=5)

# add to set of finished tasks
if result != RESULT_PENDING:
results.add(f)

# set done to false to finish loop.
start = time.perf_counter()
result_timeout = min(timeout, 5) if timeout is not None else 5

def _iterator():
# iterate over the queue of futures, yielding completed futures and
# requeuing non-completed futures until all futures are yielded or
# the timeout is reached.
while fs:
f = fs.popleft()

try:
result = f.result(timeout=result_timeout)
except TimeoutError:
# TimeoutErrors are expected since we are polling the
# future's status. If a timeout happens, add the future to
# the back of the queue.
fs.append(f)
except Exception:
# Future.result() re-raises the task's exception, but that is
# not relevant here; we only care that the task has finished.
yield f
else:
done = False
assert result != RESULT_PENDING
yield f

# check for timeout
if timeout is not None:
if time.time() - time_init > timeout:
break
for f in fs:
if f not in results:
results.add(TimeoutError)
if (
fs and timeout is not None
and time.perf_counter() - start > timeout
):
raise TimeoutError()

return iter(results)
return _iterator()


##
Expand Down Expand Up @@ -233,6 +230,7 @@ def __init__(self, task):
self._task = task
self._callback_fns = []
self._result = None
self._exception = None
self._is_submitted = False
self._ran_functions = False

Expand Down Expand Up @@ -266,15 +264,30 @@ def result(self, timeout="wait_forever"):
timeout = "wait_forever"
result = self._task.output(timeout=timeout)
if result == RESULT_PENDING:
return RESULT_PENDING
raise TimeoutError()

if isinstance(result, Exception):
self._exception = result
else:
self._result = result
self._state = FINISHED
if self._callback_fns and not self._ran_functions:
self._ran_functions = True
for fn in self._callback_fns:
fn(self)
return result

self._state = FINISHED
if self._callback_fns and not self._ran_functions:
self._ran_functions = True
for fn in self._callback_fns:
fn(self)

if isinstance(result, Exception):
raise result
return result

def exception(self, timeout="wait_forever"):
    """Return the exception raised by the task, or None if it succeeded.

    Args:
        timeout: Forwarded unchanged to ``self.result()``. The default,
            ``"wait_forever"``, matches that method's default.

    Returns:
        The exception raised by ``self.result()`` for a failed task, or
        ``None`` when ``self.result()`` returns normally.

    Raises:
        TimeoutError: If ``self.result()`` timed out before the task
            finished, i.e. the error is not the task's recorded exception.
    """
    try:
        # Honor the caller's timeout instead of always blocking forever.
        self.result(timeout=timeout)
    except TimeoutError as e:
        # result() raises TimeoutError both when the task is still pending
        # and when the task itself failed with one. Only the task's own
        # exception is recorded in self._exception, so anything else means
        # the wait timed out and must be propagated to the caller.
        if e is self._exception:
            return e
        raise
    except Exception as e:
        return e
    return None

def add_done_callback(self, fn):
self._callback_fns.append(fn)
Expand Down Expand Up @@ -318,11 +331,10 @@ def output(self, timeout="wait_forever"):
if output['Success']:
self._saved_output = output['Result']
else:
self._saved_output = output['Reason']
self._saved_output = FunctionCallNoResult(output['Reason'])

except Exception as e:
self._saved_output = e
raise e
else:
self._saved_output = FunctionCallNoResult()
self._output_loaded = True
Expand Down Expand Up @@ -388,15 +400,14 @@ def output(self, timeout="wait_forever"):

# fetch output file and load output
if not self._output_loaded and self._has_retrieved:
if self.successful():
try:
self._module_manager.fetch_file(self._output_file)
self._output = cloudpickle.loads(self._output_file.contents())
except Exception as e:
self._output = e
raise e
else:
self._output = PythonTaskNoResult()
try:
self._module_manager.fetch_file(self._output_file)
# _output can be either the return value of a successful
# task or the exception object of a failed task.
self._output = cloudpickle.loads(self._output_file.contents())
except Exception as e:
# handle output file fetch/deserialization failures
self._output = e
self._output_loaded = True

return self._output
Expand Down
21 changes: 20 additions & 1 deletion taskvine/test/vine_python_future_funcall.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def my_sum(x, y, negate=False):
return s


def failure():
    """Raise unconditionally so a failing function call can be observed."""
    message = 'Expected failure.'
    raise Exception(message)


def main():
executor = vine.FuturesExecutor(
port=[9123, 9129], manager_name="test-executor", factory=False
Expand All @@ -33,7 +37,7 @@ def main():
# Create library task
print("creating library from functions...")
libtask = executor.create_library_from_functions(
"test-library", my_sum, hoisting_modules=None, add_env=False
"test-library", my_sum, failure, hoisting_modules=None, add_env=False
)

# Install library on executor.manager
Expand All @@ -59,6 +63,21 @@ def main():
c = a + b
assert res == c

# Test failure handling
f1 = executor.future_funcall("test-library", "failure")
future = executor.submit(f1)

try:
future.result()
except Exception as e:
# FutureFunctionCall wraps the exception string in a
# FunctionCallNoResult so we check the original error message is
# present in the raised error.
assert "Expected failure." in str(e)
assert future.exception() is not None
else:
raise RuntimeError("Future did not raise exception.")


if __name__ == "__main__":
main()
Expand Down
83 changes: 82 additions & 1 deletion taskvine/test/vine_python_future_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import sys
import ndcctools.taskvine as vine
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import FIRST_EXCEPTION
from concurrent.futures import ALL_COMPLETED
from concurrent.futures import TimeoutError

port_file = None
try:
Expand All @@ -22,6 +26,15 @@ def my_sum(x, y, negate=False):
return s


def my_exception():
    """Raise unconditionally so a failing task can be observed."""
    error = Exception("Expected failure.")
    raise error


def my_timeout():
    """Block for 60 seconds, long enough to outlive the short timeouts under test."""
    from time import sleep

    sleep(60)


def main():
executor = vine.FuturesExecutor(
port=[9123, 9129], manager_name="vine_matrtix_build_test", factory=False
Expand Down Expand Up @@ -67,9 +80,77 @@ def main():
c = executor.submit(t3)

print("waiting for result...")
results = vine.futures.as_completed([a, b, c])
results = list(vine.futures.as_completed([a, b, c]))
print(f"results = {results}")

# Test timeouts
t1 = executor.future_task(my_timeout)
t1.set_cores(1)
future = executor.submit(t1)

try:
future.result(timeout=1)
except TimeoutError:
future.cancel()
print("timeout raised correctly")
else:
raise RuntimeError("TimeoutError was not raised correctly.")

# Test error handling with wait
t1 = executor.future_task(my_sum, 7, 4)
t1.set_cores(1)
a = executor.submit(t1)

t2 = executor.future_task(my_timeout)
t2.set_cores(1)
b = executor.submit(t2)

results = vine.futures.wait([a, b], return_when=FIRST_COMPLETED)
assert len(results.done) == 1
assert len(results.not_done) == 1
assert results.done.pop().result() == 11

results = vine.futures.wait([a, b], timeout=2, return_when=ALL_COMPLETED)
assert len(results.done) == 1
assert len(results.not_done) == 1
assert results.done.pop().result() == 11

t3 = executor.future_task(my_exception)
t3.set_cores(1)
c = executor.submit(t3)

results = vine.futures.wait([b, c], return_when=FIRST_EXCEPTION)
assert len(results.done) == 1
assert len(results.not_done) == 1
assert results.done.pop().exception() is not None

# Cancel the task that is still sleeping
b.cancel()

# Test timeouts with as_completed
t1 = executor.future_task(my_sum, 7, 4)
t1.set_cores(1)
a = executor.submit(t1)

t2 = executor.future_task(my_timeout)
t2.set_cores(1)
b = executor.submit(t2)

iterator = vine.futures.as_completed([a, b], timeout=5)

# task 1 should complete correctly within the timeout and be yielded first
a_future = next(iterator)
assert a_future.result() == 11

try:
next(iterator)
except TimeoutError:
b.cancel()
print("as_completed raised timeout correctly")
else:
raise RuntimeError("TimeoutError was not raised correctly.")


if __name__ == "__main__":
main()

Expand Down
Loading
Loading