From 469ecfac16711004a508cb088dc18def5293bf74 Mon Sep 17 00:00:00 2001 From: Julian Geiger Date: Thu, 3 Oct 2024 13:01:31 +0200 Subject: [PATCH] Add `--also-remote` option to `verdi process dump` Tries to open a connection to the remote Computer using the transport of the CalcJobNode, and `get`s the files and folders which were not originally dumped, either as they weren't part of the CalcJobNode's repository or retrieved. --- src/aiida/cmdline/commands/cmd_process.py | 22 ++++++++- src/aiida/tools/dumping/processes.py | 55 ++++++++++++++++++++++- 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index c9c492ae14..d865c8566b 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -576,6 +576,13 @@ def process_repair(manager, broker, dry_run): show_default=True, help='Include extras in the `.aiida_node_metadata.yaml` written for every `ProcessNode`.', ) +@click.option( + '--also-remote', + is_flag=True, + default=False, + show_default=True, + help='If set, also try to obtain intermediate files on the remote computer that were not initially retrieved.', +) @click.option( '-f', '--flat', @@ -591,19 +598,21 @@ def process_dump( include_outputs, include_attributes, include_extras, + also_remote, flat, ) -> None: """Dump process input and output files to disk. Child calculations/workflows (also called `CalcJob`s/`CalcFunction`s and `WorkChain`s/`WorkFunction`s in AiiDA jargon) run by the parent workflow are contained in the directory tree as sub-folders and are sorted by their - creation time. The directory tree thus mirrors the logical execution of the workflow, which can also be queried by + creation time. The directory tree thus mirrors the logical execution of the workflow, which can also be queried by running `verdi process status <pk>` on the command line. 
By default, input and output files of each calculation can be found in the corresponding "inputs" and "outputs" directories (the former also contains the hidden ".aiida" folder with machine-readable job execution settings). Additional input and output files (depending on the type of calculation) are placed in the "node_inputs" - and "node_outputs", respectively. + and "node_outputs", respectively. Using the `--also-remote` flag, additional files of the `remote_workdir` on the + Computer where the CalcJobs were run can be retrieved (if they still exist on the remote). Lastly, every folder also contains a hidden, human-readable `.aiida_node_metadata.yaml` file with the relevant AiiDA node data for further inspection. @@ -618,8 +627,17 @@ def process_dump( include_extras=include_extras, overwrite=overwrite, flat=flat, + also_remote=also_remote, ) + if also_remote: + echo.echo_report( + '`--also-remote` set to True. Will try to retrieve additional files from the `workdir` of the remote Computer.' # noqa: E501 + ) + echo.echo_report( + 'If files are non-existent, they will be skipped silently. Check if the output files are what you expect.' 
+ ) + try: dump_path = process_dumper.dump(process_node=process, output_path=path) except FileExistsError: diff --git a/src/aiida/tools/dumping/processes.py b/src/aiida/tools/dumping/processes.py index 3d970c421c..28b4b04cc6 100644 --- a/src/aiida/tools/dumping/processes.py +++ b/src/aiida/tools/dumping/processes.py @@ -10,6 +10,7 @@ from __future__ import annotations +import itertools as it import logging from pathlib import Path from types import SimpleNamespace @@ -18,7 +19,7 @@ import yaml from aiida.common import LinkType -from aiida.common.exceptions import NotExistentAttributeError +from aiida.common.exceptions import ConfigurationError, NotExistent, NotExistentAttributeError from aiida.orm import ( CalcFunctionNode, CalcJobNode, @@ -30,6 +31,7 @@ WorkFunctionNode, ) from aiida.orm.utils import LinkTriple +from aiida.orm.utils.remote import get_calcjob_remote_paths LOGGER = logging.getLogger(__name__) @@ -42,6 +44,7 @@ def __init__( include_attributes: bool = True, include_extras: bool = True, overwrite: bool = False, + also_remote: bool = False, flat: bool = False, ) -> None: self.include_inputs = include_inputs @@ -49,6 +52,7 @@ self.include_attributes = include_attributes self.include_extras = include_extras self.overwrite = overwrite + self.also_remote = also_remote self.flat = flat @staticmethod @@ -285,6 +289,55 @@ def _dump_calculation( link_triples=output_links, ) + # Additional remote file retrieval should only apply for CalcJobNodes, not CalcFunctionNodes + if self.also_remote and isinstance(calculation_node, CalcJobNode): + self._dump_calculation_remote_files(calcjob_node=calculation_node, output_path=output_path) + + def _dump_calculation_remote_files(self, calcjob_node: CalcJobNode, output_path: Path) -> None: + """Dump the additional remote files attached to a `CalcJobNode` to a specified output path. + + :param calcjob_node: The `CalcJobNode` to be dumped. + :param output_path: The path where the files will be dumped. 
+ :raises NotExistent: If the `CalcJobNode` doesn't have a `remote_workdir` set. + """ + + remote_workdir = calcjob_node.get_remote_workdir() + if remote_workdir is None: + raise NotExistent(f"CalcJobNode <{calcjob_node.pk}> doesn't have a `remote_workdir`.") + + # Exclude the objects that were already dumped, as they were either originally retrieved via `retrieve_list` + # or are already part of the file repository of the CalcJobNode, e.g. the `aiida.in` and `_aiidasubmit.sh` + retrieve_list = list(calcjob_node.get_retrieve_list()) # type: ignore[arg-type] + repository_list = calcjob_node.base.repository.list_object_names() + exclude_list = retrieve_list + repository_list + + # Obtain a flattened list of the `RemoteData` objects. + # The computer UUIDs that are the keys of the returned dictionary of `get_calcjob_remote_paths` aren't + # needed, as we only run the function for a single CalcJobNode using its associated transport to get the files + calcjob_remote_paths = get_calcjob_remote_paths(pks=[calcjob_node.pk]) # type: ignore[list-item] + calcjob_remote_datas = list(it.chain.from_iterable(calcjob_remote_paths.values())) # type: ignore[union-attr] + + # Unlike for the `retrieve_files_from_list` in `execmanager.py`, we only dump the files to disk, rather than + # also storing them in the repository via `FolderData` + try: + with calcjob_node.get_transport() as transport: + for calcjob_remote_data in calcjob_remote_datas: + # Obtain all objects from each of the RemoteDatas associated with the CalcJobNode + # (could this even be more than one?) 
+ retrieve_objects = [_ for _ in calcjob_remote_data.listdir() if _ not in exclude_list] + remote_paths = [(Path(remote_workdir) / _).resolve() for _ in retrieve_objects] + local_paths = [(output_path / 'remote_files' / _).resolve() for _ in retrieve_objects] + + # Transport.get() works for files and folders, so we don't need to make a distinction + for rem, loc in zip(remote_paths, local_paths): + transport.get(str(rem), str(loc), ignore_nonexisting=True) + + # If getting the transport fails, propagate the exception outwards + # (could just remove the except, but being explicit might make it clearer here) + except ConfigurationError: + raise + def _dump_calculation_io(self, parent_path: Path, link_triples: LinkManager | List[LinkTriple]): """Small helper function to dump linked input/output nodes of a `CalculationNode`.