From 6bd3166e5582b58c1a80c453711db0beaddf6b90 Mon Sep 17 00:00:00 2001
From: Hochan Lee
Date: Wed, 17 Jan 2024 21:48:53 -0600
Subject: [PATCH] Fix data movement task dependencies with clang-formatting (#110)

* Fix missing data dependency by passing array writers to dependent tasks
* style: apply ruff format
* remove unnecessary file
---
 src/c/backend/include/phases.hpp            |  1 +
 src/c/backend/include/runtime.hpp           | 15 +++-
 src/c/backend/phases.cpp                    | 83 ++++++++++++++++++++-
 src/python/parla/common/dataflow.py         |  2 +-
 src/python/parla/common/globals.py          |  1 +
 src/python/parla/common/parray/coherence.py | 16 ++--
 src/python/parla/common/parray/core.py      |  9 +--
 src/python/parla/common/parray/from_data.py |  4 +-
 src/python/parla/cython/tasks.pyx           |  6 --
 src/python/parla/utility/graphs.py          |  6 +-
 10 files changed, 114 insertions(+), 29 deletions(-)

diff --git a/src/c/backend/include/phases.hpp b/src/c/backend/include/phases.hpp
index 8b3e4a6a..2871c937 100644
--- a/src/c/backend/include/phases.hpp
+++ b/src/c/backend/include/phases.hpp
@@ -277,6 +277,7 @@ class MemoryReserver : virtual public SchedulerPhase {
   * @param task The task to create data movement tasks for.
   */
  void create_datamove_tasks(InnerTask *task);
+ void create_datamove_tasks2(InnerTask *task);
};

/**
diff --git a/src/c/backend/include/runtime.hpp b/src/c/backend/include/runtime.hpp
index de7da5da..d4ab7214 100644
--- a/src/c/backend/include/runtime.hpp
+++ b/src/c/backend/include/runtime.hpp
@@ -277,7 +277,7 @@ class InnerTask {
  /* Task Assigned Device Set*/
  std::vector<Device *> assigned_devices;

- /*Resource Requirements for each assigned device*/
+ /* Resource Requirements for each assigned device*/
  std::unordered_map device_constraints;

  /* Task is data movement task */
@@ -293,6 +293,15 @@ class InnerTask {
  std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
      parray_list;

+ /* Dependency tasks recorded per PArray (keyed by the parent PArray id)
+    for the benefit of this task's dependent tasks.
+    If this task's access permission to a PArray includes write, it
+    registers itself as the dependency of that PArray.
+    If its access permission is read-only, it instead copies the
+    dependencies it inherited for that PArray into this map.
+ */
+ std::unordered_map<uint64_t, std::vector<InnerTask *>> parray_dependencies_map;
+
  InnerTask();
  InnerTask(long long int id, void *py_task);
  InnerTask(std::string name, long long int id, void *py_task);
@@ -623,6 +632,10 @@ class InnerTask {
  void begin_multidev_req_addition();
  void end_multidev_req_addition();

+ std::vector<InnerTask *> &get_parray_dependencies(uint64_t parray_parent_id) {
+   return this->parray_dependencies_map[parray_parent_id];
+ }
+
  PlacementRequirementCollections &get_placement_req_options() {
    return placement_req_options_;
  }
diff --git a/src/c/backend/phases.cpp b/src/c/backend/phases.cpp
index 0bcb091a..ac48bca8 100644
--- a/src/c/backend/phases.cpp
+++ b/src/c/backend/phases.cpp
@@ -237,6 +237,87 @@ void MemoryReserver::create_datamove_tasks(InnerTask *task) {
  task->add_dependencies(data_tasks, true);
}

// TODO(hc): this needs a better name before it is merged;
//           it also needs peer review first.
void MemoryReserver::create_datamove_tasks2(InnerTask *task) {
  // Get a list of the PArrays the current task holds.
  const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
      &parray_list = task->parray_list;
  std::string task_base_name = task->get_name();
  std::vector<InnerTask *> data_tasks;
  data_tasks.reserve(parray_list.size());

  for (size_t i = 0; i < parray_list.size(); ++i) {
    for (size_t j = 0; j < parray_list[i].size(); ++j) {
      // Create a data movement task for each PArray.
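      // The steps for each PArray are:
      //   1. Wrap the PArray in an InnerDataTask that will perform the copy.
      //   2. Walk the compute task's dependencies and collect the tasks they
      //      registered for this PArray (keyed by its parent PArray id).
      //   3. Record dependencies for this PArray on this task as well: a
      //      reader propagates the dependencies it inherited, a writer
      //      registers itself.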
      parray::InnerPArray *parray = parray_list[i][j].first;
      AccessMode access_mode = parray_list[i][j].second;
      InnerDataTask *datamove_task = new InnerDataTask(
          // TODO(hc): id should be updated!
          task_base_name + ".dm." + std::to_string(i), 0, parray, access_mode,
          i);
      uint64_t parray_parent_id = parray->get_parent_parray()->id;
      // Get the dependencies of the compute task.
      std::vector<void *> compute_task_dependencies = task->get_dependencies();
      std::vector<InnerTask *> data_task_dependencies;
      for (size_t k = 0; k < compute_task_dependencies.size(); ++k) {
        InnerTask *parray_dependency =
            static_cast<InnerTask *>(compute_task_dependencies[k]);
        // Get the dependency tasks that the traversed dependency has
        // registered for the PArray identified by `parray_parent_id`.
        std::vector<InnerTask *> &dep_parray_dependencies =
            parray_dependency->get_parray_dependencies(parray_parent_id);

        //std::cout << parray_dependency->name << " is being traversed\n";
        for (size_t t = 0; t < dep_parray_dependencies.size(); ++t) {
          data_task_dependencies.push_back(parray_dependency);
          // If the PArray being processed is read-only for this task,
          // propagate this dependency as a dependency of the PArray.
          //std::cout << "access mode:" << int(access_mode) << "\n";
          if (access_mode == AccessMode::IN) {
            //std::cout << "IN parray is added:" << parray_parent_id << "\n";
            task->get_parray_dependencies(parray_parent_id)
                .push_back(parray_dependency);
          }
        }
      }

      // If the PArray being processed is written by this task,
      // register the task itself as a dependency of the PArray.
      //std::cout << task->name << " is being traversed access id :" << int(access_mode) << "\n";
      if (access_mode != AccessMode::IN) {
        //std::cout << "IN/OUT OUT parray is added:" << parray_parent_id << "\n";
        task->get_parray_dependencies(parray_parent_id).push_back(task);
      }

      // TODO(hc): pass false to add_dependencies() as an optimization.
      datamove_task->add_dependencies(data_task_dependencies, true);
      // Copy the device assigned to the compute task to the data movement
      // task.
      // TODO(hc): when we support xpy, these should be the devices
      //           corresponding to the placements of the local partition.
      auto device = task->get_assigned_devices()[i];
      datamove_task->add_assigned_device(device);

      datamove_task->device_constraints.emplace(
          std::piecewise_construct,
          std::forward_as_tuple(device->get_global_id()),
          std::forward_as_tuple(0, 0, 1));

      data_tasks.push_back(datamove_task);
      // Add the created data movement task to the reserved task queue.
      this->scheduler->increase_num_active_tasks();
      this->reserved_tasks_buffer.push_back(datamove_task);
    }
  }

  // Register the data movement tasks as dependencies of the compute task.
  task->add_dependencies(data_tasks, true);
}

void MemoryReserver::run(SchedulerPhase *next_phase) {
  NVTX_RANGE("MemoryReserver::run", NVTX_COLOR_LIGHT_GREEN)
@@ -263,7 +344,7 @@ void MemoryReserver::run(SchedulerPhase *next_phase) {
    if (can_reserve) {
      this->reserve_resources(task);
      this->reservable_tasks->pop();
-     this->create_datamove_tasks(task);
+     this->create_datamove_tasks2(task);
      this->reserved_tasks_buffer.push_back(task);
    } else {
      // TODO:(wlr) we need some break condition to allow the scheduler to
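The new bookkeeping is easier to follow in isolation. The sketch below is illustrative only and is not part of the patch: it models the role of `InnerTask::parray_dependencies_map` with simplified stand-in types, and `Task`, `collect_data_dependencies`, and the driver in `main` are hypothetical names invented for this example. It also condenses the inner loop of `create_datamove_tasks2`, recording each dependency once rather than once per recorded entry.

// Illustrative sketch only -- simplified stand-ins for InnerTask and friends.
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

enum class AccessMode { IN, OUT, INOUT };

struct Task {
  std::string name;
  // Same idea as InnerTask::parray_dependencies_map: for each PArray
  // (keyed by its parent id), the tasks that must finish before the
  // data can be moved again.
  std::unordered_map<uint64_t, std::vector<Task *>> parray_dependencies;

  std::vector<Task *> &get_parray_dependencies(uint64_t parray_id) {
    return parray_dependencies[parray_id];
  }
};

// Loosely mirrors the core of create_datamove_tasks2 for a single PArray:
// pull the writers recorded on each dependency, and register this task as
// the new writer if it writes the PArray.
std::vector<Task *> collect_data_dependencies(Task &task,
                                              const std::vector<Task *> &deps,
                                              uint64_t parray_id,
                                              AccessMode mode) {
  std::vector<Task *> data_deps;
  for (Task *dep : deps) {
    if (!dep->get_parray_dependencies(parray_id).empty()) {
      data_deps.push_back(dep);
      if (mode == AccessMode::IN) {
        // Readers propagate the recorded writer chain forward.
        task.get_parray_dependencies(parray_id).push_back(dep);
      }
    }
  }
  if (mode != AccessMode::IN) {
    // Writers register themselves as the PArray's latest dependency.
    task.get_parray_dependencies(parray_id).push_back(&task);
  }
  return data_deps;
}

int main() {
  const uint64_t parray_id = 42;
  Task writer{"t0"}, reader{"t1"};

  // t0 writes the PArray: it records itself as the task to wait on.
  collect_data_dependencies(writer, {}, parray_id, AccessMode::OUT);

  // t1 reads the PArray and depends on t0: its data movement must wait on t0.
  auto deps = collect_data_dependencies(reader, {&writer}, parray_id,
                                        AccessMode::IN);
  for (Task *d : deps)
    std::cout << reader.name << " data movement waits on " << d->name << "\n";
  return 0;
}

Compiled and run, the sketch reports that t1's data movement waits on t0, which is the dependency the commit message describes as previously missing: a reader's data movement task now waits on the last writer of the PArray.
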
diff --git a/src/python/parla/common/dataflow.py b/src/python/parla/common/dataflow.py
index 8bdfcf74..254d0b9f 100644
--- a/src/python/parla/common/dataflow.py
+++ b/src/python/parla/common/dataflow.py
@@ -103,7 +103,7 @@ def process_crosspys(
                 _out.append((parray, i))
             else:
                 raise TypeError(
                     f"Invalid Type: {type(element)}. Dataflow should be PArray, CrossPyArray, or Tuple[PArray, int]"
                 )
     return _out
diff --git a/src/python/parla/common/globals.py b/src/python/parla/common/globals.py
index 75d9fe41..afc359b0 100644
--- a/src/python/parla/common/globals.py
+++ b/src/python/parla/common/globals.py
@@ -72,6 +72,7 @@ def print_config():
     print("Default Runahead Behavior: ", default_sync, flush=True)
     print("VCU Precision: ", VCU_BASELINE, flush=True)

+
class DeviceType(IntEnum):
    """
    This class declares device types.
diff --git a/src/python/parla/common/parray/coherence.py b/src/python/parla/common/parray/coherence.py
index 62e103a8..ba2c546d 100644
--- a/src/python/parla/common/parray/coherence.py
+++ b/src/python/parla/common/parray/coherence.py
@@ -27,14 +27,14 @@ class MemoryOperation:

    # Flag
    SWITCH_DEVICE_FLAG = (
-        101
-    )  # if the flag is set, it means dst is not the current device
+        101  # if the flag is set, it means dst is not the current device
+    )
    LOAD_SUBARRAY = (
-        102
-    )  # if the flag is set, it means a subarray of src should be loaded
+        102  # if the flag is set, it means a subarray of src should be loaded
+    )
    ENSURE_IS_COMPLETE = (
-        103
-    )  # if the flag is set, check data will also check if the data is complete
+        103  # if the flag is set, check data will also check if the data is complete
+    )

    def __init__(self, inst: int = NOOP, dst: int = -1, src: int = -1, flag: int = []):
        self.inst = inst
@@ -125,9 +125,7 @@ def __init__(self, init_owner: int, num_gpu: int, cyparray_state: CyPArrayState)
        self._is_complete[CPU_INDEX] = None

        self._local_states[init_owner] = self.MODIFIED  # initial state is MODIFIED
-        self.owner = (
-            init_owner
-        )  # the device that has the complete copy (take the role of main memory)
+        self.owner = init_owner  # the device that has the complete copy (take the role of main memory)
        self._versions[init_owner] = 0  # the first version is 0
        self._is_complete[init_owner] = True  # the copy is complete
        self._latest_version = 0  # the latest version in the system
diff --git a/src/python/parla/common/parray/core.py b/src/python/parla/common/parray/core.py
index c6b4f52c..3833b7f5 100644
--- a/src/python/parla/common/parray/core.py
+++ b/src/python/parla/common/parray/core.py
@@ -170,13 +170,11 @@ def get_array(self, device_idx: Optional[int] = None) -> ndarray:
        """
        if device_idx is None:
-            device_idx = self._current_device_index
+            device_idx = self._current_device_index

        if self._slices:  # so this is a sub-parray object
            # index into origin array by saved slices
-            ret = self._array.get_by_global_slices(
-                device_idx, self._slices[0]
-            )
+            ret = self._array.get_by_global_slices(device_idx, self._slices[0])
            for s in self._slices[1:]:
                ret = ret[s]
            return ret
@@ -214,13 +212,12 @@ def _current_device_index(self) -> int:
            # to avoid import gpu context, which is slow to setup.
            return device.device.id  # device.device should be a cupy.cuda.Device object

-
    # Public API:
    def set_name(self, name: str):
        self._name = name

-    def get(self, device: Optional[PyDevice] = None) -> 'np.ndarray' | 'cp.ndarray':
+    def get(self, device: Optional[PyDevice] = None) -> "np.ndarray" | "cp.ndarray":
        if device is None:
            return self.array
        else:
diff --git a/src/python/parla/common/parray/from_data.py b/src/python/parla/common/parray/from_data.py
index 9f72bc0e..930cbb16 100644
--- a/src/python/parla/common/parray/from_data.py
+++ b/src/python/parla/common/parray/from_data.py
@@ -176,12 +176,12 @@ def get_parray(object, count=0):  # recursively process Sequence or Dictionary
        elif isinstance(object, dict):
            accumulator = {}
            for key, value in object.items():
-                accumulator[key] = get_parray(value, count+1)
+                accumulator[key] = get_parray(value, count + 1)
            return accumulator
        elif isinstance(object, (list, tuple, set)):
            accumulator = []
            for item in object:
-                accumulator.append(get_parray(item, count+1))
+                accumulator.append(get_parray(item, count + 1))
            return type(object)(accumulator)
        else:
            raise TypeError(f"Unsupported Type: {type(object)}")
diff --git a/src/python/parla/cython/tasks.pyx b/src/python/parla/cython/tasks.pyx
index 2cf05b01..b800218c 100644
--- a/src/python/parla/cython/tasks.pyx
+++ b/src/python/parla/cython/tasks.pyx
@@ -1789,9 +1789,3 @@ class BackendTaskSpace(TaskSpace):

    def wait(self):
        self.inner_space.wait()
-
-
-
-
-
-
diff --git a/src/python/parla/utility/graphs.py b/src/python/parla/utility/graphs.py
index 072f44f3..d9852dea 100644
--- a/src/python/parla/utility/graphs.py
+++ b/src/python/parla/utility/graphs.py
@@ -251,8 +251,8 @@ class RunConfig:
    """

    outer_iterations: int = (
-        1
-    )  # Number of times to launch the Parla runtime and execute the task graph
+        1  # Number of times to launch the Parla runtime and execute the task graph
+    )
    # Number of times to execute the task graph within the same Parla runtime
    inner_iterations: int = 1
    inner_sync: bool = False  # Whether to synchronize after each kernel launch
@@ -504,7 +504,7 @@ def get_task_properties(line: str):


def parse_blog(
-    filename: str = "parla.blog"
+    filename: str = "parla.blog",
) -> Tuple[Dict[TaskID, TaskTime], Dict[TaskID, List[TaskID]]]:
    try:
        result = subprocess.run(