From d162c2153d696744462afd940ef506ee72c13a9b Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Wed, 21 Aug 2024 20:49:22 -0400 Subject: [PATCH 01/12] vine: daskvine priority scheduling --- .../python3/ndcctools/taskvine/dask_dag.py | 24 +++++++++++++++---- .../ndcctools/taskvine/dask_executor.py | 23 ++++++++++++++++-- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py index d3e6c37f2b..79ed456664 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py @@ -77,7 +77,7 @@ def __init__(self, dsk, low_memory_mode=False): self._pending_parents_of = defaultdict(lambda: set()) # key->depth. The shallowest level the key is found - self._depth_of = defaultdict(lambda: float('inf')) + self._depth_of = defaultdict(lambda: 0) # target keys that the dag should compute self._targets = set() @@ -102,16 +102,32 @@ def depth_of(self, key): def initialize_graph(self): for key, sexpr in self._working_graph.items(): self.set_relations(key, sexpr) + for key, sexpr in self._working_graph.items(): + self.set_depth(key) - def find_dependencies(self, sexpr, depth=0): + def find_dependencies(self, sexpr): dependencies = set() if self.graph_keyp(sexpr): dependencies.add(sexpr) - self._depth_of[sexpr] = min(depth, self._depth_of[sexpr]) elif not DaskVineDag.symbolp(sexpr): for sub in sexpr: - dependencies.update(self.find_dependencies(sub, depth + 1)) + dependencies.update(self.find_dependencies(sub)) return dependencies + + def set_depth(self, key): + if key not in self._children_of or not self._children_of[key]: + self._depth_of[key] = 1 + return 1 + + max_children_depth = 0 + for child in self._children_of[key]: + if child not in self._depth_of: + child_depth = self.set_depth(child) + else: + child_depth = self._depth_of[child] + max_children_depth = max(max_children_depth, child_depth) + self._depth_of[key] = max_children_depth + 1 + return self._depth_of[key] def set_relations(self, key, sexpr): sexpr = self._working_graph[key] diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 8bb9c570d1..58df6bfe90 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -12,12 +12,14 @@ from .task import PythonTask from .task import FunctionCall from .dask_dag import DaskVineDag -from .cvine import VINE_TEMP +from .cvine import VINE_TEMP, vine_task_set_priority import contextlib import cloudpickle import os +import time from uuid import uuid4 +import random try: import rich @@ -123,6 +125,7 @@ def get(self, dsk, keys, *, lib_command=None, lib_modules=None, task_mode='tasks', + task_priority=None, env_per_task=False, progress_disable=False, progress_label="[green]tasks", @@ -160,6 +163,7 @@ def get(self, dsk, keys, *, else: self.lib_modules = hoisting_modules if hoisting_modules else import_modules # Deprecated self.task_mode = task_mode + self.task_priority = task_priority self.env_per_task = env_per_task self.progress_disable = progress_disable self.progress_label = progress_label @@ -329,7 +333,20 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): lazy = self.worker_transfers and k not in targets if lazy and self.checkpoint_fn: lazy = self.checkpoint_fn(dag, k) - + + task_depth = dag.depth_of(k) + if self.task_priority == 'random': + priority = round(random.uniform(-1, 1), 10) + elif self.task_priority == 'depth-first': + priority = task_depth + elif self.task_priority == 'breadth-first': + priority = -task_depth + elif self.task_priority == 'FIFO': + priority = -round(time.time(), 6) + elif self.task_priority == 'LIFO': + priority = round(time.time(), 6) + else: + priority = 0 cat = self.category_name(sexpr) if self.task_mode == 'tasks': if cat not in self._categories_known: @@ -352,6 +369,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): worker_transfers=lazy, wrapper=self.wrapper) + vine_task_set_priority(t._task, priority) if self.env_per_task: t.set_command( f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}") @@ -369,6 +387,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): worker_transfers=lazy, wrapper=self.wrapper) + vine_task_set_priority(t._task, priority) t.set_tag(tag) # tag that identifies this dag enqueued_calls.append(t) From 19b39c9434bc4da41e894bca4dd41e7ebd0f2077 Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Wed, 21 Aug 2024 20:50:34 -0400 Subject: [PATCH 02/12] change var name --- .../python3/ndcctools/taskvine/dask_executor.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 58df6bfe90..42eeedf1ef 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -125,7 +125,7 @@ def get(self, dsk, keys, *, lib_command=None, lib_modules=None, task_mode='tasks', - task_priority=None, + scheduling_mode=None, env_per_task=False, progress_disable=False, progress_label="[green]tasks", @@ -163,7 +163,7 @@ def get(self, dsk, keys, *, else: self.lib_modules = hoisting_modules if hoisting_modules else import_modules # Deprecated self.task_mode = task_mode - self.task_priority = task_priority + self.scheduling_mode = scheduling_mode self.env_per_task = env_per_task self.progress_disable = progress_disable self.progress_label = progress_label @@ -335,15 +335,15 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): lazy = self.checkpoint_fn(dag, k) task_depth = dag.depth_of(k) - if self.task_priority == 'random': + if self.scheduling_mode == 'random': priority = round(random.uniform(-1, 1), 10) - elif self.task_priority == 'depth-first': + elif self.scheduling_mode == 'depth-first': priority = task_depth - elif self.task_priority == 'breadth-first': + elif self.scheduling_mode == 'breadth-first': priority = -task_depth - elif self.task_priority == 'FIFO': + elif self.scheduling_mode == 'FIFO': priority = -round(time.time(), 6) - elif self.task_priority == 'LIFO': + elif self.scheduling_mode == 'LIFO': priority = round(time.time(), 6) else: priority = 0 From 6403a317de9e087283ebaded745b66536884d7f9 Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Wed, 21 Aug 2024 20:52:04 -0400 Subject: [PATCH 03/12] lint --- taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py | 4 ++-- .../src/bindings/python3/ndcctools/taskvine/dask_executor.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py index 79ed456664..7836201b9b 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py @@ -113,12 +113,12 @@ def find_dependencies(self, sexpr): for sub in sexpr: dependencies.update(self.find_dependencies(sub)) return dependencies - + def set_depth(self, key): if key not in self._children_of or not self._children_of[key]: self._depth_of[key] = 1 return 1 - + max_children_depth = 0 for child in self._children_of[key]: if child not in self._depth_of: diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 42eeedf1ef..2c79b47ffb 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -333,7 +333,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): lazy = self.worker_transfers and k not in targets if lazy and self.checkpoint_fn: lazy = self.checkpoint_fn(dag, k) - + task_depth = dag.depth_of(k) if self.scheduling_mode == 'random': priority = round(random.uniform(-1, 1), 10) From 6ef5fef5d20bf7d4f2be00175b2a3afcc364a6c0 Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Wed, 21 Aug 2024 21:52:21 -0400 Subject: [PATCH 04/12] longest task first scheduling --- .../ndcctools/taskvine/dask_executor.py | 39 ++++++++++++------- .../python3/ndcctools/taskvine/task.py | 7 ++++ taskvine/src/manager/taskvine.h | 7 ++++ taskvine/src/manager/vine_task.c | 5 +++ 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 2c79b47ffb..d9b21a4cd8 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -12,14 +12,15 @@ from .task import PythonTask from .task import FunctionCall from .dask_dag import DaskVineDag -from .cvine import VINE_TEMP, vine_task_set_priority +from .cvine import VINE_TEMP -import contextlib -import cloudpickle import os import time -from uuid import uuid4 import random +import contextlib +import cloudpickle +from uuid import uuid4 +from collections import defaultdict try: import rich @@ -170,6 +171,7 @@ def get(self, dsk, keys, *, self.wrapper = wrapper self.wrapper_proc = wrapper_proc self.prune_files = prune_files + self.category_execution_time = defaultdict(list) if submit_per_cycle is not None and submit_per_cycle < 1: submit_per_cycle = None @@ -273,6 +275,7 @@ def _dask_execute(self, dsk, keys): print(f"{t.key} ran on {t.hostname}") if t.successful(): + self.category_execution_time[t.category].append(t.execution_time) result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode) rs = dag.set_result(t.key, result_file) self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls) @@ -334,6 +337,12 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if lazy and self.checkpoint_fn: lazy = self.checkpoint_fn(dag, k) + # each task has a category name + category = self.category_name(sexpr) + category_avg_execution_time = 0 + if len(self.category_execution_time[category]): + category_avg_execution_time = sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) + task_depth = dag.depth_of(k) if self.scheduling_mode == 'random': priority = round(random.uniform(-1, 1), 10) @@ -341,27 +350,31 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): priority = task_depth elif self.scheduling_mode == 'breadth-first': priority = -task_depth + elif self.scheduling_mode == 'longest-first': + priority = category_avg_execution_time + elif self.scheduling_mode == 'shortest-first': + priority = -category_avg_execution_time elif self.scheduling_mode == 'FIFO': priority = -round(time.time(), 6) elif self.scheduling_mode == 'LIFO': priority = round(time.time(), 6) else: priority = 0 - cat = self.category_name(sexpr) + if self.task_mode == 'tasks': - if cat not in self._categories_known: + if category not in self._categories_known: if self.resources: - self.set_category_resources_max(cat, self.resources) + self.set_category_resources_max(category, self.resources) if self.resources_mode: - self.set_category_mode(cat, self.resources_mode) + self.set_category_mode(category, self.resources_mode) if not self._categories_known: self.enable_monitoring() - self._categories_known.add(cat) + self._categories_known.add(category) t = PythonTaskDask(self, dag, k, sexpr, - category=cat, + category=category, environment=self.environment, extra_files=self.extra_files, env_vars=self.env_vars, @@ -369,7 +382,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): worker_transfers=lazy, wrapper=self.wrapper) - vine_task_set_priority(t._task, priority) + t.set_priority(priority) if self.env_per_task: t.set_command( f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}") @@ -381,13 +394,13 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if self.task_mode == 'function-calls': t = FunctionCallDask(self, dag, k, sexpr, - category=cat, + category=category, extra_files=self.extra_files, retries=retries, worker_transfers=lazy, wrapper=self.wrapper) - vine_task_set_priority(t._task, priority) + t.set_priority(priority) t.set_tag(tag) # tag that identifies this dag enqueued_calls.append(t) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/task.py b/taskvine/src/bindings/python3/ndcctools/taskvine/task.py index 04b6dff7cc..0b0121e3b8 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/task.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/task.py @@ -811,6 +811,13 @@ def resources_allocated(self): return None return cvine.vine_task_get_resources(self._task, "allocated") + ## + # Get the execution time of the task in seconds. + # + @property + def execution_time(self): + return cvine.vine_task_get_execution_time(self._task) / 1e6 + ## # Adds inputs for nopen library and rules file and sets LD_PRELOAD # diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h index 8bb5f772ab..4e61a5c05b 100644 --- a/taskvine/src/manager/taskvine.h +++ b/taskvine/src/manager/taskvine.h @@ -445,6 +445,13 @@ regardless of the priority. void vine_task_set_priority(struct vine_task *t, double priority); +/** Get the actual execution time of the task. +execution_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start. +@param t A task object. +@return The actual execution time of the task in seconds. +*/ +double vine_task_get_execution_time(struct vine_task *t); + /** Specify an environment variable to be added to the task. @param t A task object @param name Name of the variable. diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c index ea1307185a..b99e7c7d7d 100644 --- a/taskvine/src/manager/vine_task.c +++ b/taskvine/src/manager/vine_task.c @@ -655,6 +655,11 @@ void vine_task_set_priority(struct vine_task *t, double priority) t->priority = priority; } +double vine_task_get_execution_time(struct vine_task *t) +{ + return t->time_workers_execute_last_end - t->time_workers_execute_last_start; +} + int vine_task_set_monitor_output(struct vine_task *t, const char *monitor_output_directory) { From b88d2619afec2c68e20f54db9a6aed5aa539abc0 Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Wed, 21 Aug 2024 21:56:45 -0400 Subject: [PATCH 05/12] longest task first --- .../src/bindings/python3/ndcctools/taskvine/dask_executor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index d9b21a4cd8..6f7249966e 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -339,9 +339,11 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): # each task has a category name category = self.category_name(sexpr) - category_avg_execution_time = 0 if len(self.category_execution_time[category]): category_avg_execution_time = sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) + else: + # if no tasks have been executed in this category, set a high priority so that we know more information about each category + category_avg_execution_time = 1e10 task_depth = dag.depth_of(k) if self.scheduling_mode == 'random': From 8ad03a7e1178b5bd245e909801c5ff9bd1a8c691 Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Wed, 21 Aug 2024 22:48:46 -0400 Subject: [PATCH 06/12] rename --- .../python3/ndcctools/taskvine/dask_executor.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 6f7249966e..9644d94886 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -172,6 +172,8 @@ def get(self, dsk, keys, *, self.wrapper_proc = wrapper_proc self.prune_files = prune_files self.category_execution_time = defaultdict(list) + self.max_priority = 1e100 + self.min_priority = -1e100 if submit_per_cycle is not None and submit_per_cycle < 1: submit_per_cycle = None @@ -339,23 +341,20 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): # each task has a category name category = self.category_name(sexpr) - if len(self.category_execution_time[category]): - category_avg_execution_time = sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) - else: - # if no tasks have been executed in this category, set a high priority so that we know more information about each category - category_avg_execution_time = 1e10 task_depth = dag.depth_of(k) if self.scheduling_mode == 'random': - priority = round(random.uniform(-1, 1), 10) + priority = random.randint(self.min_priority, self.max_priority) elif self.scheduling_mode == 'depth-first': priority = task_depth elif self.scheduling_mode == 'breadth-first': priority = -task_depth elif self.scheduling_mode == 'longest-first': - priority = category_avg_execution_time + # if no tasks have been executed in this category, set a high priority so that we know more information about each category + priority = sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) if len(self.category_execution_time[category]) else self.max_priority elif self.scheduling_mode == 'shortest-first': - priority = -category_avg_execution_time + # if no tasks have been executed in this category, set a high priority so that we know more information about each category + priority = -sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) if len(self.category_execution_time[category]) else self.max_priority elif self.scheduling_mode == 'FIFO': priority = -round(time.time(), 6) elif self.scheduling_mode == 'LIFO': From 259c5adcdaebc0d6605e8bb2b23f7ad2d2a17a7b Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Mon, 18 Nov 2024 10:18:02 -0500 Subject: [PATCH 07/12] remove cat scheduling --- .../ndcctools/taskvine/dask_executor.py | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 1e516fa8c6..8844f5fa47 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -175,7 +175,6 @@ def get(self, dsk, keys, *, self.wrapper = wrapper self.wrapper_proc = wrapper_proc self.prune_files = prune_files - self.category_execution_time = defaultdict(list) self.max_priority = 1e100 self.min_priority = -1e100 @@ -281,7 +280,6 @@ def _dask_execute(self, dsk, keys): print(f"{t.key} ran on {t.hostname}") if t.successful(): - self.category_execution_time[t.category].append(t.execution_time) result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode) rs = dag.set_result(t.key, result_file) self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls) @@ -344,7 +342,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): lazy = self.checkpoint_fn(dag, k) # each task has a category name - category = self.category_name(sexpr) + cat = self.category_name(sexpr) task_depth = dag.depth_of(k) if self.scheduling_mode == 'random': @@ -353,33 +351,25 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): priority = task_depth elif self.scheduling_mode == 'breadth-first': priority = -task_depth - elif self.scheduling_mode == 'longest-first': - # if no tasks have been executed in this category, set a high priority so that we know more information about each category - priority = sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) if len(self.category_execution_time[category]) else self.max_priority - elif self.scheduling_mode == 'shortest-first': - # if no tasks have been executed in this category, set a high priority so that we know more information about each category - priority = -sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) if len(self.category_execution_time[category]) else self.max_priority elif self.scheduling_mode == 'FIFO': priority = -round(time.time(), 6) elif self.scheduling_mode == 'LIFO': priority = round(time.time(), 6) - else: - priority = 0 if self.task_mode == 'tasks': - if category not in self._categories_known: + if cat not in self._categories_known: if self.resources: - self.set_category_resources_max(category, self.resources) + self.set_category_resources_max(cat, self.resources) if self.resources_mode: - self.set_category_mode(category, self.resources_mode) + self.set_category_mode(cat, self.resources_mode) if not self._categories_known: self.enable_monitoring() - self._categories_known.add(category) + self._categories_known.add(cat) t = PythonTaskDask(self, dag, k, sexpr, - category=category, + category=cat, environment=self.environment, extra_files=self.extra_files, env_vars=self.env_vars, From 741ae7fbf314aa6d022dd05593de7bf1cc261c45 Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Mon, 18 Nov 2024 10:19:38 -0500 Subject: [PATCH 08/12] cat scheduling --- .../python3/ndcctools/taskvine/dask_executor.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 8844f5fa47..3d64f2bc43 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -175,6 +175,7 @@ def get(self, dsk, keys, *, self.wrapper = wrapper self.wrapper_proc = wrapper_proc self.prune_files = prune_files + self.category_execution_time = defaultdict(list) self.max_priority = 1e100 self.min_priority = -1e100 @@ -280,6 +281,7 @@ def _dask_execute(self, dsk, keys): print(f"{t.key} ran on {t.hostname}") if t.successful(): + self.category_execution_time[t.category].append(t.execution_time) result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode) rs = dag.set_result(t.key, result_file) self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls) @@ -351,6 +353,12 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): priority = task_depth elif self.scheduling_mode == 'breadth-first': priority = -task_depth + elif self.scheduling_mode == 'longest-first': + # if no tasks have been executed in this category, set a high priority so that we know more information about each category + priority = sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) if len(self.category_execution_time[category]) else self.max_priority + elif self.scheduling_mode == 'shortest-first': + # if no tasks have been executed in this category, set a high priority so that we know more information about each category + priority = -sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) if len(self.category_execution_time[category]) else self.max_priority elif self.scheduling_mode == 'FIFO': priority = -round(time.time(), 6) elif self.scheduling_mode == 'LIFO': @@ -389,7 +397,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if self.task_mode == 'function-calls': t = FunctionCallDask(self, dag, k, sexpr, - category=category, + category=cat, extra_files=self.extra_files, retries=retries, worker_transfers=lazy, From ae9c3fdaf0501ce2baee5502c9cd9ebc7364cf3d Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Mon, 18 Nov 2024 10:20:08 -0500 Subject: [PATCH 09/12] cat scheduling --- .../src/bindings/python3/ndcctools/taskvine/dask_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 3d64f2bc43..9fcf2f5ad0 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -355,10 +355,10 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): priority = -task_depth elif self.scheduling_mode == 'longest-first': # if no tasks have been executed in this category, set a high priority so that we know more information about each category - priority = sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) if len(self.category_execution_time[category]) else self.max_priority + priority = sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority elif self.scheduling_mode == 'shortest-first': # if no tasks have been executed in this category, set a high priority so that we know more information about each category - priority = -sum(self.category_execution_time[category]) / len(self.category_execution_time[category]) if len(self.category_execution_time[category]) else self.max_priority + priority = -sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority elif self.scheduling_mode == 'FIFO': priority = -round(time.time(), 6) elif self.scheduling_mode == 'LIFO': From 7e3e6bdcaf98312ee497aa0fdc733fc9c4954703 Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Mon, 18 Nov 2024 13:24:08 -0500 Subject: [PATCH 10/12] default FIFO --- .../bindings/python3/ndcctools/taskvine/dask_executor.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index 9fcf2f5ad0..e2fed570a6 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -126,7 +126,7 @@ def get(self, dsk, keys, *, lib_command=None, lib_modules=None, task_mode='tasks', - scheduling_mode=None, + scheduling_mode='FIFO', env_per_task=False, progress_disable=False, progress_label="[green]tasks", @@ -363,6 +363,11 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): priority = -round(time.time(), 6) elif self.scheduling_mode == 'LIFO': priority = round(time.time(), 6) + elif self.scheduling_mode == 'largest-input-first': + # best for saving disk space (with pruing) + priority = sum([len(dag.get_result(c)._file) for c in dag.get_children(k)]) + else: + raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}") if self.task_mode == 'tasks': if cat not in self._categories_known: From 103fe67e0e05ccdea9032c6ae3a4742c2d8440a0 Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Tue, 19 Nov 2024 10:14:40 -0500 Subject: [PATCH 11/12] change 1e100 to float inf --- .../bindings/python3/ndcctools/taskvine/dask_executor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py index e2fed570a6..89157509d0 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_executor.py @@ -176,8 +176,8 @@ def get(self, dsk, keys, *, self.wrapper_proc = wrapper_proc self.prune_files = prune_files self.category_execution_time = defaultdict(list) - self.max_priority = 1e100 - self.min_priority = -1e100 + self.max_priority = float('inf') + self.min_priority = float('-inf') if submit_per_cycle is not None and submit_per_cycle < 1: submit_per_cycle = None @@ -350,8 +350,10 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): if self.scheduling_mode == 'random': priority = random.randint(self.min_priority, self.max_priority) elif self.scheduling_mode == 'depth-first': + # dig more information about different kinds of tasks priority = task_depth elif self.scheduling_mode == 'breadth-first': + # prefer to start all branches as soon as possible priority = -task_depth elif self.scheduling_mode == 'longest-first': # if no tasks have been executed in this category, set a high priority so that we know more information about each category @@ -360,8 +362,10 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls): # if no tasks have been executed in this category, set a high priority so that we know more information about each category priority = -sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority elif self.scheduling_mode == 'FIFO': + # first in first out, the default behavior priority = -round(time.time(), 6) elif self.scheduling_mode == 'LIFO': + # last in first out, the opposite of FIFO priority = round(time.time(), 6) elif self.scheduling_mode == 'largest-input-first': # best for saving disk space (with pruing) @@ -667,6 +671,7 @@ def __init__(self, m, self.set_category(category) if worker_transfers: self.enable_temp_output() + if extra_files: for f, name in extra_files.items(): self.add_input(f, name) From b3486ce83eacecb5d7739fba3e803429adba7c11 Mon Sep 17 00:00:00 2001 From: JinZhou5042 Date: Tue, 19 Nov 2024 12:46:00 -0500 Subject: [PATCH 12/12] set depth --- .../python3/ndcctools/taskvine/dask_dag.py | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py index f94c8064d1..682aff5085 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py @@ -77,7 +77,7 @@ def __init__(self, dsk, low_memory_mode=False): self._pending_parents_of = defaultdict(lambda: set()) # key->depth. The shallowest level the key is found - self._depth_of = defaultdict(lambda: 0) + self._depth_of = defaultdict(lambda: float("inf")) # target keys that the dag should compute self._targets = set() @@ -102,36 +102,23 @@ def depth_of(self, key): def initialize_graph(self): for key, sexpr in self._working_graph.items(): self.set_relations(key, sexpr) - for key, sexpr in self._working_graph.items(): - self.set_depth(key) - def find_dependencies(self, sexpr): + def find_dependencies(self, sexpr, depth=0): dependencies = set() if self.graph_keyp(sexpr): dependencies.add(sexpr) + self._depth_of[sexpr] = min(depth, self._depth_of[sexpr]) elif not DaskVineDag.symbolp(sexpr): for sub in sexpr: - dependencies.update(self.find_dependencies(sub)) + dependencies.update(self.find_dependencies(sub, depth + 1)) return dependencies - def set_depth(self, key): - if key not in self._children_of or not self._children_of[key]: - self._depth_of[key] = 1 - return 1 - - max_children_depth = 0 - for child in self._children_of[key]: - if child not in self._depth_of: - child_depth = self.set_depth(child) - else: - child_depth = self._depth_of[child] - max_children_depth = max(max_children_depth, child_depth) - self._depth_of[key] = max_children_depth + 1 - return self._depth_of[key] - def set_relations(self, key, sexpr): sexpr = self._working_graph[key] + self._children_of[key] = self.find_dependencies(sexpr) + self._depth_of[key] = max([self._depth_of[c] for c in self._children_of[key]]) + 1 if self._children_of[key] else 0 + self._missing_of[key] = set(self._children_of[key]) for c in self._children_of[key]: