Skip to content

Commit

Permalink
Add ability to set Fx batch options
Browse files Browse the repository at this point in the history
  • Loading branch information
WardLT committed Aug 17, 2022
1 parent e5dbc04 commit 24a5f6d
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions colmena/task_server/funcx.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,19 @@ class FuncXTaskServer(FutureBasedTaskServer):
def __init__(self, methods: Dict[Callable, str],
funcx_client: FuncXClient,
queues: TaskServerQueues,
timeout: Optional[int] = None):
timeout: Optional[int] = None,
batch_enabled: bool = True,
batch_interval: float = 1.0,
batch_size: int = 100):
"""
Args:
methods: Map of functions to the endpoint on which it will run
funcx_client: Authenticated FuncX client
queues: Queues used to communicate with thinker
timeout: Timeout for requests from the task queue
batch_enabled: Whether to use FuncX's batch submission feature
batch_interval: Maximum time to wait between batch submissions
batch_size: Maximum number of task request to receive before submitting
"""
super(FuncXTaskServer, self).__init__(queues, timeout)

Expand All @@ -65,6 +71,11 @@ def __init__(self, methods: Dict[Callable, str],

# Placeholder for the executor and queue of tasks to be submitted back to the user
self.fx_exec: Optional[FuncXExecutor] = None
self._batch_options = dict(
batch_enabled=batch_enabled,
batch_size=batch_size,
batch_interval=batch_interval
)

def perform_callback(self, future: Future, result: Result, topic: str):
# Check if the failure was due to a ManagerLost
Expand All @@ -89,7 +100,7 @@ def _submit(self, task: Result, topic: str) -> Future:

def run(self) -> None:
# Creates a FuncX executor only once the thread has been launched
self.fx_exec = FuncXExecutor(self.fx_client)
self.fx_exec = FuncXExecutor(self.fx_client, **self._batch_options)
logger.info('Created a FuncX executor')

# Now start running tasks
Expand Down

0 comments on commit 24a5f6d

Please sign in to comment.