Skip to content

Commit

Permalink
Merge pull request #583 from crim-ca/test-builtin-processes
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault authored Nov 3, 2023
2 parents 7392618 + 25be10b commit f36c35d
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 26 deletions.
7 changes: 5 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ Changes

Changes:
--------
- No change.
- Add more tests to validate core code paths of ``builtin`` `Process` ``jsonarray2netcdf``, ``metalink2netcdf`` and
``file_index_selector`` with validation of happy path and error handling conditions.

Fixes:
------
- No change.
- Fix invalid parsing of `XML` Metalink files in ``metalink2netcdf``. Metalink V3 and V4 will now properly consider the
namespace and specific content structure to extract the NetCDF URL reference, and the `Process` will validate that the
extracted reference respects the NetCDF extension.

.. _changes_4.34.0:

Expand Down
238 changes: 235 additions & 3 deletions tests/functional/test_builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from typing import TYPE_CHECKING

import pytest
from pywps.inout.outputs import MetaFile, MetaLink, MetaLink4

from tests.functional.utils import WpsConfigBase
from tests.utils import get_settings_from_testapp, mocked_execute_celery, mocked_sub_requests
from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import ContentType
from weaver.processes.builtin import register_builtin_processes
from weaver.formats import ContentType, get_format, repr_json
from weaver.processes.builtin import file_index_selector, jsonarray2netcdf, metalink2netcdf, register_builtin_processes
from weaver.status import Status

if TYPE_CHECKING:
Expand All @@ -35,7 +36,7 @@ def setUpClass(cls):
def setUp(self):
# register builtin processes from scratch to have clean state
self.process_store.clear_processes()
register_builtin_processes(self.settings)
register_builtin_processes(self.settings) # type: ignore

def test_jsonarray2netcdf_describe_old_schema(self):
resp = self.app.get("/processes/jsonarray2netcdf?schema=OLD", headers=self.json_headers)
Expand Down Expand Up @@ -362,3 +363,234 @@ def test_jsonarray2netcdf_execute_sync(self):
outputs = resp.json

self.validate_results(results, outputs, nc_data, None)


def test_jsonarray2netcdf_process():
    """
    Validate the happy path of the ``jsonarray2netcdf`` builtin process entry point.

    Three temporary ``.nc`` files are listed by ``file://`` reference in a temporary JSON array file.
    The process is expected to exit with a success code, and every referenced file is expected to be
    found in the output directory, under the same file name and with the same contents.
    """
    with contextlib.ExitStack() as stack:
        # map each 'file://' reference to the contents written in the corresponding file
        expected = {}
        for num in range(3):
            nc_tmp = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".nc"))
            nc_text = f"data NetCDF {num}"
            nc_tmp.write(nc_text)
            nc_tmp.flush()
            nc_tmp.seek(0)
            expected[f"file://{nc_tmp.name}"] = nc_text

        # the JSON input is the plain array of references
        json_tmp = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".json"))
        json_tmp.write(json.dumps(list(expected), indent=2))
        json_tmp.flush()
        json_tmp.seek(0)
        out_dir = stack.enter_context(tempfile.TemporaryDirectory())

        # the entry point terminates through 'sys.exit', success is either no code or zero
        with pytest.raises(SystemExit) as err:
            jsonarray2netcdf.main("-i", json_tmp.name, "-o", out_dir)
        assert err.value.code in (None, 0)

        for nc_href, nc_text in expected.items():
            fetched = os.path.join(out_dir, os.path.basename(nc_href))
            assert os.path.isfile(fetched)
            with open(fetched, mode="r", encoding="utf-8") as handle:
                assert handle.read() == nc_text


@pytest.mark.parametrize(
    "test_data",
    [
        1,
        "",
        "abc",
        {},
        [1, 2],
        ["abc", "xyz"],
        [
            "/tmp/does-not-exist/fake-file.txt",  # noqa
            "/tmp/does-not-exist/fake-file.nc",  # noqa
        ],
    ]
)
def test_jsonarray2netcdf_invalid_json(test_data):
    """
    Validate that ``jsonarray2netcdf`` rejects JSON contents that are not an array of NetCDF references.

    Each parametrized value is serialized into a temporary JSON file given as input to the process,
    which is expected to raise a :class:`ValueError` with the exact invalid format message.
    """
    expected_error = "Invalid JSON file format, expected a plain array of NetCDF file URL strings."
    with contextlib.ExitStack() as stack:
        out_dir = stack.enter_context(tempfile.TemporaryDirectory())
        json_tmp = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".json"))
        json_tmp.write(repr_json(test_data, force_string=True))
        json_tmp.flush()
        json_tmp.seek(0)

        with pytest.raises(ValueError) as err:
            jsonarray2netcdf.main("-i", json_tmp.name, "-o", out_dir)
        assert str(err.value) == expected_error


