diff --git a/CHANGES.rst b/CHANGES.rst
index fd36eb397..d1ac0858e 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -12,11 +12,14 @@ Changes
 
 Changes:
 --------
-- No change.
+- Add more tests to validate core code paths of ``builtin`` `Process` ``jsonarray2netcdf``, ``metalink2netcdf`` and
+  ``file_index_selector`` with validation of happy path and error handling conditions.
 
 Fixes:
 ------
-- No change.
+- Fix invalid parsing of `XML` Metalink files in ``metalink2netcdf``. Metalink V3 and V4 will now properly consider the
+  namespace and specific content structure to extract the NetCDF URL reference, and the `Process` will validate that the
+  extracted reference respects the NetCDF extension.
 
 .. _changes_4.34.0:
 
diff --git a/tests/functional/test_builtin.py b/tests/functional/test_builtin.py
index 2e4067473..9c3ff8221 100644
--- a/tests/functional/test_builtin.py
+++ b/tests/functional/test_builtin.py
@@ -5,12 +5,13 @@
 from typing import TYPE_CHECKING
 
 import pytest
+from pywps.inout.outputs import MetaFile, MetaLink, MetaLink4
 
 from tests.functional.utils import WpsConfigBase
 from tests.utils import get_settings_from_testapp, mocked_execute_celery, mocked_sub_requests
 from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
-from weaver.formats import ContentType
-from weaver.processes.builtin import register_builtin_processes
+from weaver.formats import ContentType, get_format, repr_json
+from weaver.processes.builtin import file_index_selector, jsonarray2netcdf, metalink2netcdf, register_builtin_processes
 from weaver.status import Status
 
 if TYPE_CHECKING:
@@ -35,7 +36,7 @@ def setUpClass(cls):
     def setUp(self):
         # register builtin processes from scratch to have clean state
         self.process_store.clear_processes()
-        register_builtin_processes(self.settings)
+        register_builtin_processes(self.settings)  # type: ignore
 
     def test_jsonarray2netcdf_describe_old_schema(self):
         resp = self.app.get("/processes/jsonarray2netcdf?schema=OLD", headers=self.json_headers)
@@ -362,3 +363,234 @@ def test_jsonarray2netcdf_execute_sync(self):
         outputs = resp.json
 
         self.validate_results(results, outputs, nc_data, None)
+
+
+def test_jsonarray2netcdf_process():
+    with contextlib.ExitStack() as stack:
+        data = {}
+        for idx in range(3):
+            tmp_nc = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".nc"))
+            tmp_nc_data = f"data NetCDF {idx}"
+            tmp_nc.write(tmp_nc_data)
+            tmp_nc.flush()
+            tmp_nc.seek(0)
+            tmp_nc_href = f"file://{tmp_nc.name}"
+            data[tmp_nc_href] = tmp_nc_data
+        tmp_json = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".json"))
+        json.dump(list(data), tmp_json, indent=2)
+        tmp_json.flush()
+        tmp_json.seek(0)
+        tmp_out_dir = stack.enter_context(tempfile.TemporaryDirectory())
+
+        with pytest.raises(SystemExit) as err:
+            jsonarray2netcdf.main("-i", tmp_json.name, "-o", tmp_out_dir)
+        assert err.value.code in [None, 0]
+
+        for nc_file, nc_data in data.items():
+            nc_name = os.path.split(nc_file)[-1]
+            nc_path = os.path.join(tmp_out_dir, nc_name)
+            assert os.path.isfile(nc_path)
+            with open(nc_path, mode="r", encoding="utf-8") as nc_ref:
+                assert nc_ref.read() == nc_data
+
+
+@pytest.mark.parametrize(
+    "test_data",
+    [
+        1,
+        "",
+        "abc",
+        {},
+        [
+            1,
+            2,
+        ],
+        [
+            "abc",
+            "xyz",
+        ],
+        [
+            "/tmp/does-not-exist/fake-file.txt",  # noqa
+            "/tmp/does-not-exist/fake-file.nc",  # noqa
+        ]
+    ]
+)
+def test_jsonarray2netcdf_invalid_json(test_data):
+    with contextlib.ExitStack() as stack:
+        tmp_out_dir = stack.enter_context(tempfile.TemporaryDirectory())
+        tmp_file = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".json"))
+        tmp_file.write(repr_json(test_data, force_string=True))
+        tmp_file.flush()
+        tmp_file.seek(0)
+
+        with pytest.raises(ValueError) as err:
+            jsonarray2netcdf.main("-i", tmp_file.name, "-o", tmp_out_dir)
+        assert str(err.value) == "Invalid JSON file format, expected a plain array of NetCDF file URL strings."
+
+
+@pytest.mark.parametrize(
+    "args",
+    [
+        [],
+        ["-i"],
+    ]
+)
+def test_jsonarray2netcdf_missing_params(args):
+    with pytest.raises(SystemExit) as err:
+        jsonarray2netcdf.main(*args)
+    assert err.value.code == 2
+
+
+def test_jsonarray2netcdf_invalid_out_dir():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        tmp_out_dir = os.path.join(tmp_dir, "random")
+
+        with pytest.raises(ValueError) as err:
+            jsonarray2netcdf.main("-i", "", "-o", tmp_out_dir)
+        assert "does not exist" in str(err.value)
+
+
+@pytest.mark.parametrize(
+    ["metalink_cls", "metalink_ext", "test_index"],
+    [
+        (MetaLink, ".metalink", 2),
+        (MetaLink4, ".meta4", 2)
+    ]
+)
+def test_metalink2netcdf_process(metalink_cls, metalink_ext, test_index):
+    with contextlib.ExitStack() as stack:
+        data = {}
+        meta_files = []
+        nc_fmt = get_format(ContentType.APP_NETCDF)
+        for idx in range(3):
+            tmp_nc = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".nc"))
+            tmp_nc_data = f"data NetCDF {idx}"
+            tmp_nc.write(tmp_nc_data)
+            tmp_nc.flush()
+            tmp_nc.seek(0)
+            data[idx] = {"name": tmp_nc.name, "data": tmp_nc_data}
+            tmp_meta_file = MetaFile(identity=str(idx), fmt=nc_fmt)
+            tmp_meta_file.file = tmp_nc.name
+            meta_files.append(tmp_meta_file)
+        metalink = metalink_cls(identity="test", workdir=tempfile.gettempdir(), files=tuple(meta_files))
+        tmp_meta = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=metalink_ext))
+        tmp_meta.write(metalink.xml)
+        tmp_meta.flush()
+        tmp_meta.seek(0)
+        tmp_out_dir = stack.enter_context(tempfile.TemporaryDirectory())
+
+        with pytest.raises(SystemExit) as err:
+            metalink2netcdf.main("-i", tmp_meta.name, "-n", str(test_index), "-o", tmp_out_dir)
+        assert err.value.code in [None, 0]
+
+        for idx in range(3):
+            nc_out_name = os.path.split(data[idx]["name"])[-1]
+            nc_out_path = os.path.join(tmp_out_dir, nc_out_name)
+            if idx + 1 == test_index:  # index is 1-based in XPath
+                assert os.path.isfile(nc_out_path)
+                with open(nc_out_path, mode="r", encoding="utf-8") as nc_out_file:
+                    nc_out_data = nc_out_file.read()
+                os.remove(nc_out_path)
+                assert nc_out_data == data[idx]["data"]
+            else:
+                assert not os.path.isfile(nc_out_path)
+
+
+def test_metalink2netcdf_reference_not_netcdf():
+    with contextlib.ExitStack() as stack:
+        metafile = MetaFile(fmt=get_format(ContentType.APP_NETCDF))
+        tmp_text = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".text"))
+        tmp_text.write("dont care")
+        tmp_text.flush()
+        tmp_text.seek(0)
+        metafile.file = tmp_text.name
+        metalink = MetaLink4(identity="test", workdir=tempfile.gettempdir(), files=tuple([metafile]))
+        tmp_meta = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".meta4"))
+        tmp_meta.write(metalink.xml)
+        tmp_meta.flush()
+        tmp_meta.seek(0)
+        tmp_out_dir = stack.enter_context(tempfile.TemporaryDirectory())
+
+        with pytest.raises(ValueError) as err:
+            metalink2netcdf.main("-i", tmp_meta.name, "-n", "1", "-o", tmp_out_dir)
+        assert "not a valid NetCDF" in str(err.value)
+
+
+@pytest.mark.parametrize(
+    "args",
+    [
+        ["-i"],
+        ["-i", ""],
+        ["-n"],
+        ["-n", "1"],
+    ]
+)
+def test_metalink2netcdf_missing_params(args):
+    with pytest.raises(SystemExit) as err:
+        metalink2netcdf.main(*args)
+    assert err.value.code == 2
+
+
+def test_metalink2netcdf_invalid_out_dir():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        tmp_out_dir = os.path.join(tmp_dir, "random")
+
+        with pytest.raises(ValueError) as err:
+            metalink2netcdf.main("-i", "", "-n", "1", "-o", tmp_out_dir)
+        assert "does not exist" in str(err.value)
+
+
+def test_file_index_selector_process():
+    with contextlib.ExitStack() as stack:
+        data = {}
+        test_files = []
+        for idx, ext in enumerate([".txt", ".nc", ".tiff"]):
+            tmp_file = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=ext))
+            tmp_data = f"data {idx}"
+            tmp_file.write(tmp_data)
+            tmp_file.flush()
+            tmp_file.seek(0)
+            tmp_href = f"file://{tmp_file.name}"
+            data[idx] = {"name": tmp_file.name, "data": tmp_data, "href": tmp_href}
+            test_files.append(tmp_href)
+        tmp_out_dir = stack.enter_context(tempfile.TemporaryDirectory())
+
+        test_index = 1
+        with pytest.raises(SystemExit) as err:
+            file_index_selector.main("-f", *test_files, "-i", str(test_index), "-o", tmp_out_dir)
+        assert err.value.code in [None, 0]
+        for idx, tmp_info in data.items():
+            out_name = os.path.split(tmp_info["name"])[-1]
+            out_path = os.path.join(tmp_out_dir, out_name)
+            if idx == test_index:
+                assert os.path.isfile(out_path)
+                with open(out_path, mode="r", encoding="utf-8") as out_file:
+                    out_data = out_file.read()
+                os.remove(out_path)
+                assert out_data == tmp_info["data"]
+            else:
+                assert not os.path.isfile(out_path)
+
+
+@pytest.mark.parametrize(
+    "args",
+    [
+        ["-f"],
+        ["-f", ""],
+        ["-i"],
+        ["-i", "1"],
+    ]
+)
+def test_file_index_selector_missing_params(args):
+    with pytest.raises(SystemExit) as err:
+        file_index_selector.main(*args)
+    assert err.value.code == 2
+
+
+def test_file_index_selector_invalid_out_dir():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        tmp_out_dir = os.path.join(tmp_dir, "random")
+
+        with pytest.raises(ValueError) as err:
+            file_index_selector.main("-f", "", "-i", "1", "-o", tmp_out_dir)
+        assert "does not exist" in str(err.value)
diff --git a/weaver/processes/builtin/file_index_selector.py b/weaver/processes/builtin/file_index_selector.py
index 37b6fcc22..a18e00e06 100644
--- a/weaver/processes/builtin/file_index_selector.py
+++ b/weaver/processes/builtin/file_index_selector.py
@@ -6,7 +6,6 @@
 import argparse
 import logging
 import os
-import shutil
 import sys
 from typing import TYPE_CHECKING
 
@@ -18,6 +17,10 @@
 # root to allow 'from weaver import <...>'
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(CUR_DIR))))
 
+# place weaver specific imports after sys path fixing to ensure they are found from external call
+# pylint: disable=C0413,wrong-import-order
+from weaver.utils import fetch_file  # isort:skip # noqa: E402
+
 PACKAGE_NAME = os.path.split(os.path.splitext(__file__)[0])[-1]
 
 # setup logger since it is not run from the main 'weaver' app
@@ -26,7 +29,7 @@
 LOGGER.setLevel(logging.INFO)
 
 # process details
-__version__ = "1.1"
+__version__ = "1.2"
 __title__ = "File Index Selector"
 __abstract__ = __doc__  # NOTE: '__doc__' is fetched directly, this is mostly to be informative
 
@@ -38,7 +41,7 @@ def select(files, index, output_dir):
     try:
         if not os.path.isdir(output_dir):
             raise ValueError(f"Output dir [{output_dir}] does not exist.")
-        shutil.copy(files[index], output_dir)
+        fetch_file(files[index], output_dir)
     except Exception as exc:
         # log only debug for tracking, re-raise and actual error wil be logged by top process monitor
         LOGGER.debug("Process '%s' raised an exception: [%s]", PACKAGE_NAME, exc)
@@ -50,12 +53,12 @@ def main(*args):
     # type: (*str) -> None
     LOGGER.info("Parsing inputs of '%s' process.", PACKAGE_NAME)
     parser = argparse.ArgumentParser(description=__doc__)
-    parser.add_argument("-f", "--files", type=str, nargs="+", help="Files from which to select.")
-    parser.add_argument("-i", "--index", type=int, help="Index of the file to select.")
+    parser.add_argument("-f", "--files", type=str, required=True, nargs="+", help="Files from which to select.")
+    parser.add_argument("-i", "--index", type=int, required=True, help="Index of the file to select.")
     parser.add_argument("-o", "--outdir", default=CUR_DIR, help="Output directory of the selected file.")
-    ns = parser.parse_args(*args)
+    ns = parser.parse_args(args)
     sys.exit(select(ns.files, ns.index, ns.outdir))
 
 
 if __name__ == "__main__":
-    main()
+    main(*sys.argv[1:])
diff --git a/weaver/processes/builtin/jsonarray2netcdf.py b/weaver/processes/builtin/jsonarray2netcdf.py
index 0b45985de..3244312a6 100644
--- a/weaver/processes/builtin/jsonarray2netcdf.py
+++ b/weaver/processes/builtin/jsonarray2netcdf.py
@@ -41,7 +41,7 @@ def j2n(json_reference, output_dir):
     LOGGER.debug("Process '%s' output directory: [%s].", PACKAGE_NAME, output_dir)
     try:
         if not os.path.isdir(output_dir):
-            raise ValueError(f"Output dir [{output_dir}] does not exist.")
+            raise ValueError(f"Output directory [{output_dir}] does not exist.")
         with TemporaryDirectory(prefix=f"wps_process_{PACKAGE_NAME}_") as tmp_dir:
             LOGGER.debug("Fetching JSON file: [%s]", json_reference)
             json_path = fetch_file(json_reference, tmp_dir, timeout=10, retry=3)
@@ -56,7 +56,7 @@ def j2n(json_reference, output_dir):
             LOGGER.debug("Fetching NetCDF reference from JSON file: [%s]", file_url)
             fetched_nc = fetch_file(file_url, output_dir, timeout=10, retry=3)
             LOGGER.debug("Fetched NetCDF output location: [%s]", fetched_nc)
-    except Exception as exc:
+    except Exception as exc:  # pragma: no cover
         LOGGER.error("Process '%s' raised an exception: [%s]", PACKAGE_NAME, exc)
         raise
     LOGGER.info("Process '%s' execution completed.", PACKAGE_NAME)
@@ -66,13 +66,13 @@ def main(*args):
     # type: (*str) -> None
     LOGGER.info("Parsing inputs of '%s' process.", PACKAGE_NAME)
     parser = argparse.ArgumentParser(description=__doc__)
-    parser.add_argument("-i", metavar="json", type=str,
+    parser.add_argument("-i", metavar="json", type=str, required=True,
                         help="JSON file to be parsed for NetCDF file names.")
     parser.add_argument("-o", metavar="outdir", default=CUR_DIR,
                         help="Output directory of the retrieved NetCDF files extracted by name from the JSON file.")
-    ns = parser.parse_args(*args)
+    ns = parser.parse_args(args)
    sys.exit(j2n(ns.i, ns.o))
 
 
 if __name__ == "__main__":
-    main()
+    main(*sys.argv[1:])
diff --git a/weaver/processes/builtin/metalink2netcdf.cwl b/weaver/processes/builtin/metalink2netcdf.cwl
index e962ff71d..bd49df98f 100644
--- a/weaver/processes/builtin/metalink2netcdf.cwl
+++ b/weaver/processes/builtin/metalink2netcdf.cwl
@@ -11,6 +11,7 @@ inputs:
       position: 1
       prefix: "-i"
   index:
+    doc: Index of the MetaLink file to extract. This index is 1-based.
     type: int
     inputBinding:
       position: 2
diff --git a/weaver/processes/builtin/metalink2netcdf.py b/weaver/processes/builtin/metalink2netcdf.py
index dc3994c4b..cecb5ffa2 100644
--- a/weaver/processes/builtin/metalink2netcdf.py
+++ b/weaver/processes/builtin/metalink2netcdf.py
@@ -18,6 +18,7 @@
 # pylint: disable=C0413,wrong-import-order
 from weaver import xml_util  # isort:skip # noqa: E402
 from weaver.utils import fetch_file  # isort:skip # noqa: E402
+from weaver.processes.builtin.utils import is_netcdf_url  # isort:skip # noqa: E402
 
 PACKAGE_NAME = os.path.split(os.path.splitext(__file__)[0])[-1]
 
@@ -27,7 +28,7 @@
 LOGGER.setLevel(logging.INFO)
 
 # process details
-__version__ = "1.2"
+__version__ = "1.3"
 __title__ = "Metalink to NetCDF"
 __abstract__ = __doc__  # NOTE: '__doc__' is fetched directly, this is mostly to be informative
 
@@ -48,29 +49,41 @@ def m2n(metalink_reference, index, output_dir):
         LOGGER.debug("Reading Metalink file: [%s]", metalink_path)
         xml_data = xml_util.parse(metalink_path)
         LOGGER.debug("Parsing Metalink file references.")
-        nc_file_url = xml_data.xpath(f"string(//metalink/file[{index}]/metaurl)")
+        meta_ns = xml_data.getroot().nsmap[None]  # metalink URN namespace, pass explicitly otherwise xpath fails
+        meta_version = xml_data.xpath("/m:metalink[1]", namespaces={"m": meta_ns})
+        if (
+            (meta_version and meta_version[0].get("version") == "4.0") or
+            os.path.splitext(metalink_path)[-1] == ".meta4"
+        ):
+            ns_xpath = f"/m:metalink/m:file[{index}]/m:metaurl"
+        else:
+            ns_xpath = f"/m:metalink/m:files/m:file[{index}]/m:resources[1]/m:url"
+        nc_file_url = str(xml_data.xpath(f"string({ns_xpath})", namespaces={"m": meta_ns}))
+        if not is_netcdf_url(nc_file_url):
+            raise ValueError(f"Resolved file URL [{nc_file_url}] is not a valid NetCDF reference.")
         LOGGER.debug("Fetching NetCDF reference from Metalink file: [%s]", metalink_reference)
         LOGGER.debug("NetCDF file URL : %s", nc_file_url)
         fetch_file(nc_file_url, output_dir)
     except Exception as exc:
         # log only debug for tracking, re-raise and actual error wil be logged by top process monitor
-        LOGGER.debug("Process '%s' raised an exception: [%s]", PACKAGE_NAME, exc)
+        LOGGER.error("Process '%s' raised an unhandled exception: [%s]", PACKAGE_NAME, exc)
         raise
     LOGGER.info("Process '%s' execution completed.", PACKAGE_NAME)
 
 
-def main():
+def main(*args):
+    # type: (*str) -> None
     LOGGER.info("Parsing inputs of '%s' process.", PACKAGE_NAME)
     parser = argparse.ArgumentParser(description=__doc__)
-    parser.add_argument("-i", metavar="metalink", type=str,
+    parser.add_argument("-i", metavar="metalink", type=str, required=True,
                         help="Metalink file to be parsed for NetCDF file names.")
-    parser.add_argument("-n", metavar="index", type=int,
+    parser.add_argument("-n", metavar="index", type=int, required=True,
                         help="Index of the specific NetCDF file to extract. First element's index is 1.")
     parser.add_argument("-o", metavar="outdir", default=CUR_DIR,
                         help="Output directory of the retrieved NetCDF files extracted by name from the Metalink file.")
-    args = parser.parse_args()
+    args = parser.parse_args(args)
     sys.exit(m2n(args.i, args.n, args.o))
 
 
 if __name__ == "__main__":
-    main()
+    main(*sys.argv[1:])
diff --git a/weaver/xml_util.py b/weaver/xml_util.py
index f3d6f9026..b0605a281 100644
--- a/weaver/xml_util.py
+++ b/weaver/xml_util.py
@@ -46,6 +46,7 @@
 
 # define this type here so that code can use it for actual logic without repeating 'noqa'
 XML = lxml_etree._Element  # noqa
+XMLTree = lxml_etree._ElementTree  # noqa
 
 # save a local reference to method employed by OWSLib directly called
 _lxml_fromstring = lxml_etree.fromstring
@@ -59,7 +60,7 @@ def fromstring(text, parser=XML_PARSER):
 
 
 def parse(source, parser=XML_PARSER):
-    # type: (Union[str, BufferedReader], lxml_etree.XMLParser) -> XML
+    # type: (Union[str, BufferedReader], lxml_etree.XMLParser) -> XMLTree
     return lxml_etree.parse(source, parser=parser)  # nosec: B410
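
Note for reviewers: the snippet below is a standalone sketch (not part of the patch) of the namespace-aware XPath extraction that the updated m2n() performs. It assumes only lxml is installed; the inline Metalink V4 sample and the extract_netcdf_url helper name are illustrative, and the real process additionally validates the reference with is_netcdf_url and retrieves it with fetch_file.

# Sketch of the namespace-aware Metalink parsing introduced above (illustrative names, not from the patch).
from io import BytesIO

from lxml import etree

META4_SAMPLE = b"""<?xml version="1.0" encoding="UTF-8"?>
<metalink xmlns="urn:ietf:params:xml:ns:metalink" version="4.0">
  <file name="output-2.nc">
    <metaurl mediatype="application/x-netcdf">https://example.com/output-2.nc</metaurl>
  </file>
</metalink>
"""


def extract_netcdf_url(xml_tree, index, metalink_path=""):
    # the default (prefix-less) namespace must be mapped explicitly, otherwise lxml XPath matches nothing
    meta_ns = xml_tree.getroot().nsmap[None]
    meta_version = xml_tree.xpath("/m:metalink[1]", namespaces={"m": meta_ns})
    if (meta_version and meta_version[0].get("version") == "4.0") or metalink_path.endswith(".meta4"):
        # Metalink V4 layout: <metalink><file><metaurl>
        ns_xpath = f"/m:metalink/m:file[{index}]/m:metaurl"
    else:
        # Metalink V3 layout: <metalink><files><file><resources><url>
        ns_xpath = f"/m:metalink/m:files/m:file[{index}]/m:resources[1]/m:url"
    return str(xml_tree.xpath(f"string({ns_xpath})", namespaces={"m": meta_ns}))


if __name__ == "__main__":
    tree = etree.parse(BytesIO(META4_SAMPLE))
    url = extract_netcdf_url(tree, 1)  # XPath indices are 1-based, matching the '-n' input of the process
    assert url.endswith(".nc"), url    # same kind of check as the added 'is_netcdf_url' validation
    print(url)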