From 3ca694dbccba50ce75e6745d92a5010b41befecd Mon Sep 17 00:00:00 2001 From: pierrepebay Date: Sat, 16 Dec 2023 13:41:44 +0100 Subject: [PATCH] #16: bindings: Make json processing concurrent --- bindings/python/tv.cc | 264 ++++++++++++++++++++++-------------------- bindings/python/tv.h | 2 + setup.py | 10 +- 3 files changed, 149 insertions(+), 127 deletions(-) diff --git a/bindings/python/tv.cc b/bindings/python/tv.cc index 670e671443..67d315653a 100644 --- a/bindings/python/tv.cc +++ b/bindings/python/tv.cc @@ -75,162 +75,174 @@ void tv_from_json(const std::vector& input_json_per_rank_list, cons fmt::print(" font_size: {}\n", font_size); using json = nlohmann::json; - // Read the json for the rank - std::unique_ptr info = std::make_unique(); assert(input_json_per_rank_list.size() == num_ranks && "Must have the same number of json files as ranks"); - for (NodeType rank_id = 0; rank_id < num_ranks; rank_id++) { + #ifdef VT_TV_NUM_THREADS + const int threads = VT_TV_NUM_THREADS; + #else + const int threads = 2; + #endif + omp_set_num_threads(threads); + // print number of threads + fmt::print("vt-tv: Using {} threads\n", threads); + + // Initialize the info object, that will hold data for all ranks for all phases + std::unique_ptr info = std::make_unique(); + + # pragma omp parallel for + for (int64_t rank_id = 0; rank_id < num_ranks; rank_id++) { std::string rank_json_str = input_json_per_rank_list[rank_id]; - try { - std::cerr << "vt-tv: Parsing JSON for rank " << rank_id << "\n"; - auto j = json::parse(rank_json_str); - assert(j != nullptr && "Must have valid json"); - - std::cerr << "vt-tv: Writing JSON to disk for rank " << rank_id << "\n"; - std::string filename = fmt::format("/home/pierrelp/Develop/NGA/vt-tv/output_{}.json", rank_id); - std::ofstream o(filename); - o << std::setw(2) << j << std::endl; - o.close(); - - std::unordered_map object_info; - std::unordered_map phase_info; - - std::cerr << "vt-tv: Reading rank " << rank_id << "\n"; - - auto phases = j["phases"]; - - if (phases.is_array()) { - for (auto const& phase : phases) { - auto id = phase["id"]; - std::cerr << "vt-tv: Reading phase " << id << "\n"; - auto tasks = phase["tasks"]; - - std::unordered_map objects; - - if (tasks.is_array()) { - for (auto const& task : tasks) { - auto node = task["node"]; - auto time = task["time"]; - auto etype = task["entity"]["type"]; - assert(time.is_number()); - assert(node.is_number()); - - if (etype == "object") { - auto object = task["entity"]["id"]; - auto home = task["entity"]["home"]; - bool migratable = task["entity"]["migratable"]; - assert(object.is_number()); - assert(home.is_number()); - - std::cerr << "vt-tv: Processing object " << object << " in phase " << id << "\n"; - std::vector index_arr; - - - if ( - task["entity"].find("collection_id") != task["entity"].end() and - task["entity"].find("index") != task["entity"].end() - ) { - auto cid = task["entity"]["collection_id"]; - auto idx = task["entity"]["index"]; - if (cid.is_number() && idx.is_array()) { - std::vector arr = idx; - index_arr = std::move(arr); + try { + std::cerr << "vt-tv: Parsing JSON for rank " << rank_id << "\n"; + auto j = json::parse(rank_json_str); + assert(j != nullptr && "Must have valid json"); + + // std::cerr << "vt-tv: Writing JSON to disk for rank " << rank_id << "\n"; + std::string filename = fmt::format("/home/pierrelp/Develop/NGA/vt-tv/output_{}.json", rank_id); + std::ofstream o(filename); + o << std::setw(2) << j << std::endl; + o.close(); + + std::unordered_map object_info; + std::unordered_map phase_info; + + // std::cerr << "vt-tv: Reading rank " << rank_id << "\n"; + + auto phases = j["phases"]; + + if (phases.is_array()) { + for (auto const& phase : phases) { + auto id = phase["id"]; + // std::cerr << "vt-tv: Reading phase " << id << "\n"; + auto tasks = phase["tasks"]; + + std::unordered_map objects; + + if (tasks.is_array()) { + for (auto const& task : tasks) { + auto node = task["node"]; + auto time = task["time"]; + auto etype = task["entity"]["type"]; + assert(time.is_number()); + assert(node.is_number()); + + if (etype == "object") { + auto object = task["entity"]["id"]; + auto home = task["entity"]["home"]; + bool migratable = task["entity"]["migratable"]; + assert(object.is_number()); + assert(home.is_number()); + + // std::cerr << "vt-tv: Processing object " << object << " in phase " << id << "\n"; + std::vector index_arr; + + + if ( + task["entity"].find("collection_id") != task["entity"].end() and + task["entity"].find("index") != task["entity"].end() + ) { + auto cid = task["entity"]["collection_id"]; + auto idx = task["entity"]["index"]; + if (cid.is_number() && idx.is_array()) { + std::vector arr = idx; + index_arr = std::move(arr); + } } - } - ObjectInfo oi{object, home, migratable, std::move(index_arr)}; + ObjectInfo oi{object, home, migratable, std::move(index_arr)}; - if (task["entity"].find("collection_id") != task["entity"].end()) { - oi.setIsCollection(true); - oi.setMetaID(task["entity"]["collection_id"]); - } + if (task["entity"].find("collection_id") != task["entity"].end()) { + oi.setIsCollection(true); + oi.setMetaID(task["entity"]["collection_id"]); + } - if (task["entity"].find("objgroup_id") != task["entity"].end()) { - oi.setIsObjGroup(true); - oi.setMetaID(task["entity"]["objgroup_id"]); - } + if (task["entity"].find("objgroup_id") != task["entity"].end()) { + oi.setIsObjGroup(true); + oi.setMetaID(task["entity"]["objgroup_id"]); + } - object_info.try_emplace(object, std::move(oi)); + object_info.try_emplace(object, std::move(oi)); - std::unordered_map subphase_loads; + std::unordered_map subphase_loads; - if (task.find("subphases") != task.end()) { - auto subphases = task["subphases"]; - if (subphases.is_array()) { - for (auto const& s : subphases) { - auto sid = s["id"]; - auto stime = s["time"]; + if (task.find("subphases") != task.end()) { + auto subphases = task["subphases"]; + if (subphases.is_array()) { + for (auto const& s : subphases) { + auto sid = s["id"]; + auto stime = s["time"]; - assert(sid.is_number()); - assert(stime.is_number()); + assert(sid.is_number()); + assert(stime.is_number()); - subphase_loads[sid] = stime; + subphase_loads[sid] = stime; + } } } - } - std::unordered_map user_defined; - if (task.find("user_defined") != task.end()) { - auto user_defined = task["user_defined"]; - if (user_defined.is_object()) { - for (auto& [key, value] : user_defined.items()) { - user_defined[key] = value; + std::unordered_map user_defined; + if (task.find("user_defined") != task.end()) { + auto user_defined = task["user_defined"]; + if (user_defined.is_object()) { + for (auto& [key, value] : user_defined.items()) { + user_defined[key] = value; + } } } + // fmt::print(" Add object {}\n", (ElementIDType)object); + objects.try_emplace( + object, + ObjectWork{ + object, time, std::move(subphase_loads), std::move(user_defined) + } + ); } - // fmt::print(" Add object {}\n", (ElementIDType)object); - objects.try_emplace( - object, - ObjectWork{ - object, time, std::move(subphase_loads), std::move(user_defined) - } - ); } } - } - if (phase.find("communications") != phase.end()) { - auto communications = phase["communications"]; - if (communications.is_array()) { - for (auto const& comm : communications) { - auto type = comm["type"]; - if (type == "SendRecv") { - auto bytes = comm["bytes"]; - auto messages = comm["messages"]; - - auto from = comm["from"]; - auto to = comm["to"]; - - ElementIDType from_id = from["id"]; - ElementIDType to_id = to["id"]; - - assert(bytes.is_number()); - assert(from.is_number()); - assert(to.is_number()); - - // fmt::print(" From: {}, to: {}\n", from_id, to_id); - // Object on this rank sent data - if (objects.find(from_id) != objects.end()) { - objects.at(from_id).addSentCommunications(to_id, bytes); - } else if (objects.find(to_id) != objects.end()) { - objects.at(to_id).addReceivedCommunications(from_id, bytes); + if (phase.find("communications") != phase.end()) { + auto communications = phase["communications"]; + if (communications.is_array()) { + for (auto const& comm : communications) { + auto type = comm["type"]; + if (type == "SendRecv") { + auto bytes = comm["bytes"]; + auto messages = comm["messages"]; + + auto from = comm["from"]; + auto to = comm["to"]; + + ElementIDType from_id = from["id"]; + ElementIDType to_id = to["id"]; + + assert(bytes.is_number()); + assert(from.is_number()); + assert(to.is_number()); + + // fmt::print(" From: {}, to: {}\n", from_id, to_id); + // Object on this rank sent data + if (objects.find(from_id) != objects.end()) { + objects.at(from_id).addSentCommunications(to_id, bytes); + } else if (objects.find(to_id) != objects.end()) { + objects.at(to_id).addReceivedCommunications(from_id, bytes); + } } } } } + phase_info.try_emplace(id, PhaseWork{id, std::move(objects)}); } - phase_info.try_emplace(id, PhaseWork{id, std::move(objects)}); } - } - fmt::print(" vt-tv: Adding rank {}\n", rank_id); - Rank r{rank_id, std::move(phase_info)}; + fmt::print(" vt-tv: Adding rank {}\n", rank_id); + Rank r{rank_id, std::move(phase_info)}; - info->addInfo(std::move(object_info), std::move(r)); + # pragma omp critical + { + info->addInfo(std::move(object_info), std::move(r)); + } - } - catch(const std::exception& e) - { + } catch(const std::exception& e) { std::cerr << "vt-tv: Error reading data for rank " << rank_id << ": " << e.what() << '\n'; } } diff --git a/bindings/python/tv.h b/bindings/python/tv.h index 4698de1800..34b9dea243 100644 --- a/bindings/python/tv.h +++ b/bindings/python/tv.h @@ -63,6 +63,8 @@ #include #include +#include + namespace vt::tv::bindings::python { void tv_from_json(const std::vector&, const std::string&, uint64_t); diff --git a/setup.py b/setup.py index a66b0b88c8..d753ffa0f0 100644 --- a/setup.py +++ b/setup.py @@ -35,9 +35,17 @@ def build_extension(self, ext): jobs = os.environ.get('VTTV_JOBS', os.cpu_count()) + n_threads = os.environ.get('VTTV_N_THREADS', 1) + # check if n_threads is a valid integer + try: + n_threads = int(n_threads) + except ValueError: + raise RuntimeError("Environment variable VTTV_N_THREADS must be an integer") + cmake_args = ['-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=' + extdir, '-DPYTHON_EXECUTABLE=' + sys.executable, - '-DVTK_DIR=' + vtk_dir] + '-DVTK_DIR=' + vtk_dir, + '-DVT_TV_NUM_THREADS=' + str(n_threads)] if sys.platform == "darwin": import platform