diff --git a/dttools/src/priority_queue.c b/dttools/src/priority_queue.c index b95cfe8def..bc68ecd84e 100644 --- a/dttools/src/priority_queue.c +++ b/dttools/src/priority_queue.c @@ -24,9 +24,9 @@ struct priority_queue { int size; int capacity; struct element **elements; - int step_cursor; // iterate from the left to the right, keep the last position - int scheduling_cursor; // used in scheduling + int step_cursor; // iterate from the left to the right, remember the last position int sweep_cursor; // iterate from the left to the right, restart every time + int scheduling_cursor; // used for task scheduling }; struct priority_queue *priority_queue_create(double init_capacity) @@ -176,34 +176,12 @@ void *priority_queue_get_head(struct priority_queue *pq) return pq->elements[1]->data; } -void *priority_queue_get_element(struct priority_queue *pq, int index) +void *priority_queue_get_element(struct priority_queue *pq, int idx) { - if (!pq || pq->size < 1 || index < 1 || index > pq->size) + if (!pq || pq->size < 1 || idx < 1 || idx > pq->size) return NULL; - return pq->elements[index]->data; -} - -double priority_queue_get_max_priority(struct priority_queue *pq) -{ - if (!pq || pq->size == 0) - return MIN_PRIORITY; - - return pq->elements[1]->priority; -} - -double priority_queue_get_min_priority(struct priority_queue *pq) -{ - if (!pq || pq->size == 0) - return MAX_PRIORITY; - - double min_priority = pq->elements[1]->priority; - - for (int i = 2; i <= pq->size; i++) { - min_priority = pq->elements[i]->priority < min_priority ? pq->elements[i]->priority : min_priority; - } - - return min_priority; + return pq->elements[idx]->data; } int priority_queue_update_priority(struct priority_queue *pq, void *data, double new_priority) @@ -211,43 +189,58 @@ int priority_queue_update_priority(struct priority_queue *pq, void *data, double if (!pq) return 0; - int index = -1; + int idx = -1; for (int i = 1; i <= pq->size; i++) { if (pq->elements[i]->data == data) { - index = i; + idx = i; break; } } - if (index == -1) { + if (idx == -1) { return 0; } - double old_priority = pq->elements[index]->priority; - pq->elements[index]->priority = new_priority; + double old_priority = pq->elements[idx]->priority; + pq->elements[idx]->priority = new_priority; if (new_priority > old_priority) { - swim(pq, index); + swim(pq, idx); } else if (new_priority < old_priority) { - sink(pq, index); + sink(pq, idx); } return 1; } -void *priority_queue_step_next(struct priority_queue *pq) +int priority_queue_find_idx(struct priority_queue *pq, void *data) +{ + if (!pq) + return 0; + + for (int i = 1; i <= pq->size; i++) { + if (pq->elements[i]->data == data) { + return i; + } + } + + return 0; +} + +int priority_queue_step_next(struct priority_queue *pq) { if (!pq || pq->size == 0) - return NULL; + return 0; pq->step_cursor++; if (pq->step_cursor > pq->size) { pq->step_cursor = 1; } - return pq->elements[pq->step_cursor]->data; + return pq->step_cursor; } + void priority_queue_sweep_reset(struct priority_queue *pq) { if (!pq) @@ -257,30 +250,21 @@ void priority_queue_sweep_reset(struct priority_queue *pq) } /* -Advance the sweep cursor and reture the pointed data, -should be used only in PRIORITY_QUEUE_ITERATE +Advance the sweep cursor and return it, should be used only in PRIORITY_QUEUE_ITERATE */ -void *priority_queue_sweep_next(struct priority_queue *pq) +int priority_queue_sweep_next(struct priority_queue *pq) { if (!pq || pq->size == 0) - return NULL; + return 0; pq->sweep_cursor++; if (pq->sweep_cursor > pq->size) { priority_queue_sweep_reset(pq); - return NULL; + return 0; } - return pq->elements[pq->sweep_cursor]->data; -} - -int priority_queue_get_scheduling_cursor(struct priority_queue *pq) -{ - if (!pq) - return -1; - - return pq->scheduling_cursor; + return pq->sweep_cursor; } void priority_queue_scheduling_reset(struct priority_queue *pq) @@ -291,45 +275,40 @@ void priority_queue_scheduling_reset(struct priority_queue *pq) pq->scheduling_cursor = 0; } -void *priority_queue_scheduling_next(struct priority_queue *pq) +int priority_queue_scheduling_next(struct priority_queue *pq) { if (!pq || pq->size == 0) - return NULL; + return 0; pq->scheduling_cursor++; if (pq->scheduling_cursor > pq->size) { pq->scheduling_cursor = 1; } - return pq->elements[pq->scheduling_cursor]->data; + return pq->scheduling_cursor; } -int priority_queue_remove(struct priority_queue *pq, void *data) +int priority_queue_remove(struct priority_queue *pq, int idx) { - if (!pq) - return 0; - - for (int i = 1; i <= pq->size; i++) { - if (pq->elements[i]->data == data) { - struct element *e = pq->elements[i]; - pq->elements[i] = pq->elements[pq->size]; - pq->elements[pq->size--] = NULL; - sink(pq, i); - - if (pq->step_cursor == i) { - pq->step_cursor--; - } - if (pq->sweep_cursor == i) { - pq->sweep_cursor--; - } - if (pq->scheduling_cursor == i) { - pq->scheduling_cursor--; - } - free(e); - return 1; - } - } - return 0; + if (!pq || idx < 1 || idx > pq->size) + return 0; + + struct element *e = pq->elements[idx]; + pq->elements[idx] = pq->elements[pq->size]; + pq->elements[pq->size--] = NULL; + sink(pq, idx); + + if (pq->step_cursor == idx) { + pq->step_cursor--; + } + if (pq->sweep_cursor == idx) { + pq->sweep_cursor--; + } + if (pq->scheduling_cursor == idx) { + pq->scheduling_cursor--; + } + free(e); + return 1; } void priority_queue_delete(struct priority_queue *pq) diff --git a/dttools/src/priority_queue.h b/dttools/src/priority_queue.h index ec9ec9efa7..6750ebab68 100644 --- a/dttools/src/priority_queue.h +++ b/dttools/src/priority_queue.h @@ -78,9 +78,10 @@ for (int i = 1; i <= priority_queue_size(pq); i++) { Or use the PRIORITY_QUEUE_ITERATE macro:
+int idx; void *data; -PRIORITY_QUEUE_ITERATE (pq, data) { - printf("Priority queue contains: %p\n", data); +PRIORITY_QUEUE_ITERATE (pq, idx, data) { + printf("Data idx: %d\n", idx); }*/ @@ -128,18 +129,6 @@ The first accessible element is at index 1. */ void *priority_queue_get_element(struct priority_queue *pq, int index); -/** Get the highest priority of all elements from a priority queue. -@param pq A pointer to a priority queue. -@return The highest priority of the queue. -*/ -double priority_queue_get_max_priority(struct priority_queue *pq); - -/** Get the lowest priority of all elements from a priority queue. -@param pq A pointer to a priority queue. -@return The lowest priority of the queue. -*/ -double priority_queue_get_min_priority(struct priority_queue *pq); - /** Update the priority of an element in a priority queue. @param pq A pointer to a priority queue. @param data The pointer to the element to update. @@ -149,29 +138,27 @@ double priority_queue_get_min_priority(struct priority_queue *pq); int priority_queue_update_priority(struct priority_queue *pq, void *data, double new_priority); +int priority_queue_find_idx(struct priority_queue *pq, void *data); - -void *priority_queue_step_next(struct priority_queue *pq); +int priority_queue_step_next(struct priority_queue *pq); void priority_queue_sweep_reset(struct priority_queue *pq); -void *priority_queue_sweep_next(struct priority_queue *pq); - -int priority_queue_get_scheduling_cursor(struct priority_queue *pq); +int priority_queue_sweep_next(struct priority_queue *pq); void priority_queue_scheduling_reset(struct priority_queue *pq); -void *priority_queue_scheduling_next(struct priority_queue *pq); +int priority_queue_scheduling_next(struct priority_queue *pq); /** Remove the element with the specified index from a priority queue. @param pq A pointer to a priority queue. -@param index The index of the element to remove. +@param idx The index of the element to remove. @return One if the remove succeeded, failure otherwise */ -int priority_queue_remove(struct priority_queue *pq, void *data); +int priority_queue_remove(struct priority_queue *pq, int idx); /** Delete a priority queue. @param pq A pointer to a priority queue. @@ -182,15 +169,16 @@ void priority_queue_delete(struct priority_queue *pq); Use as follows:
+int idx; char *data; -PRIORITY_QUEUE_ITERATE(pq, data) { - printf("data: %s\n", data); +PRIORITY_QUEUE_ITERATE(pq, idx, data) { + printf("Data idx: %d\n", idx); }*/ -#define PRIORITY_QUEUE_ITERATE( pq, data ) priority_queue_sweep_reset(pq); while ((data = priority_queue_sweep_next(pq))) +#define PRIORITY_QUEUE_ITERATE( pq, idx, data ) priority_queue_sweep_reset(pq); while ((idx = priority_queue_sweep_next(pq)) && (data = priority_queue_get_element(pq, idx))) #endif diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 0444e3304f..ebb42b2205 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -1343,19 +1343,20 @@ exceeded the maximum number of retries, or other policy issues. static int expire_waiting_tasks(struct vine_manager *q) { struct vine_task *t; + int t_idx; int expired = 0; int tasks_considered = 0; double current_time = timestamp_get() / ONE_SECOND; - while ((t = priority_queue_step_next(q->ready_tasks))) { + while ((t_idx = priority_queue_step_next(q->ready_tasks) && (t = priority_queue_get_element(q->ready_tasks, t_idx)))) { if (tasks_considered > q->attempt_schedule_depth) { return expired; } if (t->resources_requested->end > 0 && t->resources_requested->end <= current_time) { vine_task_set_result(t, VINE_RESULT_MAX_END_TIME); - priority_queue_remove(q->ready_tasks, t); + priority_queue_remove(q->ready_tasks, t_idx); change_task_state(q, t, VINE_TASK_RETRIEVED); expired++; } @@ -1372,15 +1373,16 @@ Terminate those to which no such worker exists. */ static int enforce_waiting_fixed_locations(struct vine_manager *q) { + int t_idx; struct vine_task *t; int terminated = 0; - PRIORITY_QUEUE_ITERATE(q->ready_tasks, t) + PRIORITY_QUEUE_ITERATE(q->ready_tasks, t_idx, t) { if (t->has_fixed_locations && !vine_schedule_check_fixed_location(q, t)) { vine_task_set_result(t, VINE_RESULT_FIXED_LOCATION_MISSING); change_task_state(q, t, VINE_TASK_RETRIEVED); - priority_queue_remove(q->ready_tasks, t); + priority_queue_remove(q->ready_tasks, t_idx); terminated++; } } @@ -1733,13 +1735,13 @@ of the manager's resource consumption for status reporting. static struct rmsummary *total_resources_needed(struct vine_manager *q) { - + int t_idx; struct vine_task *t; struct rmsummary *total = rmsummary_create(0); /* for waiting tasks, we use what they would request if dispatched right now. */ - PRIORITY_QUEUE_ITERATE(q->ready_tasks, t) + PRIORITY_QUEUE_ITERATE(q->ready_tasks, t_idx, t) { const struct rmsummary *s = vine_manager_task_resources_min(q, t); rmsummary_add(total, s); @@ -3201,6 +3203,7 @@ the task to the worker. static int send_one_task(struct vine_manager *q) { + int t_idx; struct vine_task *t; struct vine_worker_info *w = NULL; @@ -3212,7 +3215,8 @@ static int send_one_task(struct vine_manager *q) int tasks_to_consider = MIN(priority_queue_size(q->ready_tasks), q->attempt_schedule_depth); // First consider the task with the highest priority - t = priority_queue_get_head(q->ready_tasks); + t_idx = 1; + t = priority_queue_get_element(q->ready_tasks, t_idx); if (!t) { return 0; } @@ -3266,11 +3270,11 @@ static int send_one_task(struct vine_manager *q) } // Otherwise, remove it from the ready queue and start it: - priority_queue_remove(q->ready_tasks, t); + priority_queue_remove(q->ready_tasks, t_idx); commit_task_to_worker(q, w, t); return 1; - } while ((t = priority_queue_scheduling_next(q->ready_tasks))); + } while ((t_idx = priority_queue_scheduling_next(q->ready_tasks)) && (t = priority_queue_get_element(q->ready_tasks, t_idx))); // if we made it here we reached the end of the queue return 0; @@ -3601,12 +3605,12 @@ static void reset_task_to_state(struct vine_manager *q, struct vine_task *t, vin break; case VINE_TASK_READY: - priority_queue_remove(q->ready_tasks, t); + int t_idx = priority_queue_find_idx(q->ready_tasks, t); + priority_queue_remove(q->ready_tasks, t_idx); change_task_state(q, t, new_state); break; case VINE_TASK_RUNNING: - // t->worker must be set if in RUNNING state. assert(w); @@ -5111,9 +5115,10 @@ int vine_hungry(struct vine_manager *q) int64_t ready_task_disk = 0; int64_t ready_task_gpus = 0; + int t_idx; struct vine_task *t; - PRIORITY_QUEUE_ITERATE(q->ready_tasks, t) + PRIORITY_QUEUE_ITERATE(q->ready_tasks, t_idx, t) { ready_task_cores += MAX(1, t->resources_requested->cores); ready_task_memory += t->resources_requested->memory; diff --git a/taskvine/src/manager/vine_schedule.c b/taskvine/src/manager/vine_schedule.c index da5f3a8ce6..80a5704b67 100644 --- a/taskvine/src/manager/vine_schedule.c +++ b/taskvine/src/manager/vine_schedule.c @@ -575,6 +575,7 @@ This is quite an expensive function and so is invoked only periodically. void vine_schedule_check_for_large_tasks(struct vine_manager *q) { + int t_idx; struct vine_task *t; int unfit_core = 0; int unfit_mem = 0; @@ -583,7 +584,7 @@ void vine_schedule_check_for_large_tasks(struct vine_manager *q) struct rmsummary *largest_unfit_task = rmsummary_create(-1); - PRIORITY_QUEUE_ITERATE(q->ready_tasks, t) + PRIORITY_QUEUE_ITERATE(q->ready_tasks, t_idx, t) { // check each task against the queue of connected workers vine_resource_bitmask_t bit_set = is_task_larger_than_any_worker(q, t);