diff --git a/colmena/task_server/funcx.py b/colmena/task_server/funcx.py index 9f91dac..47d7484 100644 --- a/colmena/task_server/funcx.py +++ b/colmena/task_server/funcx.py @@ -39,13 +39,19 @@ class FuncXTaskServer(FutureBasedTaskServer): def __init__(self, methods: Dict[Callable, str], funcx_client: FuncXClient, queues: TaskServerQueues, - timeout: Optional[int] = None): + timeout: Optional[int] = None, + batch_enabled: bool = True, + batch_interval: float = 1.0, + batch_size: int = 100): """ Args: methods: Map of functions to the endpoint on which it will run funcx_client: Authenticated FuncX client queues: Queues used to communicate with thinker timeout: Timeout for requests from the task queue + batch_enabled: Whether to use FuncX's batch submission feature + batch_interval: Maximum time to wait between batch submissions + batch_size: Maximum number of task request to receive before submitting """ super(FuncXTaskServer, self).__init__(queues, timeout) @@ -63,8 +69,13 @@ def __init__(self, methods: Dict[Callable, str], # Store the FuncX information for the function self.registered_funcs[func_name] = (new_func, endpoint) - # Create the executor and queue of tasks to be submitted back to the user - self.fx_exec = FuncXExecutor(self.fx_client) + # Placeholder for the executor and queue of tasks to be submitted back to the user + self.fx_exec: Optional[FuncXExecutor] = None + self._batch_options = dict( + batch_enabled=batch_enabled, + batch_size=batch_size, + batch_interval=batch_interval + ) def perform_callback(self, future: Future, result: Result, topic: str): # Check if the failure was due to a ManagerLost @@ -87,5 +98,13 @@ def _submit(self, task: Result, topic: str) -> Future: logger.info(f'Submitted {task.method} to run on {endp_id}') return future + def run(self) -> None: + # Creates a FuncX executor only once the thread has been launched + self.fx_exec = FuncXExecutor(self.fx_client, **self._batch_options) + logger.info('Created a FuncX executor') + + # Now start running tasks + super().run() + def _cleanup(self): self.fx_exec.shutdown() diff --git a/colmena/task_server/tests/test_funcx.py b/colmena/task_server/tests/test_funcx.py index f7f0e9c..ad53640 100644 --- a/colmena/task_server/tests/test_funcx.py +++ b/colmena/task_server/tests/test_funcx.py @@ -23,6 +23,8 @@ def register_function(self, new_func, function_name: str, description: str, sear return uuid +# TODO (wardlt): This mocked version does not mimic the real versions' multi-threaded nature, which +# means it does not suffer the same problems when being copied over to a subprocess class FakeExecutor: """Faked FuncXExecutor that generates "futures" but does not communicate with FuncX""" diff --git a/setup.py b/setup.py index c49b512..77c0ec8 100644 --- a/setup.py +++ b/setup.py @@ -21,9 +21,9 @@ long_description=long_desc, long_description_content_type='text/markdown', install_requires=install_requires, - python_requires=">=3.6.*", + python_requires=">=3.7.*", extras_require={ - 'funcx': ['funcx>=0.3.4', 'globus_sdk<3'] # TODO (wardlt): Remove Globus SDK version pinning once FuncX supports v3 + 'funcx': ['funcx>=1'] }, classifiers=[ "Development Status :: 3 - Alpha",