Skip to content

Commit

Permalink
#16: bindings: Make json processing concurrent
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrepebay committed Dec 16, 2023
1 parent 146901c commit 3ca694d
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 127 deletions.
264 changes: 138 additions & 126 deletions bindings/python/tv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,162 +75,174 @@ void tv_from_json(const std::vector<std::string>& input_json_per_rank_list, cons
fmt::print(" font_size: {}\n", font_size);

using json = nlohmann::json;
// Read the json for the rank
std::unique_ptr<Info> info = std::make_unique<Info>();

assert(input_json_per_rank_list.size() == num_ranks && "Must have the same number of json files as ranks");

for (NodeType rank_id = 0; rank_id < num_ranks; rank_id++) {
#ifdef VT_TV_NUM_THREADS
const int threads = VT_TV_NUM_THREADS;
#else
const int threads = 2;
#endif
omp_set_num_threads(threads);
// print number of threads
fmt::print("vt-tv: Using {} threads\n", threads);

// Initialize the info object, that will hold data for all ranks for all phases
std::unique_ptr<Info> info = std::make_unique<Info>();

# pragma omp parallel for
for (int64_t rank_id = 0; rank_id < num_ranks; rank_id++) {
std::string rank_json_str = input_json_per_rank_list[rank_id];
try {
std::cerr << "vt-tv: Parsing JSON for rank " << rank_id << "\n";
auto j = json::parse(rank_json_str);
assert(j != nullptr && "Must have valid json");

std::cerr << "vt-tv: Writing JSON to disk for rank " << rank_id << "\n";
std::string filename = fmt::format("/home/pierrelp/Develop/NGA/vt-tv/output_{}.json", rank_id);
std::ofstream o(filename);
o << std::setw(2) << j << std::endl;
o.close();

std::unordered_map<ElementIDType, ObjectInfo> object_info;
std::unordered_map<PhaseType, PhaseWork> phase_info;

std::cerr << "vt-tv: Reading rank " << rank_id << "\n";

auto phases = j["phases"];

if (phases.is_array()) {
for (auto const& phase : phases) {
auto id = phase["id"];
std::cerr << "vt-tv: Reading phase " << id << "\n";
auto tasks = phase["tasks"];

std::unordered_map<ElementIDType, ObjectWork> objects;

if (tasks.is_array()) {
for (auto const& task : tasks) {
auto node = task["node"];
auto time = task["time"];
auto etype = task["entity"]["type"];
assert(time.is_number());
assert(node.is_number());

if (etype == "object") {
auto object = task["entity"]["id"];
auto home = task["entity"]["home"];
bool migratable = task["entity"]["migratable"];
assert(object.is_number());
assert(home.is_number());

std::cerr << "vt-tv: Processing object " << object << " in phase " << id << "\n";
std::vector<UniqueIndexBitType> index_arr;


if (
task["entity"].find("collection_id") != task["entity"].end() and
task["entity"].find("index") != task["entity"].end()
) {
auto cid = task["entity"]["collection_id"];
auto idx = task["entity"]["index"];
if (cid.is_number() && idx.is_array()) {
std::vector<UniqueIndexBitType> arr = idx;
index_arr = std::move(arr);
try {
std::cerr << "vt-tv: Parsing JSON for rank " << rank_id << "\n";
auto j = json::parse(rank_json_str);
assert(j != nullptr && "Must have valid json");

// std::cerr << "vt-tv: Writing JSON to disk for rank " << rank_id << "\n";
std::string filename = fmt::format("/home/pierrelp/Develop/NGA/vt-tv/output_{}.json", rank_id);
std::ofstream o(filename);
o << std::setw(2) << j << std::endl;
o.close();

std::unordered_map<ElementIDType, ObjectInfo> object_info;
std::unordered_map<PhaseType, PhaseWork> phase_info;

// std::cerr << "vt-tv: Reading rank " << rank_id << "\n";

auto phases = j["phases"];

if (phases.is_array()) {
for (auto const& phase : phases) {
auto id = phase["id"];
// std::cerr << "vt-tv: Reading phase " << id << "\n";
auto tasks = phase["tasks"];

std::unordered_map<ElementIDType, ObjectWork> objects;

if (tasks.is_array()) {
for (auto const& task : tasks) {
auto node = task["node"];
auto time = task["time"];
auto etype = task["entity"]["type"];
assert(time.is_number());
assert(node.is_number());

if (etype == "object") {
auto object = task["entity"]["id"];
auto home = task["entity"]["home"];
bool migratable = task["entity"]["migratable"];
assert(object.is_number());
assert(home.is_number());

// std::cerr << "vt-tv: Processing object " << object << " in phase " << id << "\n";
std::vector<UniqueIndexBitType> index_arr;


if (
task["entity"].find("collection_id") != task["entity"].end() and
task["entity"].find("index") != task["entity"].end()
) {
auto cid = task["entity"]["collection_id"];
auto idx = task["entity"]["index"];
if (cid.is_number() && idx.is_array()) {
std::vector<UniqueIndexBitType> arr = idx;
index_arr = std::move(arr);
}
}
}

ObjectInfo oi{object, home, migratable, std::move(index_arr)};
ObjectInfo oi{object, home, migratable, std::move(index_arr)};

if (task["entity"].find("collection_id") != task["entity"].end()) {
oi.setIsCollection(true);
oi.setMetaID(task["entity"]["collection_id"]);
}
if (task["entity"].find("collection_id") != task["entity"].end()) {
oi.setIsCollection(true);
oi.setMetaID(task["entity"]["collection_id"]);
}

if (task["entity"].find("objgroup_id") != task["entity"].end()) {
oi.setIsObjGroup(true);
oi.setMetaID(task["entity"]["objgroup_id"]);
}
if (task["entity"].find("objgroup_id") != task["entity"].end()) {
oi.setIsObjGroup(true);
oi.setMetaID(task["entity"]["objgroup_id"]);
}

object_info.try_emplace(object, std::move(oi));
object_info.try_emplace(object, std::move(oi));

std::unordered_map<SubphaseType, TimeType> subphase_loads;
std::unordered_map<SubphaseType, TimeType> subphase_loads;

if (task.find("subphases") != task.end()) {
auto subphases = task["subphases"];
if (subphases.is_array()) {
for (auto const& s : subphases) {
auto sid = s["id"];
auto stime = s["time"];
if (task.find("subphases") != task.end()) {
auto subphases = task["subphases"];
if (subphases.is_array()) {
for (auto const& s : subphases) {
auto sid = s["id"];
auto stime = s["time"];

assert(sid.is_number());
assert(stime.is_number());
assert(sid.is_number());
assert(stime.is_number());

subphase_loads[sid] = stime;
subphase_loads[sid] = stime;
}
}
}
}

std::unordered_map<std::string, ObjectWork::VariantType> user_defined;
if (task.find("user_defined") != task.end()) {
auto user_defined = task["user_defined"];
if (user_defined.is_object()) {
for (auto& [key, value] : user_defined.items()) {
user_defined[key] = value;
std::unordered_map<std::string, ObjectWork::VariantType> user_defined;
if (task.find("user_defined") != task.end()) {
auto user_defined = task["user_defined"];
if (user_defined.is_object()) {
for (auto& [key, value] : user_defined.items()) {
user_defined[key] = value;
}
}
}
// fmt::print(" Add object {}\n", (ElementIDType)object);
objects.try_emplace(
object,
ObjectWork{
object, time, std::move(subphase_loads), std::move(user_defined)
}
);
}
// fmt::print(" Add object {}\n", (ElementIDType)object);
objects.try_emplace(
object,
ObjectWork{
object, time, std::move(subphase_loads), std::move(user_defined)
}
);
}
}
}

if (phase.find("communications") != phase.end()) {
auto communications = phase["communications"];
if (communications.is_array()) {
for (auto const& comm : communications) {
auto type = comm["type"];
if (type == "SendRecv") {
auto bytes = comm["bytes"];
auto messages = comm["messages"];

auto from = comm["from"];
auto to = comm["to"];

ElementIDType from_id = from["id"];
ElementIDType to_id = to["id"];

assert(bytes.is_number());
assert(from.is_number());
assert(to.is_number());

// fmt::print(" From: {}, to: {}\n", from_id, to_id);
// Object on this rank sent data
if (objects.find(from_id) != objects.end()) {
objects.at(from_id).addSentCommunications(to_id, bytes);
} else if (objects.find(to_id) != objects.end()) {
objects.at(to_id).addReceivedCommunications(from_id, bytes);
if (phase.find("communications") != phase.end()) {
auto communications = phase["communications"];
if (communications.is_array()) {
for (auto const& comm : communications) {
auto type = comm["type"];
if (type == "SendRecv") {
auto bytes = comm["bytes"];
auto messages = comm["messages"];

auto from = comm["from"];
auto to = comm["to"];

ElementIDType from_id = from["id"];
ElementIDType to_id = to["id"];

assert(bytes.is_number());
assert(from.is_number());
assert(to.is_number());

// fmt::print(" From: {}, to: {}\n", from_id, to_id);
// Object on this rank sent data
if (objects.find(from_id) != objects.end()) {
objects.at(from_id).addSentCommunications(to_id, bytes);
} else if (objects.find(to_id) != objects.end()) {
objects.at(to_id).addReceivedCommunications(from_id, bytes);
}
}
}
}
}
phase_info.try_emplace(id, PhaseWork{id, std::move(objects)});
}
phase_info.try_emplace(id, PhaseWork{id, std::move(objects)});
}
}
fmt::print(" vt-tv: Adding rank {}\n", rank_id);
Rank r{rank_id, std::move(phase_info)};
fmt::print(" vt-tv: Adding rank {}\n", rank_id);
Rank r{rank_id, std::move(phase_info)};

info->addInfo(std::move(object_info), std::move(r));
# pragma omp critical
{
info->addInfo(std::move(object_info), std::move(r));
}

}
catch(const std::exception& e)
{
} catch(const std::exception& e) {
std::cerr << "vt-tv: Error reading data for rank " << rank_id << ": " << e.what() << '\n';
}
}
Expand Down
2 changes: 2 additions & 0 deletions bindings/python/tv.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
#include <filesystem>
#include <map>

#include <omp.h>

namespace vt::tv::bindings::python {

void tv_from_json(const std::vector<std::string>&, const std::string&, uint64_t);
Expand Down
10 changes: 9 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,17 @@ def build_extension(self, ext):

jobs = os.environ.get('VTTV_JOBS', os.cpu_count())

n_threads = os.environ.get('VTTV_N_THREADS', 1)
# check if n_threads is a valid integer
try:
n_threads = int(n_threads)
except ValueError:
raise RuntimeError("Environment variable VTTV_N_THREADS must be an integer")

cmake_args = ['-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=' + extdir,
'-DPYTHON_EXECUTABLE=' + sys.executable,
'-DVTK_DIR=' + vtk_dir]
'-DVTK_DIR=' + vtk_dir,
'-DVT_TV_NUM_THREADS=' + str(n_threads)]

if sys.platform == "darwin":
import platform
Expand Down

0 comments on commit 3ca694d

Please sign in to comment.