Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vine: daskvine priority scheduling #3923

5 changes: 4 additions & 1 deletion taskvine/src/bindings/python3/ndcctools/taskvine/dask_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(self, dsk, low_memory_mode=False):
self._pending_parents_of = defaultdict(lambda: set())

# key->depth. The shallowest level the key is found
self._depth_of = defaultdict(lambda: float('inf'))
self._depth_of = defaultdict(lambda: float("inf"))

# target keys that the dag should compute
self._targets = set()
Expand Down Expand Up @@ -115,7 +115,10 @@ def find_dependencies(self, sexpr, depth=0):

def set_relations(self, key, sexpr):
sexpr = self._working_graph[key]

self._children_of[key] = self.find_dependencies(sexpr)
self._depth_of[key] = max([self._depth_of[c] for c in self._children_of[key]]) + 1 if self._children_of[key] else 0

self._missing_of[key] = set(self._children_of[key])

for c in self._children_of[key]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
from .dask_dag import DaskVineDag
from .cvine import VINE_TEMP

import os
import time
import random
import contextlib
import cloudpickle
import os
from uuid import uuid4
from collections import defaultdict

try:
import rich
Expand Down Expand Up @@ -123,6 +126,7 @@ def get(self, dsk, keys, *,
lib_command=None,
lib_modules=None,
task_mode='tasks',
scheduling_mode='FIFO',
env_per_task=False,
progress_disable=False,
progress_label="[green]tasks",
Expand Down Expand Up @@ -164,12 +168,16 @@ def get(self, dsk, keys, *,
else:
self.lib_modules = hoisting_modules if hoisting_modules else import_modules # Deprecated
self.task_mode = task_mode
self.scheduling_mode = scheduling_mode
self.env_per_task = env_per_task
self.progress_disable = progress_disable
self.progress_label = progress_label
self.wrapper = wrapper
self.wrapper_proc = wrapper_proc
self.prune_files = prune_files
self.category_execution_time = defaultdict(list)
self.max_priority = float('inf')
self.min_priority = float('-inf')

if submit_per_cycle is not None and submit_per_cycle < 1:
submit_per_cycle = None
Expand Down Expand Up @@ -273,6 +281,7 @@ def _dask_execute(self, dsk, keys):
print(f"{t.key} ran on {t.hostname}")

if t.successful():
self.category_execution_time[t.category].append(t.execution_time)
result_file = DaskVineFile(t.output_file, t.key, dag, self.task_mode)
rs = dag.set_result(t.key, result_file)
self._enqueue_dask_calls(dag, tag, rs, self.retries, enqueued_calls)
Expand Down Expand Up @@ -334,7 +343,36 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
if lazy and self.checkpoint_fn:
lazy = self.checkpoint_fn(dag, k)

# each task has a category name
cat = self.category_name(sexpr)

task_depth = dag.depth_of(k)
if self.scheduling_mode == 'random':
priority = random.randint(self.min_priority, self.max_priority)
elif self.scheduling_mode == 'depth-first':
# dig more information about different kinds of tasks
priority = task_depth
elif self.scheduling_mode == 'breadth-first':
# prefer to start all branches as soon as possible
priority = -task_depth
elif self.scheduling_mode == 'longest-first':
# if no tasks have been executed in this category, set a high priority so that we know more information about each category
priority = sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority
elif self.scheduling_mode == 'shortest-first':
# if no tasks have been executed in this category, set a high priority so that we know more information about each category
priority = -sum(self.category_execution_time[cat]) / len(self.category_execution_time[cat]) if len(self.category_execution_time[cat]) else self.max_priority
elif self.scheduling_mode == 'FIFO':
# first in first out, the default behavior
priority = -round(time.time(), 6)
elif self.scheduling_mode == 'LIFO':
# last in first out, the opposite of FIFO
priority = round(time.time(), 6)
elif self.scheduling_mode == 'largest-input-first':
# best for saving disk space (with pruing)
priority = sum([len(dag.get_result(c)._file) for c in dag.get_children(k)])
else:
raise ValueError(f"Unknown scheduling mode {self.scheduling_mode}")

if self.task_mode == 'tasks':
if cat not in self._categories_known:
if self.resources:
Expand All @@ -356,6 +394,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
worker_transfers=lazy,
wrapper=self.wrapper)

t.set_priority(priority)
if self.env_per_task:
t.set_command(
f"mkdir envdir && tar -xf {self._environment_name} -C envdir && envdir/bin/run_in_env {t._command}")
Expand All @@ -373,6 +412,7 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
worker_transfers=lazy,
wrapper=self.wrapper)

t.set_priority(priority)
t.set_tag(tag) # tag that identifies this dag

enqueued_calls.append(t)
Expand Down Expand Up @@ -631,6 +671,7 @@ def __init__(self, m,
self.set_category(category)
if worker_transfers:
self.enable_temp_output()

if extra_files:
for f, name in extra_files.items():
self.add_input(f, name)
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,13 @@ def resources_allocated(self):
return None
return cvine.vine_task_get_resources(self._task, "allocated")

##
# Get the execution time of the task in seconds.
#
@property
def execution_time(self):
return cvine.vine_task_get_execution_time(self._task) / 1e6

##
# Adds inputs for nopen library and rules file and sets LD_PRELOAD
#
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,13 @@ regardless of the priority.

void vine_task_set_priority(struct vine_task *t, double priority);

/** Get the actual execution time of the task.
execution_time = t->time_workers_execute_last_end - t->time_workers_execute_last_start.
@param t A task object.
@return The actual execution time of the task in seconds.
*/
double vine_task_get_execution_time(struct vine_task *t);

/** Specify an environment variable to be added to the task.
@param t A task object
@param name Name of the variable.
Expand Down
5 changes: 5 additions & 0 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ void vine_task_set_priority(struct vine_task *t, double priority)
t->priority = priority;
}

double vine_task_get_execution_time(struct vine_task *t)
{
return t->time_workers_execute_last_end - t->time_workers_execute_last_start;
}

int vine_task_set_monitor_output(struct vine_task *t, const char *monitor_output_directory)
{

Expand Down
Loading