Fix data movement task dependencies with clang-formatting #110

Merged · 6 commits · Jan 18, 2024
1 change: 1 addition & 0 deletions src/c/backend/include/phases.hpp
@@ -55,16 +55,16 @@
* @details Used to record counts of phase execution for tracing and debugging.
* For example, the number of successful mappings per call.
*/
template <typename S> class PhaseStatus {

GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:58:29 [cppcoreguidelines-pro-type-member-init]: constructor does not initialize these fields: status
protected:
const int size{static_cast<int>(S::MAX)};

GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:60:13 [cppcoreguidelines-non-private-member-variables-in-classes]: member variable 'size' has protected visibility
std::string name{"Status"};

GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:61:15 [cppcoreguidelines-non-private-member-variables-in-classes]: member variable 'name' has public visibility

public:
int status[static_cast<int>(S::MAX)];

GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:64:7 [cppcoreguidelines-non-private-member-variables-in-classes]: member variable 'status' has public visibility

PhaseStatus() = default;
PhaseStatus(std::string name) : name(name) {}

GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:67:3 [cppcoreguidelines-pro-type-member-init]: constructor does not initialize these fields: status
GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:67:27 [performance-unnecessary-value-param]: the parameter 'name' is copied for each invocation but only used as a const reference; consider making it a const reference

void reset() {
/*!
@@ -78,7 +78,7 @@
inline void set(S state, int value) {
this->status[static_cast<int>(state)] = value;
}
inline const int get(S state) const {

GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:81:3 [readability-const-return-type]: return type 'const int' is 'const'-qualified at the top level, which may reduce code readability without improving const correctness
GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:81:20 [modernize-use-trailing-return-type]: use a trailing return type for this function
return this->status[static_cast<int>(state)];
}
inline void increase(S state) { this->status[static_cast<int>(state)]++; }
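Taken together, the five warnings above suggest a mechanical cleanup. Below is a minimal sketch (not part of this PR) of a PhaseStatus that would satisfy each check, with a hypothetical ExamplePhase enum standing in for the real phase-state enums:

```cpp
#include <array>
#include <string>

// Hypothetical stand-in for the real phase-state enums; the only requirement
// PhaseStatus places on S is a trailing MAX enumerator.
enum class ExamplePhase { Mapped, Reserved, MAX };

template <typename S> class PhaseStatus {
private: // private data: cppcoreguidelines-non-private-member-variables-in-classes
  static constexpr int size = static_cast<int>(S::MAX);
  std::string name{"Status"};
  std::array<int, size> status{}; // value-initialized: cppcoreguidelines-pro-type-member-init

public:
  PhaseStatus() = default;
  // const reference: performance-unnecessary-value-param
  explicit PhaseStatus(const std::string &name) : name(name) {}

  void reset() { status.fill(0); }
  void set(S state, int value) { status[static_cast<int>(state)] = value; }
  // plain int return with a trailing return type:
  // readability-const-return-type, modernize-use-trailing-return-type
  auto get(S state) const -> int { return status[static_cast<int>(state)]; }
  void increase(S state) { ++status[static_cast<int>(state)]; }
};

int main() {
  PhaseStatus<ExamplePhase> counts{"MemoryReserver"};
  counts.increase(ExamplePhase::Mapped);
  return counts.get(ExamplePhase::Mapped); // 1
}
```

Switching the raw `int status[...]` to a brace-initialized std::array zero-fills the counters at construction, which is what silences both pro-type-member-init warnings.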
@@ -101,7 +101,7 @@
/*!
* @brief Abstract Interface for general scheduler runtime phase.
*/
class SchedulerPhase {

GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:104:7 [cppcoreguidelines-pro-type-member-init]: constructor does not initialize these fields: scheduler, device_manager, enqueue_buffer
public:
SchedulerPhase() = default;

@@ -110,7 +110,7 @@
* @param scheduler The scheduler that the phase belongs to.
* @param devices The device manager that the phase uses.
*/
SchedulerPhase(InnerScheduler *scheduler, DeviceManager *devices)

GitHub Actions / cpp-linter warning at src/c/backend/include/phases.hpp:113:3 [cppcoreguidelines-pro-type-member-init]: constructor does not initialize these fields: enqueue_buffer
: scheduler(scheduler), device_manager(devices) {}

/*!
@@ -277,6 +277,7 @@
* @param task The task to create data movement tasks for.
*/
void create_datamove_tasks(InnerTask *task);
void create_datamove_tasks2(InnerTask *task);
};

/**
15 changes: 14 additions & 1 deletion src/c/backend/include/runtime.hpp
@@ -1,4 +1,4 @@
#pragma once

GitHub Actions / cpp-linter notice: run clang-format on src/c/backend/include/runtime.hpp; the file does not conform to LLVM style guidelines (lines 303, 635)
#ifndef PARLA_BACKEND_HPP
#define PARLA_BACKEND_HPP

@@ -277,7 +277,7 @@
/* Task Assigned Device Set*/
std::vector<Device *> assigned_devices;

-/*Resource Requirements for each assigned device*/
+/* Resource Requirements for each assigned device*/
std::unordered_map<int, ResourcePool_t> device_constraints;

/* Task is data movement task */
@@ -293,6 +293,15 @@
std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
parray_list;

/* Dependency tasks for a PArray, collected on behalf of this task's
   dependents. Each task populates this map as follows: if its access
   permission to a PArray includes write, it registers itself as the
   PArray's dependency; if its access to the PArray is read-only, it
   instead pulls the dependency lists it inherited into this map.
*/
std::unordered_map<uint64_t, std::vector<InnerTask*>> parray_dependencies_map;

InnerTask();
InnerTask(long long int id, void *py_task);
InnerTask(std::string name, long long int id, void *py_task);
@@ -623,6 +632,10 @@
void begin_multidev_req_addition();
void end_multidev_req_addition();

std::vector<InnerTask*>& get_parray_dependencies(uint64_t parray_parent_id) {
return this->parray_dependencies_map[parray_parent_id];
}

PlacementRequirementCollections &get_placement_req_options() {
return placement_req_options_;
}
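The comment on parray_dependencies_map above describes a writer/reader protocol: writers publish themselves, readers forward what they inherited. A self-contained toy model of that protocol, using a stand-in Task type rather than the runtime's real InnerTask:

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

// Minimal stand-in for InnerTask, just enough to model the protocol.
struct Task {
  std::unordered_map<uint64_t, std::vector<Task *>> parray_dependencies_map;
  std::vector<Task *> &get_parray_dependencies(uint64_t parray_parent_id) {
    return parray_dependencies_map[parray_parent_id];
  }
};

int main() {
  Task writer, reader;
  const uint64_t parray_id = 42;

  // Write access: the task registers itself as the PArray's dependency.
  writer.get_parray_dependencies(parray_id).push_back(&writer);

  // Read-only access: the task pulls its dependency's list instead of adding
  // itself, so a later writer still waits on the original writer, not the reader.
  for (Task *dep : writer.get_parray_dependencies(parray_id))
    reader.get_parray_dependencies(parray_id).push_back(dep);
}
```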
83 changes: 82 additions & 1 deletion src/c/backend/phases.cpp
@@ -1,4 +1,4 @@
#include "include/phases.hpp"

GitHub Actions / cpp-linter notice: run clang-format on src/c/backend/phases.cpp; the file does not conform to LLVM style guidelines (lines 238, 270, 273, 278, 280, 281, 288, 290, 316)
#include "include/device.hpp"
#include "include/parray.hpp"
#include "include/policy.hpp"
@@ -237,6 +237,87 @@
task->add_dependencies(data_tasks, true);
}



// TODO(hc): need to think about better naming before it is merged.
// first, need peer-review on this.
void MemoryReserver::create_datamove_tasks2(InnerTask *task) {
// Get a list of the parrays the current task holds.
const std::vector<std::vector<std::pair<parray::InnerPArray *, AccessMode>>>
&parray_list = task->parray_list;
std::string task_base_name = task->get_name();
std::vector<InnerTask *> data_tasks;
data_tasks.reserve(parray_list.size());

for (size_t i = 0; i < parray_list.size(); ++i) {
for (size_t j = 0; j < parray_list[i].size(); ++j) {
// Create a data movement task for each PArray.
parray::InnerPArray *parray = parray_list[i][j].first;
AccessMode access_mode = parray_list[i][j].second;
InnerDataTask *datamove_task = new InnerDataTask(
// TODO(hc): id should be updated!
task_base_name + ".dm." + std::to_string(i), 0, parray, access_mode,
i);
uint64_t parray_parent_id = parray->get_parent_parray()->id;
// Get dependencies
std::vector<void *> compute_task_dependencies = task->get_dependencies();
std::vector<InnerTask *> data_task_dependencies;
for (size_t k = 0; k < compute_task_dependencies.size(); ++k) {
InnerTask *parray_dependency =
static_cast<InnerTask *>(compute_task_dependencies[k]);
// Get the dependencies registered on the traversed dependency task
// for the PArray with `parray_parent_id`.
std::vector<InnerTask*>& dep_parray_dependencies =
parray_dependency->get_parray_dependencies(parray_parent_id);

//std::cout << parray_dependency->name << " is being traversed\n";
for (size_t t = 0; t < dep_parray_dependencies.size(); ++t) {
data_task_dependencies.push_back(parray_dependency);
// If the currently processed PArray's access mode is read-only,
// record this dependency for the PArray on this task as well.
//std::cout << "access mode:" << int(access_mode) << "\n";
if (access_mode == AccessMode::IN) {
//std::cout << "IN parray is added:" << parray_parent_id << "\n";
task->get_parray_dependencies(parray_parent_id).push_back(parray_dependency);
}
}
}

// If the currently processed PArray's access mode is not read-only,
// register this task itself as the PArray's dependency.
//std::cout << task->name << " is being traversed access id :" << int(access_mode) << "\n";
if (access_mode != AccessMode::IN) {
//std::cout << "IN/OUT OUT parray is added:" << parray_parent_id << "\n";
task->get_parray_dependencies(parray_parent_id).push_back(task);
}

// TODO(hc): pass false to add_dependencies() as optimization.
datamove_task->add_dependencies(data_task_dependencies, true);
// Copy the compute task's assigned devices to the data movement task.
// TODO(hc): When we support xpy, it should be devices corresponding
// to placements of the local partition.
auto device = task->get_assigned_devices()[i];
datamove_task->add_assigned_device(device);

datamove_task->device_constraints.emplace(
std::piecewise_construct,
std::forward_as_tuple(device->get_global_id()),
std::forward_as_tuple(0, 0, 1));

data_tasks.push_back(datamove_task);
// Add the created data movement task to a reserved task queue.
this->scheduler->increase_num_active_tasks();
this->reserved_tasks_buffer.push_back(datamove_task);
}
}

// Create dependencies between the data movement tasks and the compute task.
task->add_dependencies(data_tasks, true);
}
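One idiom in the body above is worth a note: the device_constraints.emplace call uses std::piecewise_construct to build the key and the mapped ResourcePool_t in place from (0, 0, 1), with no temporary pool object. A minimal sketch of the idiom, with an assumed three-field pool type standing in for ResourcePool_t (the real constructor arguments may mean something else):

```cpp
#include <tuple>
#include <unordered_map>
#include <utility>

// Assumed stand-in for ResourcePool_t with three integer fields.
struct Pool {
  int mem, vcus, copies;
  Pool(int m, int v, int c) : mem(m), vcus(v), copies(c) {}
};

int main() {
  std::unordered_map<int, Pool> device_constraints;
  const int global_device_id = 3;
  // Construct the key from (global_device_id) and the value from (0, 0, 1)
  // in place, mirroring the emplace call in the diff above.
  device_constraints.emplace(std::piecewise_construct,
                             std::forward_as_tuple(global_device_id),
                             std::forward_as_tuple(0, 0, 1));
  return device_constraints.at(global_device_id).copies; // 1
}
```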




void MemoryReserver::run(SchedulerPhase *next_phase) {
NVTX_RANGE("MemoryReserver::run", NVTX_COLOR_LIGHT_GREEN)

@@ -263,7 +344,7 @@
if (can_reserve) {
this->reserve_resources(task);
this->reservable_tasks->pop();
-      this->create_datamove_tasks(task);
+      this->create_datamove_tasks2(task);
this->reserved_tasks_buffer.push_back(task);
} else {
// TODO:(wlr) we need some break condition to allow the scheduler to
2 changes: 1 addition & 1 deletion src/python/parla/common/dataflow.py
@@ -103,7 +103,7 @@ def process_crosspys(
_out.append((parray, i))
else:
raise TypeError(
f"Invalid Type: {type(element)}. Dataflow should be PArray, CrossPyArray, or Tuple[PArray, int]"
f"Invalid Type: {type(element)}. Dataflow should be PArray, CrossPyArray, or Tuple[PArray, int]"
)
return _out

16 changes: 7 additions & 9 deletions src/python/parla/common/parray/coherence.py
@@ -27,14 +27,14 @@ class MemoryOperation:

# Flag
SWITCH_DEVICE_FLAG = (
-    101
-) # if the flag is set, it means dst is not the current device
+    101 # if the flag is set, it means dst is not the current device
+)
LOAD_SUBARRAY = (
-    102
-) # if the flag is set, it means a subarray of src should be loaded
+    102 # if the flag is set, it means a subarray of src should be loaded
+)
ENSURE_IS_COMPLETE = (
-    103
-) # if the flag is set, check data will also check if the data is complete
+    103 # if the flag is set, check data will also check if the data is complete
+)

def __init__(self, inst: int = NOOP, dst: int = -1, src: int = -1, flag: int = []):
self.inst = inst
@@ -125,9 +125,7 @@ def __init__(self, init_owner: int, num_gpu: int, cyparray_state: CyPArrayState)
self._is_complete[CPU_INDEX] = None

self._local_states[init_owner] = self.MODIFIED # initial state is MODIFIED
-self.owner = (
-    init_owner
-) # the device that has the complete copy (take the role of main memory)
+self.owner = init_owner # the device that has the complete copy (take the role of main memory)
self._versions[init_owner] = 0 # the first version is 0
self._is_complete[init_owner] = True # the copy is complete
self._latest_version = 0 # the latest version in the system
9 changes: 3 additions & 6 deletions src/python/parla/common/parray/core.py
@@ -170,13 +170,11 @@ def get_array(self, device_idx: Optional[int] = None) -> ndarray:
"""

if device_idx is None:
-device_idx = self._current_device_index
+device_idx = self._current_device_index

if self._slices: # so this is a sub-parray object
# index into origin array by saved slices
-ret = self._array.get_by_global_slices(
-    device_idx, self._slices[0]
-)
+ret = self._array.get_by_global_slices(device_idx, self._slices[0])
for s in self._slices[1:]:
ret = ret[s]
return ret
@@ -214,10 +212,9 @@ def _current_device_index(self) -> int:
# to avoid import gpu context, which is slow to setup.
return device.device.id # device.device should be a cupy.cuda.Device object


# Public API:

-def get(self, device: Optional[PyDevice] = None) -> 'np.ndarray' | 'cp.ndarray':
+def get(self, device: Optional[PyDevice] = None) -> "np.ndarray" | "cp.ndarray":
if device is None:
return self.array
else:
7 changes: 1 addition & 6 deletions src/python/parla/cython/tasks.pyx
@@ -721,6 +721,7 @@ class DataMovementTask(Task):
global_id = target_dev.get_global_id()
parray_id = device_manager.globalid_to_parrayid(global_id)

print(self.name, " moves parray:", parray_id, " on device:", target_dev)
self.parray._auto_move(parray_id, write_flag)
return TaskRunahead(0)

@@ -1787,9 +1788,3 @@ class BackendTaskSpace(TaskSpace):

def wait(self):
self.inner_space.wait()






6 changes: 3 additions & 3 deletions src/python/parla/utility/graphs.py
@@ -251,8 +251,8 @@ class RunConfig:
"""

outer_iterations: int = (
-    1
-) # Number of times to launch the Parla runtime and execute the task graph
+    1 # Number of times to launch the Parla runtime and execute the task graph
+)
# Number of times to execute the task graph within the same Parla runtime
inner_iterations: int = 1
inner_sync: bool = False # Whether to synchronize after each kernel launch
@@ -504,7 +504,7 @@ def get_task_properties(line: str):


def parse_blog(
-    filename: str = "parla.blog"
+    filename: str = "parla.blog",
) -> Tuple[Dict[TaskID, TaskTime], Dict[TaskID, List[TaskID]]]:
try:
result = subprocess.run(