Skip to content

Commit

Permalink
Merge pull request #66 from exalearn/funcx_v1
Browse files Browse the repository at this point in the history
Update FuncX version to v1
  • Loading branch information
WardLT authored Aug 17, 2022
2 parents 839b826 + 24a5f6d commit faf9cb7
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
25 changes: 22 additions & 3 deletions colmena/task_server/funcx.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,19 @@ class FuncXTaskServer(FutureBasedTaskServer):
def __init__(self, methods: Dict[Callable, str],
funcx_client: FuncXClient,
queues: TaskServerQueues,
timeout: Optional[int] = None):
timeout: Optional[int] = None,
batch_enabled: bool = True,
batch_interval: float = 1.0,
batch_size: int = 100):
"""
Args:
methods: Map of functions to the endpoint on which it will run
funcx_client: Authenticated FuncX client
queues: Queues used to communicate with thinker
timeout: Timeout for requests from the task queue
batch_enabled: Whether to use FuncX's batch submission feature
batch_interval: Maximum time to wait between batch submissions
batch_size: Maximum number of task request to receive before submitting
"""
super(FuncXTaskServer, self).__init__(queues, timeout)

Expand All @@ -63,8 +69,13 @@ def __init__(self, methods: Dict[Callable, str],
# Store the FuncX information for the function
self.registered_funcs[func_name] = (new_func, endpoint)

# Create the executor and queue of tasks to be submitted back to the user
self.fx_exec = FuncXExecutor(self.fx_client)
# Placeholder for the executor and queue of tasks to be submitted back to the user
self.fx_exec: Optional[FuncXExecutor] = None
self._batch_options = dict(
batch_enabled=batch_enabled,
batch_size=batch_size,
batch_interval=batch_interval
)

def perform_callback(self, future: Future, result: Result, topic: str):
# Check if the failure was due to a ManagerLost
Expand All @@ -87,5 +98,13 @@ def _submit(self, task: Result, topic: str) -> Future:
logger.info(f'Submitted {task.method} to run on {endp_id}')
return future

def run(self) -> None:
# Creates a FuncX executor only once the thread has been launched
self.fx_exec = FuncXExecutor(self.fx_client, **self._batch_options)
logger.info('Created a FuncX executor')

# Now start running tasks
super().run()

def _cleanup(self):
self.fx_exec.shutdown()
2 changes: 2 additions & 0 deletions colmena/task_server/tests/test_funcx.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def register_function(self, new_func, function_name: str, description: str, sear
return uuid


# TODO (wardlt): This mocked version does not mimic the real versions' multi-threaded nature, which
# means it does not suffer the same problems when being copied over to a subprocess
class FakeExecutor:
"""Faked FuncXExecutor that generates "futures" but does not communicate with FuncX"""

Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
long_description=long_desc,
long_description_content_type='text/markdown',
install_requires=install_requires,
python_requires=">=3.6.*",
python_requires=">=3.7.*",
extras_require={
'funcx': ['funcx>=0.3.4', 'globus_sdk<3'] # TODO (wardlt): Remove Globus SDK version pinning once FuncX supports v3
'funcx': ['funcx>=1']
},
classifiers=[
"Development Status :: 3 - Alpha",
Expand Down

0 comments on commit faf9cb7

Please sign in to comment.