diff --git a/ci/docker/build-and-test-ubuntu.dockerfile b/ci/docker/build-and-test-ubuntu.dockerfile
index 9a0e008a9..039bef8e5 100644
--- a/ci/docker/build-and-test-ubuntu.dockerfile
+++ b/ci/docker/build-and-test-ubuntu.dockerfile
@@ -8,6 +8,12 @@ FROM ${BASE_IMAGE} AS base
 ENV CONDA_PATH=/opt/conda
 ENV PATH=$PATH:$CONDA_PATH/bin
 
+# Set up Python requirements for JSON datafile validation
+RUN pip install PyYAML
+RUN pip install Brotli
+RUN pip install schema
+RUN pip install nanobind
+
 COPY . /opt/src/vt-tv
 RUN mkdir -p /opt/build/vt-tv
diff --git a/ci/python_build.sh b/ci/python_build.sh
index 6c3fddb50..71b8022de 100644
--- a/ci/python_build.sh
+++ b/ci/python_build.sh
@@ -19,6 +19,9 @@ for env in $(conda env list | grep ^py | perl -lane 'print $F[-1]' | xargs ls -l
 
   # Build VT-TV python package
   pip install PyYAML
+  pip install Brotli
+  pip install schema
+  pip install nanobind
   pip install $VT_TV_SRC_DIR
 
   # Deactivate conda environment
diff --git a/ci/setup_conda.sh b/ci/setup_conda.sh
index 9d89c5c01..abb528457 100644
--- a/ci/setup_conda.sh
+++ b/ci/setup_conda.sh
@@ -41,6 +41,9 @@ do
   . $CONDA_PATH/etc/profile.d/conda.sh && conda activate py${python_version}
   echo "Python version: $(python --version)"
 
+  pip install PyYAML
+  pip install Brotli
+  pip install schema
   pip install nanobind
 
   conda deactivate
   echo "::endgroup::"
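These packages must be present in every environment that ends up shelling out to the validator script. A quick, hypothetical sanity check (not part of this PR) for a local environment:

```python
import importlib

# Hypothetical helper: confirm the validator's dependencies are importable.
# Note that the PyYAML package is imported as "yaml", Brotli as "brotli".
for module in ("yaml", "brotli", "schema", "nanobind"):
    try:
        importlib.import_module(module)
    except ImportError as err:
        raise SystemExit(f"Missing validator dependency: {module} ({err})")
```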
diff --git a/scripts/json_datafile_validator.py b/scripts/json_datafile_validator.py
new file mode 100644
index 000000000..fe1dc111f
--- /dev/null
+++ b/scripts/json_datafile_validator.py
@@ -0,0 +1,488 @@
+""" JSON datafile validator
+"""
+import sys
+import os
+import re
+import argparse
+from collections import Counter
+from collections.abc import Iterable
+import json
+import logging
+
+import brotli
+from schema import And, Optional, Schema
+
+try:
+    project_path = f"{os.sep}".join(os.path.abspath(__file__).split(os.sep)[:-1])
+    sys.path.append(project_path)
+except Exception as e:
+    print(f"Cannot add project path to system path! Exiting!\nERROR: {e}")
+    sys.exit(1)
+
+# Import VT related schemas
+from lb_datafile_schema import LBDatafile_schema
+
+
+def exc_handler(exception_type, exception, traceback):
+    """ Exception handler for hiding traceback. """
+    module_name = f"[{os.path.splitext(os.path.split(__file__)[-1])[0]}]"
+    print(f"{module_name} {exception_type.__name__} {exception}")
+
+
+class SchemaValidator:
+    """ Validates the schema of VT Object Map files (JSON). """
+    def __init__(self, schema_type: str):
+        self.schema_type = schema_type
+        self.valid_schema = self._get_valid_schema()
+
+    @staticmethod
+    def get_error_message(iterable_collection: Iterable) -> str:
+        """ Return error message. """
+        return " or ".join(iterable_collection)
+
+    def _get_valid_schema(self) -> Schema:
+        """ Returns representation of a valid schema. """
+        valid_schema_data = LBDatafile_schema
+        allowed_types_stats = ("LBStatsfile",)
+        # The same block of statistics is expected for every quantity below
+        stats_block = {
+            "avg": float,
+            "car": float,
+            "imb": float,
+            "kur": float,
+            "max": float,
+            "min": float,
+            "npr": float,
+            "skw": float,
+            "std": float,
+            "sum": float,
+            "var": float
+        }
+        lb_iteration_block = {
+            "Object_comm": stats_block,
+            "Object_load_modeled": stats_block,
+            "Object_load_raw": stats_block,
+            Optional("Object_strategy_specific_load_modeled"): stats_block,
+            "Rank_comm": stats_block,
+            "Rank_load_modeled": stats_block,
+            "Rank_load_raw": stats_block,
+            Optional("Rank_strategy_specific_load_modeled"): stats_block
+        }
+        valid_schema_stats = Schema(
+            {
+                Optional('type'): And(
+                    str,
+                    lambda a: a in allowed_types_stats,
+                    error=f"{self.get_error_message(allowed_types_stats)} must be chosen"
+                ),
+                Optional('metadata'): {
+                    Optional('type'): And(
+                        str,
+                        lambda a: a in allowed_types_stats,
+                        error=f"{self.get_error_message(allowed_types_stats)} must be chosen"
+                    ),
+                },
+                'phases': [
+                    {
+                        "id": int,
+                        Optional("migration count"): int,
+                        Optional("post-LB"): lb_iteration_block,
+                        "pre-LB": lb_iteration_block
+                    },
+                ]
+            }
+        )
+
+        if self.schema_type == "LBDatafile":
+            return valid_schema_data
+        elif self.schema_type == "LBStatsfile":
+            return valid_schema_stats
+
+        sys.excepthook = exc_handler
+        raise TypeError(f"Unsupported schema type: {self.schema_type} was given")
+
+    def is_valid(self, schema_to_validate: dict) -> bool:
+        """ Returns True if schema_to_validate matches self.valid_schema, else False. """
+        return self.valid_schema.is_valid(schema_to_validate)
+
+    def validate(self, schema_to_validate: dict):
+        """ Return validated schema. """
+        sys.excepthook = exc_handler
+        return self.valid_schema.validate(schema_to_validate)
+
+
+def get_json(file_path):
+    """ Always try to decompress in case the '.br' extension is missing. """
+    with open(file_path, "rb") as json_file:
+        content = json_file.read()
+    try:
+        content = brotli.decompress(content)
+    except brotli.error as e:
+        logging.debug(f"No decompression applied for {file_path}: {e}")
+    return json.loads(content.decode("utf-8"))
+
+
+class JSONDataFilesValidator:
+    """ Class validating VT data files according to the defined schema. """
+    def __init__(self, file_path: str = None, dir_path: str = None,
+                 file_prefix: str = None, file_suffix: str = None,
+                 validate_comm_links: bool = False):
+        self.__file_path = file_path
+        self.__dir_path = dir_path
+        self.__file_prefix = file_prefix
+        self.__file_suffix = file_suffix
+        self.__validate_comm_links = validate_comm_links
+        self.__cli()
+
+    def __cli(self):
+        """ Support for command line arguments. """
+        parser = argparse.ArgumentParser()
+        group = parser.add_mutually_exclusive_group()
+        group.add_argument("--dir_path", help="Path to directory where files for validation are located.")
+        group.add_argument("--file_path", help="Path to a validated file. Pass only when validating a single file.")
+        parser.add_argument("--file_prefix", help="File prefix. Optional. Pass only when --dir_path is provided.")
+        parser.add_argument("--file_suffix", help="File suffix. Optional. Pass only when --dir_path is provided.")
+        parser.add_argument("--validate_comm_links", help="Verify that comm links reference tasks.", action='store_true')
+        parser.add_argument("--debug", help="Enable debug logging", action="store_true")
+        args = parser.parse_args()
+        if args.file_path:
+            self.__file_path = os.path.abspath(args.file_path)
+        if args.dir_path:
+            self.__dir_path = os.path.abspath(args.dir_path)
+        if args.file_prefix:
+            self.__file_prefix = args.file_prefix
+        if args.file_suffix:
+            self.__file_suffix = args.file_suffix
+        self.__validate_comm_links = args.validate_comm_links
+        logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO,
+                            format='%(levelname)s - %(filename)s:%(lineno)d - %(message)s')
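A minimal sketch (not part of this PR) of driving the validator in-process rather than through the CLI, assuming the scripts/ directory is on sys.path; the data file path is the one used by the repository's tests:

```python
from json_datafile_validator import SchemaValidator, get_json

# Load (and, if needed, decompress) one data file, then pick the schema type
# the same way __validate_file does below.
data = get_json("data/lb_test_data/data.0.json")
schema_type = (data.get("metadata") or {}).get("type") or data.get("type") or "LBDatafile"

validator = SchemaValidator(schema_type=schema_type)
if not validator.is_valid(data):
    validator.validate(data)  # raises schema.SchemaError with a detailed message
```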
""" + list_of_files = os.listdir(dir_path) + + if not list_of_files: + sys.excepthook = exc_handler + raise FileNotFoundError(f"Directory: {dir_path} is EMPTY") + + if file_prefix is None and file_suffix is None: + logging.info("File prefix and file suffix not given") + file_prefix = Counter([file.split('.')[0] for file in list_of_files]).most_common()[0][0] + logging.info(f"Found most common prefix: {file_prefix}") + file_suffix = Counter([file.split('.')[-1] for file in list_of_files]).most_common()[0][0] + logging.info(f"Found most common suffix: {file_suffix}") + + if file_prefix is not None: + list_of_files = [file for file in list_of_files if file.split('.')[0] == file_prefix] + + if file_suffix is not None: + list_of_files = [file for file in list_of_files if file.split('.')[-1] == file_suffix] + + return sorted([os.path.join(dir_path, file) for file in list_of_files], + key=lambda x: int(x.split(os.sep)[-1].split('.')[-2])) + + @staticmethod + def get_complete_dataset(file_path): + """ Returns all json files that share the same basename. """ + dirname = os.path.dirname(file_path) + basename = os.path.basename(file_path) + index = basename.rfind('0') + base = basename[0:index] + files = [os.path.join(dirname, f) for f in os.listdir(dirname) + if f.startswith(base) and (f.endswith(".json") or f.endswith(".json.br"))] + logging.debug(f"Dataset: {files}") + + return files + + @staticmethod + def get_nodes_info(file_path): + """ Returns node information from file name. """ + basename = os.path.basename(file_path) + nodes_info = re.findall(r'\d+', basename) + if not nodes_info: + return '-1', '-1' + elif len(nodes_info) == 1: + return '-1', nodes_info[0] + else: + return nodes_info[0], nodes_info[1] + + def __validate_file(self, file_path): + """ Validates the file against the schema. """ + logging.info(f"Validating file: {file_path}") + json_data = get_json(file_path) + + # Extracting type from JSON data + schema_type = None + if json_data.get("metadata") is not None: + schema_type = json_data.get("metadata").get("type") + else: + if json_data.get("type") is not None: + schema_type = json_data.get("type") + + if schema_type is not None: + # Validate schema + if SchemaValidator(schema_type=schema_type).is_valid(schema_to_validate=json_data): + logging.info(f"Valid JSON schema in {file_path}") + else: + logging.error(f"Invalid JSON schema in {file_path}") + SchemaValidator(schema_type=schema_type).validate(schema_to_validate=json_data) + else: + logging.warning(f"Schema type not found in file: {file_path}. 
+    def __validate_file(self, file_path):
+        """ Validates the file against the schema. """
+        logging.info(f"Validating file: {file_path}")
+        json_data = get_json(file_path)
+
+        # Extract the schema type from the JSON data
+        schema_type = None
+        if json_data.get("metadata") is not None:
+            schema_type = json_data.get("metadata").get("type")
+        elif json_data.get("type") is not None:
+            schema_type = json_data.get("type")
+
+        if schema_type is not None:
+            # Validate schema
+            if SchemaValidator(schema_type=schema_type).is_valid(schema_to_validate=json_data):
+                logging.info(f"Valid JSON schema in {file_path}")
+            else:
+                logging.error(f"Invalid JSON schema in {file_path}")
+                SchemaValidator(schema_type=schema_type).validate(schema_to_validate=json_data)
+        else:
+            logging.warning(f"Schema type not found in file: {file_path}.\n"
+                            "Passing by default when schema type is not found.")
+
+        if self.__validate_comm_links and schema_type == "LBDatafile":
+            num_nodes, current_node = self.get_nodes_info(file_path)
+            if num_nodes == '-1' and current_node == '-1':
+                # Validate a single file
+                all_jsons = [json_data]
+            elif current_node == '0':
+                # Validate the complete dataset
+                dataset_files = self.get_complete_dataset(file_path)
+                all_jsons = [get_json(file) for file in dataset_files]
+            else:
+                # This dataset has already been validated
+                return
+
+            if not self.validate_comm_links(all_jsons):
+                logging.error(f"Invalid dataset for file: {file_path}!")
+
+    @staticmethod
+    def validate_comm_links(all_jsons):
+        """ Check that every communication endpoint refers to an existing task. """
+        for n in range(len(all_jsons[0]["phases"])):
+            comm_ids = set()
+            task_ids = set()
+
+            for data in all_jsons:
+                tasks = data["phases"][n]["tasks"]
+                task_ids.update(
+                    {int(task["entity"].get("id", task["entity"].get("seq_id"))) for task in tasks}
+                )
+
+                if data["phases"][n].get("communications") is not None:
+                    comms = data["phases"][n]["communications"]
+                    comm_ids.update({int(comm["from"].get("id", comm["from"].get("seq_id"))) for comm in comms})
+                    comm_ids.update({int(comm["to"].get("id", comm["to"].get("seq_id"))) for comm in comms})
+
+            if not comm_ids.issubset(task_ids):
+                logging.error(
+                    f"Phase {n}: Task ids: {comm_ids - task_ids}. Tasks are "
+                    "referenced in communication, but are not present in the "
+                    "dataset."
+                )
+                return False
+        return True
+
+    def main(self):
+        if self.__file_path is not None:
+            if os.path.isfile(self.__file_path):
+                self.__validate_file(file_path=self.__file_path)
+            else:
+                sys.excepthook = exc_handler
+                raise FileNotFoundError(f"File: {self.__file_path} NOT found")
+        elif self.__dir_path is not None:
+            if os.path.isdir(self.__dir_path):
+                list_of_files_for_validation = self.__get_files_for_validation(dir_path=self.__dir_path,
+                                                                               file_prefix=self.__file_prefix,
+                                                                               file_suffix=self.__file_suffix)
+                for file in list_of_files_for_validation:
+                    self.__validate_file(file_path=file)
+            else:
+                sys.excepthook = exc_handler
+                raise FileNotFoundError(f"Directory: {self.__dir_path} does NOT exist")
+        else:
+            sys.excepthook = exc_handler
+            raise Exception("Either a FILE path or a DIRECTORY path has to be given")
+
+
+if __name__ == "__main__":
+    JSONDataFilesValidator().main()
\ No newline at end of file
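For illustration, a hand-written toy dataset (not taken from the repository) showing what the cross-file communication check accepts and rejects, assuming scripts/ is on sys.path:

```python
from json_datafile_validator import JSONDataFilesValidator

# Toy phases: in "bad", task 2 appears as a communication endpoint but is
# never listed as a task, so the subset check fails for phase 0.
good = [{"phases": [{"id": 0,
                     "tasks": [{"entity": {"seq_id": 0}}, {"entity": {"seq_id": 1}}],
                     "communications": [{"from": {"seq_id": 0}, "to": {"seq_id": 1}}]}]}]
bad = [{"phases": [{"id": 0,
                    "tasks": [{"entity": {"seq_id": 0}}],
                    "communications": [{"from": {"seq_id": 0}, "to": {"seq_id": 2}}]}]}]

assert JSONDataFilesValidator.validate_comm_links(good) is True
assert JSONDataFilesValidator.validate_comm_links(bad) is False
```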
diff --git a/scripts/lb_datafile_schema.py b/scripts/lb_datafile_schema.py
new file mode 100644
index 000000000..d56b757e7
--- /dev/null
+++ b/scripts/lb_datafile_schema.py
@@ -0,0 +1,101 @@
+"""LB data JSON file schema"""
+from schema import And, Optional, Schema
+
+def validate_ids(field):
+    """
+    Ensure that 1) either seq_id or id is provided,
+    and 2) if an object is migratable, collection_id has been set.
+    """
+    if "seq_id" not in field and "id" not in field:
+        raise ValueError("Either id (bit-encoded) or seq_id must be provided.")
+
+    if field.get("migratable") is True and "seq_id" in field and "collection_id" not in field:
+        raise ValueError("If an entity is migratable, it must have a collection_id")
+
+    return field
+
+LBDatafile_schema = Schema(
+    {
+        Optional('type'): And(str, "LBDatafile", error="'LBDatafile' must be chosen."),
+        Optional('metadata'): {
+            Optional('type'): And(str, "LBDatafile", error="'LBDatafile' must be chosen."),
+            Optional('rank'): int,
+            Optional('shared_node'): {
+                'id': int,
+                'size': int,
+                'rank': int,
+                'num_nodes': int,
+            },
+            Optional('phases'): {
+                Optional('count'): int,
+                'skipped': {
+                    'list': [int],
+                    'range': [[int]],
+                },
+                'identical_to_previous': {
+                    'list': [int],
+                    'range': [[int]],
+                },
+            },
+            Optional('attributes'): dict
+        },
+        'phases': [
+            {
+                'id': int,
+                'tasks': [
+                    {
+                        'entity': And({
+                            Optional('collection_id'): int,
+                            'home': int,
+                            Optional('id'): int,
+                            Optional('seq_id'): int,
+                            Optional('index'): [int],
+                            'type': str,
+                            'migratable': bool,
+                            Optional('objgroup_id'): int
+                        }, validate_ids),
+                        'node': int,
+                        'resource': str,
+                        Optional('subphases'): [
+                            {
+                                'id': int,
+                                'time': float,
+                            }
+                        ],
+                        'time': float,
+                        Optional('user_defined'): dict,
+                        Optional('attributes'): dict
+                    },
+                ],
+                Optional('communications'): [
+                    {
+                        'type': str,
+                        'to': And({
+                            'type': str,
+                            Optional('id'): int,
+                            Optional('seq_id'): int,
+                            Optional('home'): int,
+                            Optional('collection_id'): int,
+                            Optional('migratable'): bool,
+                            Optional('index'): [int],
+                            Optional('objgroup_id'): int,
+                        }, validate_ids),
+                        'messages': int,
+                        'from': And({
+                            'type': str,
+                            Optional('id'): int,
+                            Optional('seq_id'): int,
+                            Optional('home'): int,
+                            Optional('collection_id'): int,
+                            Optional('migratable'): bool,
+                            Optional('index'): [int],
+                            Optional('objgroup_id'): int,
+                        }, validate_ids),
+                        'bytes': float
+                    }
+                ],
+                Optional('user_defined'): dict
+            },
+        ]
+    }
+)
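To make the id rules concrete, a few toy entity dictionaries (illustrative values, not taken from the test data) and how validate_ids treats them:

```python
from lb_datafile_schema import validate_ids

validate_ids({"seq_id": 3, "migratable": False})                     # OK: seq_id given
validate_ids({"id": 262147, "migratable": True})                     # OK: bit-encoded id
validate_ids({"seq_id": 3, "migratable": True, "collection_id": 7})  # OK: collection known
try:
    validate_ids({"seq_id": 3, "migratable": True})                  # missing collection_id
except ValueError as err:
    print(err)
```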
diff --git a/src/vt-tv/utility/json_reader.cc b/src/vt-tv/utility/json_reader.cc
index ec4b85b5a..ca9b6a24f 100644
--- a/src/vt-tv/utility/json_reader.cc
+++ b/src/vt-tv/utility/json_reader.cc
@@ -52,6 +52,9 @@
 
 #include
 #include
+#include
+#include
+#include
 
 namespace vt::tv::utility {
 
@@ -280,4 +283,30 @@ std::unique_ptr<Info> JSONReader::parse() {
   return std::make_unique<Info>(std::move(object_info), std::move(rank_info));
 }
 
+bool JSONReader::validate_datafile(std::string file_path)
+{
+  // Assume the file is valid until the validator says otherwise
+  bool is_valid = true;
+
+  // Prepare the command line for the Python validator script
+  std::string cmd;
+  cmd += "python";
+  cmd += " ";
+  cmd += SRC_DIR;
+  cmd += "/scripts/json_datafile_validator.py";
+  cmd += " --file_path=";
+  cmd += file_path;
+
+  // Launch the validator and capture its exit code
+  int exit_code = std::system(cmd.c_str());
+
+  // Any non-zero exit code means the file failed validation
+  if (exit_code != 0) {
+    is_valid = false;
+  }
+
+  return is_valid;
+}
+
 } /* end namespace vt::tv::utility */
diff --git a/src/vt-tv/utility/json_reader.h b/src/vt-tv/utility/json_reader.h
index 0c274a441..7c12139da 100644
--- a/src/vt-tv/utility/json_reader.h
+++ b/src/vt-tv/utility/json_reader.h
@@ -94,6 +94,12 @@ struct JSONReader {
    */
  std::unique_ptr<Info> parse();
 
+  /**
+   * \brief Check if a JSON data file is well formatted
+   * \param[in] file_path the data file path to validate
+   * \return whether the data file passed validation
+   */
+  bool validate_datafile(std::string file_path);
+
 private:
  NodeType rank_ = 0;
  std::unique_ptr json_ = nullptr;
diff --git a/src/vt-tv/utility/parse_render.cc b/src/vt-tv/utility/parse_render.cc
index fb5af08c9..cb66f9446 100644
--- a/src/vt-tv/utility/parse_render.cc
+++ b/src/vt-tv/utility/parse_render.cc
@@ -103,12 +103,21 @@ void ParseRender::parseAndRender(
       fmt::print("Reading file for rank {}\n", rank);
       utility::JSONReader reader{static_cast<NodeType>(rank)};
-      reader.readFile(filepath);
-      auto tmpInfo = reader.parse();
+
+      // Validate the JSON data file
+      std::string data_file_path = input_dir + "data." + std::to_string(rank) + ".json";
+      if (reader.validate_datafile(data_file_path)) {
+        reader.readFile(data_file_path);
+        auto tmpInfo = reader.parse();
+
 #if VT_TV_OPENMP_ENABLED
 #pragma omp critical
 #endif
       { info->addInfo(tmpInfo->getObjectInfo(), tmpInfo->getRank(rank)); }
+
+      } else {
+        throw std::runtime_error("JSON data file is invalid: " + data_file_path);
+      }
     }
 
     std::size_t n_ranks = config["input"]["n_ranks"].as<std::size_t>();
     if (info->getNumRanks() != n_ranks) {
diff --git a/tests/test_bindings.py b/tests/test_bindings.py
index bbbffc2a2..85f093b2b 100644
--- a/tests/test_bindings.py
+++ b/tests/test_bindings.py
@@ -1,5 +1,6 @@
 """This module calls vttv module to test that vttv bindings work as expected"""
 import os
+import subprocess
 import json
 import sys
 import yaml
@@ -9,7 +10,7 @@
 source_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
 # Read the YAML config file
-with open(f"{source_dir}/tests/test_bindings_conf.yaml", "r", encoding="utf-8") as stream:
+with open(f"{source_dir}/tests/test_bindings_conf.yaml", 'r', encoding="utf-8") as stream:
     try:
         params = yaml.safe_load(stream)
     except yaml.YAMLError as exc:
@@ -38,8 +39,31 @@
 rank_data = []
 for rank in range(n_ranks):
-    with open(f"{source_dir}/data/lb_test_data/data.{rank}.json", "r", encoding="utf-8") as f:
+    # JSON data file for rank
+    datafile = f'{source_dir}/data/lb_test_data/data.{rank}.json'
+
+    # Check JSON schema validity
+    IS_VALID: bool
+    try:
+        subprocess.run([
+            'python',
+            os.path.join(source_dir, 'scripts/json_datafile_validator.py'),
+            "--file_path=" + datafile
+        ], check=True, capture_output=True)
+        IS_VALID = True
+    except subprocess.CalledProcessError as e:
+        IS_VALID = False
+        print(e.output.decode() + f"[JSON data file invalid] {datafile}")
+
+    # Abort the test if validation failed
+    if not IS_VALID:
+        sys.exit(1)
+
+    # Read JSON data file
+    with open(datafile, 'r', encoding="utf-8") as f:
         data = json.load(f)
+        data_serialized = json.dumps(data)
 
     # Add serialized data into the rank
-    rank_data.append((json.dumps(data)))
+    rank_data.append(data_serialized)
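Instead of looping file-by-file, the validator can also be pointed at a whole directory. A sketch using the CLI's directory mode (paths follow the test layout above):

```python
import os
import subprocess
import sys

source_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Sketch: validate every data.<rank>.json under the test data directory at once.
subprocess.run([
    sys.executable,
    os.path.join(source_dir, "scripts/json_datafile_validator.py"),
    "--dir_path=" + os.path.join(source_dir, "data/lb_test_data"),
    "--file_prefix=data",
    "--file_suffix=json",
], check=True)
```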
diff --git a/tests/unit/generator.h b/tests/unit/generator.h
index 09bea5e70..c702edbdf 100644
--- a/tests/unit/generator.h
+++ b/tests/unit/generator.h
@@ -184,14 +184,23 @@ struct Generator {
     for (int64_t rank = 0; rank < n_ranks; rank++) {
       fmt::print("Reading file for rank {}\n", rank);
       JSONReader reader{static_cast<NodeType>(rank)};
-      reader.readFile(input_dir + "data." + std::to_string(rank) + ".json");
-      auto tmpInfo = reader.parse();
+
+      // Validate the JSON data file
+      std::string data_file_path = input_dir + "data." + std::to_string(rank) + ".json";
+      if (reader.validate_datafile(data_file_path)) {
+        reader.readFile(data_file_path);
+        auto tmpInfo = reader.parse();
+
 #ifdef VT_TV_OPENMP_ENABLED
 #if VT_TV_OPENMP_ENABLED
 #pragma omp critical
 #endif
 #endif
       { info.addInfo(tmpInfo->getObjectInfo(), tmpInfo->getRank(rank)); }
+
+      } else {
+        throw std::runtime_error("JSON data file is invalid: " + data_file_path);
+      }
     }
     return info;
   }
diff --git a/tests/unit/render/test_render.cc b/tests/unit/render/test_render.cc
index 6aa2cd8ed..ce10a4c21 100644
--- a/tests/unit/render/test_render.cc
+++ b/tests/unit/render/test_render.cc
@@ -365,9 +365,18 @@ TEST_F(RenderTest, test_render_construct_from_info) {
 
   for (NodeType rank = 0; rank < n_ranks; rank++) {
     utility::JSONReader reader{rank};
-    reader.readFile(path + "/data." + std::to_string(rank) + ".json");
-    auto tmpInfo = reader.parse();
-    info->addInfo(tmpInfo->getObjectInfo(), tmpInfo->getRank(rank));
+
+    // Validate the JSON data file
+    std::string data_file_path = path + "/data." + std::to_string(rank) + ".json";
+    if (reader.validate_datafile(data_file_path)) {
+      reader.readFile(data_file_path);
+      auto tmpInfo = reader.parse();
+
+      info->addInfo(tmpInfo->getObjectInfo(), tmpInfo->getRank(rank));
+    } else {
+      ADD_FAILURE() << "JSON data file is invalid: " + data_file_path;
+    }
+
   }
 
   fmt::print("Num ranks={}\n", info->getNumRanks());
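Because get_json() in the validator transparently decompresses Brotli input, compressed datafiles can be exercised the same way as plain JSON. A small sketch (the output path is hypothetical):

```python
import brotli

# Sketch: produce a Brotli-compressed copy of a test datafile; the validator's
# get_json() decompresses it before schema checking.
with open("data/lb_test_data/data.0.json", "rb") as src:
    raw = src.read()
with open("/tmp/data.0.json.br", "wb") as dst:
    dst.write(brotli.compress(raw))
# python scripts/json_datafile_validator.py --file_path=/tmp/data.0.json.br
```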