@pytest.mark.parametrize("args", [[], ["-i"]])
def test_jsonarray2netcdf_missing_params(args):
    """
    Validate that ``jsonarray2netcdf`` aborts with exit code ``2`` when the input argument is incomplete.

    Covers both the omitted ``-i`` option and the ``-i`` option provided without any value.
    """
    with pytest.raises(SystemExit) as err:
        jsonarray2netcdf.main(*args)
    exit_code = err.value.code
    assert exit_code == 2


def test_jsonarray2netcdf_invalid_out_dir():
    """
    Validate that ``jsonarray2netcdf`` raises a :class:`ValueError` when the output directory is missing.

    The output location is a sub-directory that is never created within an existing temporary directory.
    """
    with tempfile.TemporaryDirectory() as parent_dir:
        missing_dir = os.path.join(parent_dir, "random")

        with pytest.raises(ValueError) as err:
            jsonarray2netcdf.main("-i", "", "-o", missing_dir)
        message = str(err.value)
        assert "does not exist" in message


@pytest.mark.parametrize(
    ["metalink_cls", "metalink_ext", "test_index"],
    [
        (MetaLink, ".metalink", 2),
        (MetaLink4, ".meta4", 2),
    ]
)
def test_metalink2netcdf_process(metalink_cls, metalink_ext, test_index):
    """
    Validate the happy path of the ``metalink2netcdf`` builtin process entry point.

    A Metalink document of the parametrized class is generated with three temporary ``.nc`` files and
    written with the parametrized extension. The process is expected to exit with a success code and
    to place only the file at the requested index in the output directory, with matching contents.
    """
    with contextlib.ExitStack() as stack:
        netcdf_format = get_format(ContentType.APP_NETCDF)
        sources = []  # (path, contents) of every NetCDF file, in the same order as the Metalink entries
        entries = []
        for num in range(3):
            nc_tmp = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".nc"))
            nc_text = f"data NetCDF {num}"
            nc_tmp.write(nc_text)
            nc_tmp.flush()
            nc_tmp.seek(0)
            sources.append((nc_tmp.name, nc_text))
            entry = MetaFile(identity=str(num), fmt=netcdf_format)
            entry.file = nc_tmp.name
            entries.append(entry)

        document = metalink_cls(identity="test", workdir=tempfile.gettempdir(), files=tuple(entries))
        meta_tmp = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=metalink_ext))
        meta_tmp.write(document.xml)
        meta_tmp.flush()
        meta_tmp.seek(0)
        out_dir = stack.enter_context(tempfile.TemporaryDirectory())

        with pytest.raises(SystemExit) as err:
            metalink2netcdf.main("-i", meta_tmp.name, "-n", str(test_index), "-o", out_dir)
        assert err.value.code in (None, 0)

        # positions start at one since the index is 1-based in XPath
        for position, (nc_path, nc_text) in enumerate(sources, start=1):
            extracted = os.path.join(out_dir, os.path.basename(nc_path))
            if position != test_index:
                assert not os.path.isfile(extracted)
                continue
            assert os.path.isfile(extracted)
            with open(extracted, mode="r", encoding="utf-8") as handle:
                found = handle.read()
            os.remove(extracted)
            assert found == nc_text


def test_metalink2netcdf_reference_not_netcdf():
    """
    Validate that ``metalink2netcdf`` refuses a Metalink entry whose file is not a NetCDF reference.

    The single Metalink V4 entry is declared with the NetCDF format, but points at a ``.text`` file.
    The process is expected to raise a :class:`ValueError` reporting the invalid NetCDF reference.
    """
    with contextlib.ExitStack() as stack:
        entry = MetaFile(fmt=get_format(ContentType.APP_NETCDF))
        text_tmp = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".text"))
        text_tmp.write("dont care")
        text_tmp.flush()
        text_tmp.seek(0)
        entry.file = text_tmp.name

        document = MetaLink4(identity="test", workdir=tempfile.gettempdir(), files=(entry,))
        meta_tmp = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=".meta4"))
        meta_tmp.write(document.xml)
        meta_tmp.flush()
        meta_tmp.seek(0)
        out_dir = stack.enter_context(tempfile.TemporaryDirectory())

        with pytest.raises(ValueError) as err:
            metalink2netcdf.main("-i", meta_tmp.name, "-n", "1", "-o", out_dir)
        message = str(err.value)
        assert "not a valid NetCDF" in message


@pytest.mark.parametrize("args", [["-i"], ["-i", ""], ["-n"], ["-n", "1"]])
def test_metalink2netcdf_missing_params(args):
    """
    Validate that ``metalink2netcdf`` aborts with exit code ``2`` on incomplete arguments.

    Each case provides only one of the ``-i`` and ``-n`` options, with or without its value.
    """
    with pytest.raises(SystemExit) as err:
        metalink2netcdf.main(*args)
    exit_code = err.value.code
    assert exit_code == 2


def test_metalink2netcdf_invalid_out_dir():
    """
    Validate that ``metalink2netcdf`` raises a :class:`ValueError` when the output directory is missing.

    The output location is a sub-directory that is never created within an existing temporary directory.
    """
    with tempfile.TemporaryDirectory() as parent_dir:
        missing_dir = os.path.join(parent_dir, "random")

        with pytest.raises(ValueError) as err:
            metalink2netcdf.main("-i", "", "-n", "1", "-o", missing_dir)
        message = str(err.value)
        assert "does not exist" in message


def test_file_index_selector_process():
    """
    Validate the happy path of the ``file_index_selector`` builtin process entry point.

    Three temporary files of distinct extensions are provided by ``file://`` reference.
    The process is expected to exit with a success code and to place only the file at the requested
    index in the output directory, with matching contents.
    """
    with contextlib.ExitStack() as stack:
        sources = []  # (path, contents) of every candidate file, in the same order as the references
        references = []
        for num, extension in enumerate([".txt", ".nc", ".tiff"]):
            tmp = stack.enter_context(tempfile.NamedTemporaryFile(mode="w", suffix=extension))
            text = f"data {num}"
            tmp.write(text)
            tmp.flush()
            tmp.seek(0)
            sources.append((tmp.name, text))
            references.append(f"file://{tmp.name}")
        out_dir = stack.enter_context(tempfile.TemporaryDirectory())

        selected = 1
        with pytest.raises(SystemExit) as err:
            file_index_selector.main("-f", *references, "-i", str(selected), "-o", out_dir)
        assert err.value.code in (None, 0)

        # only the selected position is expected to be present in the output directory
        for position, (path, text) in enumerate(sources):
            copied = os.path.join(out_dir, os.path.basename(path))
            if position != selected:
                assert not os.path.isfile(copied)
                continue
            assert os.path.isfile(copied)
            with open(copied, mode="r", encoding="utf-8") as handle:
                found = handle.read()
            os.remove(copied)
            assert found == text


@pytest.mark.parametrize("args", [["-f"], ["-f", ""], ["-i"], ["-i", "1"]])
def test_file_index_selector_missing_params(args):
    """
    Validate that ``file_index_selector`` aborts with exit code ``2`` on incomplete arguments.

    Each case provides only one of the ``-f`` and ``-i`` options, with or without its value.
    """
    with pytest.raises(SystemExit) as err:
        file_index_selector.main(*args)
    exit_code = err.value.code
    assert exit_code == 2


def test_file_index_selector_invalid_out_dir():
    """
    Validate that ``file_index_selector`` raises a :class:`ValueError` when the output directory is missing.

    The output location is a sub-directory that is never created within an existing temporary directory.
    """
    with tempfile.TemporaryDirectory() as parent_dir:
        missing_dir = os.path.join(parent_dir, "random")

        with pytest.raises(ValueError) as err:
            file_index_selector.main("-f", "", "-i", "1", "-o", missing_dir)
        message = str(err.value)
        assert "does not exist" in message
17 changes: 10 additions & 7 deletions weaver/processes/builtin/file_index_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import argparse
import logging
import os
import shutil
import sys
from typing import TYPE_CHECKING

