diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py index 71314b626e..682aff5085 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py @@ -77,7 +77,7 @@ def __init__(self, dsk, low_memory_mode=False): self._pending_parents_of = defaultdict(lambda: set()) # key->depth. The shallowest level the key is found - self._depth_of = defaultdict(lambda: float('inf')) + self._depth_of = defaultdict(lambda: float("inf")) # target keys that the dag should compute self._targets = set() @@ -115,7 +115,10 @@ def find_dependencies(self, sexpr, depth=0): def set_relations(self, key, sexpr): sexpr = self._working_graph[key] + self._children_of[key] = self.find_dependencies(sexpr) + self._depth_of[key] = max([self._depth_of[c] for c in self._children_of[key]]) + 1 if self._children_of[key] else 0 + self._missing_of[key] = set(self._children_of[key]) for c in self._children_of[key]: diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 86caf83dec..89157509d0 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -14,10 +14,13 @@ from .dask_dag import DaskVineDag from .cvine import VINE_TEMP +import os +import time +import random import contextlib import cloudpickle -import os from uuid import uuid4 +from collections import defaultdict try: import rich @@ -123,6 +126,7 @@ def get(self, dsk, keys, *, lib_command=None, lib_modules=None, task_mode='tasks', + scheduling_mode='FIFO', env_per_task=False, progress_disable=False, progress_label="[green]tasks", @@ -164,12 +168,16 @@ def get(self, dsk, keys, *, else: self.lib_modules = hoisting_modules if hoisting_modules else import_modules # Deprecated self.task_mode = task_mode + 
self.scheduling_mode = scheduling_mode self.env_per_task = env_per_task self.progress_disable = progress_disable self.progress_label = progress_label self.wrapper = wrapper self.wrapper_proc = wrapper_proc self.prune_files = prune_files + self.category_execution_time = defaultdict(list) + self.max_priority = float('inf') + self.min_priority = float('-inf') if submit_per_cycle is not None and submit_per_cycle < 1: submit_per_cycle = None @@ -273,6 +281,7 @@ def _dask_execute(self, dsk, keys): print(f"{t.key} ran on {t.hostname}") if t.successful(): + self.category_execution_time[t.category].append(t.execution_time) result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode) rs = dag.set_result(t.key, result_file) self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls) @@ -334,7 +343,36 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if lazy and self.checkpoint_fn: lazy = self.checkpoint_fn(dag, k) + # each task has a category name cat = self.category_name(sexpr) + + task_depth = dag.depth_of(k) + if self.scheduling_mode == 'random': + priority = random.randint(self.min_priority, self.max_priority) + elif self.scheduling_mode == 'depth-first': + # dig more information about different kinds of tasks + priority = task_depth + elif self.scheduling_mode == 'breadth-first': + # prefer to start all branches as soon as possible + priority = -task_depth + elif self.scheduling_mode == 'longest-first': + # if no tasks have been executed in this category, set a high priority so that we know more information about each category + priority = sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority + elif self.scheduling_mode == 'shortest-first': + # if no tasks have been executed in this category, set a high priority so that we know more information about each category + priority = -sum(self.category_execution_time[cat]) / 
len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority + elif self.scheduling_mode == 'FIFO': + # first in first out, the default behavior + priority = -round(time.time(), 6) + elif self.scheduling_mode == 'LIFO': + # last in first out, the opposite of FIFO + priority = round(time.time(), 6) + elif self.scheduling_mode == 'largest-input-first': + # best for saving disk space (with pruning) + priority = sum([len(dag.get_result(c)._file) for c in dag.get_children(k)]) + else: + raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}") + if self.task_mode == 'tasks': if cat not in self._categories_known: if self.resources: @@ -356,6 +394,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): worker_transfers=lazy, wrapper=self.wrapper) + t.set_priority(priority) if self.env_per_task: t.set_command( f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}") @@ -373,6 +412,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): worker_transfers=lazy, wrapper=self.wrapper) + t.set_priority(priority) t.set_tag(tag) # tag that identifies this dag enqueued_calls.append(t) @@ -631,6 +671,7 @@ def __init__(self, m, self.set_category(category) if worker_transfers: self.enable_temp_output() + if extra_files: for f, name in extra_files.items(): self.add_input(f, name) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/task.py b/taskvine/src/bindings/python3/ndcctools/taskvine/task.py index c043d62626..e8ee2cafcf 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/task.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/task.py @@ -829,6 +829,13 @@ def resources_allocated(self): return None return cvine.vine_task_get_resources(self._task, "allocated") + ## + # Get the execution time of the task in seconds.
+ # + @property + def execution_time(self): + return cvine.vine_task_get_execution_time(self._task) / 1e6 + ## # Adds inputs for nopen library and rules file and sets LD_PRELOAD # diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h index 2f9c5c683e..3b2483b100 100644 --- a/taskvine/src/manager/taskvine.h +++ b/taskvine/src/manager/taskvine.h @@ -455,6 +455,13 @@ regardless of the priority. void vine_task_set_priority(struct vine_task *t, double priority); +/** Get the actual execution time of the task. +execution_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start. +@param t A task object. +@return The actual execution time of the task in microseconds. +*/ +double vine_task_get_execution_time(struct vine_task *t); + /** Specify an environment variable to be added to the task. @param t A task object @param name Name of the variable. diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c index 147f8170d3..0ea78d8963 100644 --- a/taskvine/src/manager/vine_task.c +++ b/taskvine/src/manager/vine_task.c @@ -677,6 +677,11 @@ void vine_task_set_priority(struct vine_task *t, double priority) t->priority = priority; } +double vine_task_get_execution_time(struct vine_task *t) +{ + return t->time_workers_execute_last_end - t->time_workers_execute_last_start; +} + int vine_task_set_monitor_output(struct vine_task *t, const char *monitor_output_directory) {