Skip to content

Commit

Permalink
Fix data movement task dependencies with clang-formatting (#110)
Browse files Browse the repository at this point in the history
* Fix missing data dependency by passing array writers to dependent tasks
* style: apply ruff format
* remove unnecessary file
  • Loading branch information
nicelhc13 authored Jan 18, 2024
1 parent dfe4c2b commit 6bd3166
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 29 deletions.
1 change: 1 addition & 0 deletions src/c/backend/include/phases.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ class MemoryReserver : virtual public SchedulerPhase {
* @param task The task to create data movement tasks for.
*/
void create_datamove_tasks(InnerTask *task);
void create_datamove_tasks2(InnerTask *task);
};

/**
Expand Down
15 changes: 14 additions & 1 deletion src/c/backend/include/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class InnerTask {
/* Task Assigned Device Set*/
std::vector<Device *> assigned_devices;

/*Resource Requirements for each assigned device*/
/* Resource Requirements for each assigned device*/
std::unordered_map<int, ResourcePool_t> device_constraints;

/* Task is data movement task */
Expand All @@ -293,6 +293,15 @@ class InnerTask {
std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
parray_list;

/* Maps a PArray's parent id to the list of tasks that this task's
   dependents must wait on before moving that PArray.
   To be specific, a task registers per-PArray dependencies on behalf of
   its dependent tasks: if this task's access permission to a PArray
   includes write, it registers itself as the PArray's dependency; if its
   access to the PArray is read-only, it instead copies the dependency
   lists from its own dependencies into this map.
*/
std::unordered_map<uint64_t, std::vector<InnerTask*>> parray_dependencies_map;

InnerTask();
InnerTask(long long int id, void *py_task);
InnerTask(std::string name, long long int id, void *py_task);
Expand Down Expand Up @@ -623,6 +632,10 @@ class InnerTask {
void begin_multidev_req_addition();
void end_multidev_req_addition();

std::vector<InnerTask*>& get_parray_dependencies(uint64_t parray_parent_id) {
  // operator[] default-inserts an empty vector the first time a parent id
  // is queried, so callers always receive a valid (possibly empty) list
  // they can append to in place.
  auto &dependency_list = this->parray_dependencies_map[parray_parent_id];
  return dependency_list;
}

PlacementRequirementCollections &get_placement_req_options() {
  // Expose the task's placement-requirement collection by reference so the
  // caller can read or extend it without copying.
  return this->placement_req_options_;
}
Expand Down
83 changes: 82 additions & 1 deletion src/c/backend/phases.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,87 @@ void MemoryReserver::create_datamove_tasks(InnerTask *task) {
task->add_dependencies(data_tasks, true);
}



// TODO(hc): need to think about better naming before it is merged.
// first, need peer-review on this.
/**
 * @brief Create one data movement task per PArray used by `task`, wiring the
 *        copies to wait on tasks previously registered for each PArray.
 *
 * For every (PArray, access mode) pair on each assigned device, a new
 * InnerDataTask is created on the same device; its dependencies are drawn
 * from the compute task's dependencies that registered work on the same
 * parent PArray. The per-PArray bookkeeping (parray_dependencies_map) is
 * also updated: read-only (IN) accesses propagate existing entries, while
 * any writing access registers `task` itself for that PArray. Finally the
 * compute task is made dependent on all created data movement tasks.
 *
 * @param task The compute task to create data movement tasks for.
 */
void MemoryReserver::create_datamove_tasks2(InnerTask *task) {
  // Get a list of the parrays the current task holds.
  const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
      &parray_list = task->parray_list;
  std::string task_base_name = task->get_name();
  std::vector<InnerTask *> data_tasks;
  data_tasks.reserve(parray_list.size());

  // `i` indexes the task's assigned devices (see get_assigned_devices()[i]
  // below); `j` indexes the PArrays used on that device.
  for (size_t i = 0; i < parray_list.size(); ++i) {
    for (size_t j = 0; j < parray_list[i].size(); ++j) {
      // Create a data movement task for each PArray.
      parray::InnerPArray *parray = parray_list[i][j].first;
      AccessMode access_mode = parray_list[i][j].second;
      // NOTE(review): raw `new`; ownership is presumably handed to the
      // scheduler via reserved_tasks_buffer -- confirm it is freed there.
      InnerDataTask *datamove_task = new InnerDataTask(
          // TODO(hc): id should be updated!
          task_base_name + ".dm." + std::to_string(i), 0, parray, access_mode,
          i);
      // Slices share their parent PArray's id for dependency bookkeeping.
      uint64_t parray_parent_id = parray->get_parent_parray()->id;
      // Get dependencies of the compute task.
      std::vector<void *> compute_task_dependencies = task->get_dependencies();
      std::vector<InnerTask *> data_task_dependencies;
      for (size_t k = 0; k < compute_task_dependencies.size(); ++k) {
        InnerTask *parray_dependency =
            static_cast<InnerTask *>(compute_task_dependencies[k]);
        // Tasks of a parray having `parray_parent_id` that have been
        // registered to the traversed dependency task.
        std::vector<InnerTask*>& dep_parray_dependencies =
            parray_dependency->get_parray_dependencies(parray_parent_id);

        // NOTE(review): this loop pushes `parray_dependency` once per
        // element of `dep_parray_dependencies` -- i.e. the same task is
        // added N times, and the listed tasks themselves are never added.
        // If the intent was to depend on the registered tasks, this should
        // probably push `dep_parray_dependencies[t]`; confirm with author.
        for (size_t t = 0; t < dep_parray_dependencies.size(); ++t) {
          data_task_dependencies.push_back(parray_dependency);
          // If the current processing parray's access mode is READ ONLY,
          // add this dependency as a dependency for this parray.
          if (access_mode == AccessMode::IN) {
            task->get_parray_dependencies(parray_parent_id).push_back(parray_dependency);
          }
        }
      }

      // If the current processing parray's access mode is not READ ONLY,
      // add itself as a dependency for this parray.
      if (access_mode != AccessMode::IN) {
        task->get_parray_dependencies(parray_parent_id).push_back(task);
      }

      // TODO(hc): pass false to add_dependencies() as optimization.
      datamove_task->add_dependencies(data_task_dependencies, true);
      // Copy assigned devices to a compute task to a data movement task.
      // TODO(hc): When we support xpy, it should be devices corresponding
      // to placements of the local partition.
      auto device = task->get_assigned_devices()[i];
      datamove_task->add_assigned_device(device);

      // Reserve a constraint of (0, 0, 1) on this device -- presumably
      // (memory, VCUs, copy engines); confirm against ResourcePool_t.
      datamove_task->device_constraints.emplace(
          std::piecewise_construct,
          std::forward_as_tuple(device->get_global_id()),
          std::forward_as_tuple(0, 0, 1));

      data_tasks.push_back(datamove_task);
      // Add the created data movement task to a reserved task queue.
      this->scheduler->increase_num_active_tasks();
      this->reserved_tasks_buffer.push_back(datamove_task);
    }
  }

  // Create dependencies between data move task and compute tasks.
  task->add_dependencies(data_tasks, true);
}




void MemoryReserver::run(SchedulerPhase *next_phase) {
NVTX_RANGE("MemoryReserver::run", NVTX_COLOR_LIGHT_GREEN)

Expand All @@ -263,7 +344,7 @@ void MemoryReserver::run(SchedulerPhase *next_phase) {
if (can_reserve) {
this->reserve_resources(task);
this->reservable_tasks->pop();
this->create_datamove_tasks(task);
this->create_datamove_tasks2(task);
this->reserved_tasks_buffer.push_back(task);
} else {
// TODO:(wlr) we need some break condition to allow the scheduler to
Expand Down
2 changes: 1 addition & 1 deletion src/python/parla/common/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def process_crosspys(
_out.append((parray, i))
else:
raise TypeError(
f"Invalid Type: {type(element)}. Dataflow should be PArray, CrossPyArray, or Tuple[PArray, int]"
f"Invalid Type: {type(element)}. Dataflow should be PArray, CrossPyArray, or Tuple[PArray, int]"
)
return _out

Expand Down
1 change: 1 addition & 0 deletions src/python/parla/common/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def print_config():
print("Default Runahead Behavior: ", default_sync, flush=True)
print("VCU Precision: ", VCU_BASELINE, flush=True)


class DeviceType(IntEnum):
"""
This class declares device types.
Expand Down
16 changes: 7 additions & 9 deletions src/python/parla/common/parray/coherence.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ class MemoryOperation:

# Flag
SWITCH_DEVICE_FLAG = (
101
) # if the flag is set, it means dst is not the current device
101 # if the flag is set, it means dst is not the current device
)
LOAD_SUBARRAY = (
102
) # if the flag is set, it means a subarray of src should be loaded
102 # if the flag is set, it means a subarray of src should be loaded
)
ENSURE_IS_COMPLETE = (
103
) # if the flag is set, check data will also check if the data is complete
103 # if the flag is set, check data will also check if the data is complete
)

def __init__(self, inst: int = NOOP, dst: int = -1, src: int = -1, flag: int = []):
self.inst = inst
Expand Down Expand Up @@ -125,9 +125,7 @@ def __init__(self, init_owner: int, num_gpu: int, cyparray_state: CyPArrayState)
self._is_complete[CPU_INDEX] = None

self._local_states[init_owner] = self.MODIFIED # initial state is MODIFIED
self.owner = (
init_owner
) # the device that has the complete copy (take the role of main memory)
self.owner = init_owner # the device that has the complete copy (take the role of main memory)
self._versions[init_owner] = 0 # the first version is 0
self._is_complete[init_owner] = True # the copy is complete
self._latest_version = 0 # the latest version in the system
Expand Down
9 changes: 3 additions & 6 deletions src/python/parla/common/parray/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,11 @@ def get_array(self, device_idx: Optional[int] = None) -> ndarray:
"""

if device_idx is None:
device_idx = self._current_device_index
device_idx = self._current_device_index

if self._slices: # so this is a sub-parray object
# index into origin array by saved slices
ret = self._array.get_by_global_slices(
device_idx, self._slices[0]
)
ret = self._array.get_by_global_slices(device_idx, self._slices[0])
for s in self._slices[1:]:
ret = ret[s]
return ret
Expand Down Expand Up @@ -214,13 +212,12 @@ def _current_device_index(self) -> int:
# to avoid import gpu context, which is slow to setup.
return device.device.id # device.device should be a cupy.cuda.Device object


# Public API:

def set_name(self, name: str):
self._name = name

def get(self, device: Optional[PyDevice] = None) -> 'np.ndarray' | 'cp.ndarray':
def get(self, device: Optional[PyDevice] = None) -> "np.ndarray" | "cp.ndarray":
if device is None:
return self.array
else:
Expand Down
4 changes: 2 additions & 2 deletions src/python/parla/common/parray/from_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ def get_parray(object, count=0): # recursively process Sequence or Dictionary
elif isinstance(object, dict):
accumulator = {}
for key, value in object.items():
accumulator[key] = get_parray(value, count+1)
accumulator[key] = get_parray(value, count + 1)
return accumulator
elif isinstance(object, (list, tuple, set)):
accumulator = []
for item in object:
accumulator.append(get_parray(item, count+1))
accumulator.append(get_parray(item, count + 1))
return type(object)(accumulator)
else:
raise TypeError(f"Unsupported Type: {type(object)}")
Expand Down
6 changes: 0 additions & 6 deletions src/python/parla/cython/tasks.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1789,9 +1789,3 @@ class BackendTaskSpace(TaskSpace):

def wait(self):
self.inner_space.wait()






6 changes: 3 additions & 3 deletions src/python/parla/utility/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ class RunConfig:
"""

outer_iterations: int = (
1
) # Number of times to launch the Parla runtime and execute the task graph
1 # Number of times to launch the Parla runtime and execute the task graph
)
# Number of times to execute the task graph within the same Parla runtime
inner_iterations: int = 1
inner_sync: bool = False # Whether to synchronize after each kernel launch
Expand Down Expand Up @@ -504,7 +504,7 @@ def get_task_properties(line: str):


def parse_blog(
filename: str = "parla.blog"
filename: str = "parla.blog",
) -> Tuple[Dict[TaskID, TaskTime], Dict[TaskID, List[TaskID]]]:
try:
result = subprocess.run(
Expand Down

0 comments on commit 6bd3166

Please sign in to comment.