Expand All @@ -18,6 +17,10 @@
# root to allow 'from weaver import <...>'
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(CUR_DIR))))

# place weaver specific imports after sys path fixing to ensure they are found from external call
# pylint: disable=C0413,wrong-import-order
from weaver.utils import fetch_file # isort:skip # noqa: E402

PACKAGE_NAME = os.path.split(os.path.splitext(__file__)[0])[-1]

# setup logger since it is not run from the main 'weaver' app
Expand All @@ -26,7 +29,7 @@
LOGGER.setLevel(logging.INFO)

# process details
__version__ = "1.1"
__version__ = "1.2"
__title__ = "File Index Selector"
__abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative

Expand All @@ -38,7 +41,7 @@ def select(files, index, output_dir):
try:
if not os.path.isdir(output_dir):
raise ValueError(f"Output dir [{output_dir}] does not exist.")
shutil.copy(files[index], output_dir)
fetch_file(files[index], output_dir)
except Exception as exc:
# log only debug for tracking, re-raise and actual error will be logged by top process monitor
LOGGER.debug("Process '%s' raised an exception: [%s]", PACKAGE_NAME, exc)
Expand All @@ -50,12 +53,12 @@ def main(*args):
# type: (*str) -> None
LOGGER.info("Parsing inputs of '%s' process.", PACKAGE_NAME)
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("-f", "--files", type=str, nargs="+", help="Files from which to select.")
parser.add_argument("-i", "--index", type=int, help="Index of the file to select.")
parser.add_argument("-f", "--files", type=str, required=True, nargs="+", help="Files from which to select.")
parser.add_argument("-i", "--index", type=int, required=True, help="Index of the file to select.")
parser.add_argument("-o", "--outdir", default=CUR_DIR, help="Output directory of the selected file.")
ns = parser.parse_args(*args)
ns = parser.parse_args(args)
sys.exit(select(ns.files, ns.index, ns.outdir))


if __name__ == "__main__":
main()
main(*sys.argv[1:])
10 changes: 5 additions & 5 deletions weaver/processes/builtin/jsonarray2netcdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def j2n(json_reference, output_dir):
LOGGER.debug("Process '%s' output directory: [%s].", PACKAGE_NAME, output_dir)
try:
if not os.path.isdir(output_dir):
raise ValueError(f"Output dir [{output_dir}] does not exist.")
raise ValueError(f"Output directory [{output_dir}] does not exist.")
with TemporaryDirectory(prefix=f"wps_process_{PACKAGE_NAME}_") as tmp_dir:
LOGGER.debug("Fetching JSON file: [%s]", json_reference)
json_path = fetch_file(json_reference, tmp_dir, timeout=10, retry=3)
Expand All @@ -56,7 +56,7 @@ def j2n(json_reference, output_dir):
LOGGER.debug("Fetching NetCDF reference from JSON file: [%s]", file_url)
fetched_nc = fetch_file(file_url, output_dir, timeout=10, retry=3)
LOGGER.debug("Fetched NetCDF output location: [%s]", fetched_nc)
except Exception as exc:
except Exception as exc: # pragma: no cover
LOGGER.error("Process '%s' raised an exception: [%s]", PACKAGE_NAME, exc)
raise
LOGGER.info("Process '%s' execution completed.", PACKAGE_NAME)
Expand All @@ -66,13 +66,13 @@ def main(*args):
# type: (*str) -> None
LOGGER.info("Parsing inputs of '%s' process.", PACKAGE_NAME)
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("-i", metavar="json", type=str,
parser.add_argument("-i", metavar="json", type=str, required=True,
help="JSON file to be parsed for NetCDF file names.")
parser.add_argument("-o", metavar="outdir", default=CUR_DIR,
help="Output directory of the retrieved NetCDF files extracted by name from the JSON file.")
ns = parser.parse_args(*args)
ns = parser.parse_args(args)
sys.exit(j2n(ns.i, ns.o))


if __name__ == "__main__":
main()
main(*sys.argv[1:])
1 change: 1 addition & 0 deletions weaver/processes/builtin/metalink2netcdf.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ inputs:
position: 1
prefix: "-i"
index:
doc: Index of the MetaLink file to extract. This index is 1-based.
type: int
inputBinding:
position: 2
Expand Down
Loading

0 comments on commit f36c35d

Please sign in to comment.