From ce36be3c197f3a9230f76eb5b4c6a42d90af7448 Mon Sep 17 00:00:00 2001 From: Konstantinos Tsitsimpikos Date: Wed, 7 Feb 2024 09:45:41 +0200 Subject: [PATCH] Registration upon ingestion (#508) * Support only list of files and targets as ingestor inputs * Fixing format * Adding logger * Simplify to drop support for traverse * Adding tests * Adding a rename hashmap for user to provide new names of source ingested files * Rename the argument to name_map * Working version for renaming * Adding tests * Simplify validate logic - PR changes * Local ingestion and use of run_dag * Importing dag module - fix reference before assignment graph * local feature only on realtime dag mode * Changing naming to follow exporter's convention * Initial commit for registering assets after ingestion * Adding logging * PR comments - Register default to True * Changing f strings for performance * PR changes * Misc staging testing * User converter minor suggestion --- src/tiledb/cloud/bioimg/helpers.py | 26 ++++ src/tiledb/cloud/bioimg/ingestion.py | 193 +++++++++++++++++++++------ tests/test_bioimg.py | 99 ++++++++++++++ 3 files changed, 276 insertions(+), 42 deletions(-) create mode 100644 tests/test_bioimg.py diff --git a/src/tiledb/cloud/bioimg/helpers.py b/src/tiledb/cloud/bioimg/helpers.py index 0353a870f..2692fb971 100644 --- a/src/tiledb/cloud/bioimg/helpers.py +++ b/src/tiledb/cloud/bioimg/helpers.py @@ -1,4 +1,5 @@ import logging +from typing import Sequence import tiledb from tiledb.cloud.utilities import get_logger @@ -34,3 +35,28 @@ def serialize_filter(filter): return filter_dict else: raise TypeError + + +def is_folder(path: str) -> bool: + return path.endswith("/") + + +def validate_io_paths(source: Sequence[str], output: Sequence[str]) -> None: + if len(source) == 0 or len(output) == 0: + raise ValueError("Source/Output list must not be empty.") + + if len(source) == 1 and len(output) == 1: + if is_folder(source[0]) and not is_folder(output[0]): + raise ValueError("Invalid combination of source and output paths.") + else: + return + else: + if len(source) == len(output): + if all(not is_folder(s) for s in source) and all( + not is_folder(o) for o in output + ): + return + else: + raise ValueError("Invalid combination of source and output paths.") + else: + raise ValueError("Invalid combination of source and output paths.") diff --git a/src/tiledb/cloud/bioimg/ingestion.py b/src/tiledb/cloud/bioimg/ingestion.py index 8bf69816d..b2ff05a97 100644 --- a/src/tiledb/cloud/bioimg/ingestion.py +++ b/src/tiledb/cloud/bioimg/ingestion.py @@ -1,9 +1,11 @@ +import logging from typing import Any, Dict, Iterator, Mapping, Optional, Sequence, Tuple, Union import tiledb from tiledb.cloud import dag from tiledb.cloud.bioimg.helpers import get_logger_wrapper from tiledb.cloud.bioimg.helpers import serialize_filter +from tiledb.cloud.bioimg.helpers import validate_io_paths from tiledb.cloud.dag.mode import Mode from tiledb.cloud.rest_api.models import RetryStrategy from tiledb.cloud.utilities._common import run_dag @@ -17,14 +19,15 @@ def ingest( source: Union[Sequence[str], str], - output: str, + output: Union[Sequence[str], str], config: Mapping[str, Any], *args: Any, taskgraph_name: Optional[str] = None, num_batches: Optional[int] = None, - threads: Optional[int] = 8, + threads: Optional[int] = 0, resources: Optional[Mapping[str, Any]] = None, compute: bool = True, + register: bool = True, mode: Optional[Mode] = Mode.BATCH, namespace: Optional[str], verbose: bool = False, @@ -35,8 +38,10 @@ def ingest( ) -> 
tiledb.cloud.dag.DAG:
     """The function ingests microscopy images into TileDB arrays

-    :param source: uri / iterable of uris of input files
-    :param output: output dir for the ingested tiledb arrays
+    :param source: uri / iterable of uris of input files.
+        If a uri points to a directory of files, make sure it ends with a trailing '/'
+    :param output: uri / iterable of uris of output files.
+        If a uri points to a directory of files, make sure it ends with a trailing '/'
     :param config: dict configuration to pass on tiledb.VFS
     :param taskgraph_name: Optional name for taskgraph, defaults to None
     :param num_batches: Number of graph nodes to spawn.
@@ -46,6 +51,8 @@
         defaults to None
     :param compute: When True the DAG returned will be computed inside the function
        otherwise DAG will only be returned.
+    :param register: When True the ingested images are also registered under the
+        namespace in which they were ingested.
     :param mode: By default runs Mode.Batch
     :param namespace: The namespace where the DAG will run
     :param verbose: verbose logging, defaults to False
@@ -62,12 +69,12 @@

     def build_io_uris_ingestion(
         source: Sequence[str],
-        output_dir: str,
+        output: Sequence[str],
         output_ext: str,
         supported_exts: Tuple[str],
+        logger: logging.Logger,
     ):
         """Match input uri/s with output destinations
-
         :param source: A sequence of paths or path to input
-        :param output_dir: A path to the output directory
+        :param output: A sequence of output paths
         """
@@ -79,45 +86,72 @@
         # Even though a tuple by definition when passed through submit becomes list
         supported_exts = tuple(supported_exts)

-        def create_output_path(input_file, output_dir) -> str:
-            filename = os.path.splitext(os.path.basename(input_file))[0]
-            output_filename = filename + f".{output_ext}" if output_ext else filename
-            return os.path.join(output_dir, output_filename)
-
-        def iter_paths(source: Sequence[str]) -> Iterator[Tuple]:
-            for uri in source:
-                if vfs.is_dir(uri):
-                    # Folder for exploration
-                    contents = vfs.ls(uri)
-                    yield from iter_paths(contents)
-                elif uri.endswith(supported_exts):
-                    yield uri, create_output_path(uri, output_dir)
-
-        if len(source) == 0:
-            raise ValueError("The source files cannot be empty")
-        return tuple(iter_paths(source))
+        def create_output_path(input_file: str, output: str) -> str:
+            # An output path with a trailing '/' is treated as a directory
+            if output.endswith("/"):
+                filename = os.path.splitext(os.path.basename(input_file))[0]
+                output_filename = (
+                    filename + f".{output_ext}" if output_ext else filename
+                )
+                return os.path.join(output, output_filename)
+            else:
+                # The output is considered a target file
+                return output
+
+        def iter_paths(source: Sequence[str], output: Sequence[str]) -> Iterator[Tuple]:
+            if len(output) != 1:
+                for s, o in zip(source, output):
+                    logger.debug("Pair %s and %s", s, o)
+                    yield s, create_output_path(s, o)
+            else:
+                logger.debug("Traverse source: %s", source)
+                for s in source:
+                    if s.endswith("/"):
+                        # Folder for exploration
+                        contents = vfs.ls(s)
+                        # Explore folders only at depth 1
+                        filtered_contents = [c for c in contents if not vfs.is_dir(c)]
+                        yield from iter_paths(filtered_contents, output)
+                    elif s.endswith(supported_exts):
+                        logger.debug("Pair %s and %s", s, output[0])
+                        yield s, create_output_path(s, output[0])
+
+        logger.debug("Create pairs between %s and %s", source, output)
+        return tuple(iter_paths(source, output))

     def build_input_batches(
         source: Sequence[str],
-        output: str,
+        output: Sequence[str],
         num_batches: int,
         out_ext: str,
         supported_exts: Tuple,
+        *,
+        verbose: bool,
     ):
         """Groups input URIs into batches."""
+        logger = 
get_logger_wrapper(verbose)
-        uri_pairs = build_io_uris_ingestion(source, output, out_ext, supported_exts)
+        uri_pairs = build_io_uris_ingestion(
+            source, output, out_ext, supported_exts, logger
+        )
+        logger.debug("The io pairs for ingestion: %s", uri_pairs)
         # If the user didn't specify a number of batches, run every import
         # as its own task.
         my_num_batches = num_batches or len(uri_pairs)
         # If they specified too many batches, don't create empty tasks.
         my_num_batches = min(len(uri_pairs), my_num_batches)
-        return [uri_pairs[n::my_num_batches] for n in range(my_num_batches)]
+        logger.debug("Number of batches: %s", my_num_batches)
+        split_batches = [uri_pairs[n::my_num_batches] for n in range(my_num_batches)]
+        logger.debug("Split batches: %s", split_batches)
+        return split_batches

     def ingest_tiff_udf(
         io_uris: Sequence[Tuple],
         config: Mapping[str, Any],
         verbose: bool,
         exclude_metadata: bool,
+        converter: str = "tiff",
         *args: Any,
         **kwargs,
     ):
@@ -132,14 +166,10 @@
         from tiledb.bioimg import Converters
         from tiledb.bioimg import from_bioimg

-        converter = kwargs.get("converter", None)
-        user_converter = Converters.OMETIFF
-        if not converter or converter == "tiff":
-            user_converter = Converters.OMETIFF
-        elif converter == "zarr":
-            user_converter = Converters.OMEZARR
-        elif converter == "osd":
-            user_converter = Converters.OSD
+        user_converter = {
+            "zarr": Converters.OMEZARR,
+            "osd": Converters.OSD,
+        }.get(converter, Converters.OMETIFF)

         compressor = kwargs.get("compressor", None)
         if compressor:
@@ -169,10 +199,71 @@
             verbose=verbose,
             **kwargs,
         )
+        return io_uris
+
+    def register_dataset_udf(
+        io_uris: Sequence[Tuple],
+        *,
+        acn: str,
+        namespace: Optional[str] = None,
+        config: Optional[Mapping[str, Any]] = None,
+        verbose: bool = False,
+    ) -> None:
+        """
+        Register the ingested datasets on TileDB Cloud.
+        Each asset is registered under a name matching the filename
+        of its output array.
+
+        :param io_uris: Pairs of source and output paths from the ingestion UDF
+        :param acn: Access credentials name used for the registration
+        :param namespace: TileDB Cloud namespace, defaults to the user's
+            default namespace
+        :param config: config dictionary, defaults to None
+        :param verbose: verbose logging, defaults to False
+        """
+        import os

-    if isinstance(source, str):
-        # Handle only lists
-        source = [source]
+        logger = get_logger_wrapper(verbose)
+
+        namespace = namespace or tiledb.cloud.user_profile().default_namespace_charged
+        # Default registration names are based on the output filename
+        register_map = {}
+        tiledb_uris = {}
+        for _, output_path in io_uris:
+            register_name = os.path.splitext(os.path.basename(output_path))[0]
+            register_map[output_path] = register_name
+            tiledb_uris[output_path] = f"tiledb://{namespace}/{register_name}"
+
+        with tiledb.scope_ctx(config):
+            for dataset_uri, tiledb_uri in tiledb_uris.items():
+                found = False
+                try:
+                    object_type = tiledb.object_type(tiledb_uri)
+                    logger.debug("Object type of %s : %s", tiledb_uri, object_type)
+                    if object_type == "group":
+                        found = True
+                    elif object_type is not None:
+                        raise ValueError(
+                            f"Another object is already registered at '{tiledb_uri}'."
+                        )
+
+                except Exception:
+                    # tiledb.object_type raises an exception
+                    # if the namespace does not exist
+                    logger.error(
+                        "Error checking if %r is registered. Bad namespace?",
+                        tiledb_uri,
+                    )
Bad namespace?", tiledb_uri + ) + raise + + if found: + logger.info("Dataset already registered at %r.", tiledb_uri) + else: + logger.info("Registering dataset %s at %s", dataset_uri, tiledb_uri) + tiledb.cloud.groups.register( + dataset_uri, + name=register_name, + namespace=namespace, + credentials_name=acn, + ) # Default None the TIFF converter is used # The Converters Enum is defined in the tiledb-bioimg package @@ -183,8 +274,9 @@ def ingest_tiff_udf( f"The selected converter is not supported please \ choose on of {_SUPPORTED_CONVERTERS}" ) - - logger.debug("Ingesting files: %s", source) + source = [source] if isinstance(source, str) else source + output = [output] if isinstance(output, str) else output + validate_io_paths(source, output) # Build the task graph dag_name = taskgraph_name or DEFAULT_DAG_NAME @@ -210,6 +302,8 @@ def ingest_tiff_udf( num_batches, output_ext, _SUPPORTED_EXTENSIONS, + *args, + verbose=verbose, access_credentials_name=kwargs.get("access_credentials_name"), name=f"{dag_name} input collector", result_format="json", @@ -220,22 +314,37 @@ def ingest_tiff_udf( logger.debug("Compressor: %r", compressor) compressor_serial = serialize_filter(compressor) if compressor else None - graph.submit( + ingest_list_node = graph.submit( ingest_tiff_udf, input_list_node, config, verbose, exclude_metadata, - threads, + converter, *args, name=f"{dag_name} ingestor ", expand_node_output=input_list_node, resources=DEFAULT_RESOURCES if resources is None else resources, image_name=DEFAULT_IMG_NAME, + max_workers=threads, compressor=compressor_serial, - converter=converter, **kwargs, ) + + if register: + graph.submit( + register_dataset_udf, + ingest_list_node, + config=config, + verbose=verbose, + acn=kwargs.get("access_credentials_name"), + namespace=namespace, + name=f"{dag_name} registrator ", + expand_node_output=ingest_list_node, + resources=DEFAULT_RESOURCES if resources is None else resources, + image_name=DEFAULT_IMG_NAME, + **kwargs, + ) if compute: run_dag(graph, debug=verbose) return graph diff --git a/tests/test_bioimg.py b/tests/test_bioimg.py new file mode 100644 index 000000000..14e748d51 --- /dev/null +++ b/tests/test_bioimg.py @@ -0,0 +1,99 @@ +import unittest + +from tiledb.cloud.bioimg.helpers import validate_io_paths + + +class BioimgTest(unittest.TestCase): + def test_validate_io_paths_accepted(self): + # Accepted cases + accepted_pairs = { + # 1 File -> Output: 1 Folder + "test1": (["s3://test_in/a.tiff"], ["s3://test_out/b/"]), + # 1 File -> Output: 1 File + "test3": (["s3://test_in/a.tiff"], ["s3://test_out/b"]), + # 1 Folder -> Output: 1 Folder + "test5": (["s3://test_in/a/"], ["s3://test_out/b/"]), + # Multiple Files -> Output: Multiple Files (Matching number) + "test11": ( + ["s3://test_in/a.tiff", "s3://test_in/c.tiff"], + ["s3://test_out/b", "s3://test_out/d"], + ), + } + for test_name, (source, dest) in accepted_pairs.items(): + with self.subTest(f"case: {test_name}"): + validate_io_paths(source, dest) + + # Non Accepted cases + non_accepted_pairs = { + # 1 File -> Output: Multiple Folders + "test2": ( + ["s3://test_in/a.tiff"], + ["s3://test_out/b/", "s3://test_out/c/"], + ), + # 1 File -> Output: Multiple Files + "test4": (["s3://test_in/a"], ["s3://test_out/b", "s3://test_out/c"]), + # 1 Folder -> Output: 1 File + "test6": (["s3://test_in/a/"], ["s3://test_out/b"]), + # 1 Folder -> Output: Multiple Files + "test7": (["s3://test_in/a/"], ["s3://test_out/b", "s3://test_out/c"]), + # 1 Folder -> Output: Multiple Folders + "test8": (["s3://test_in/a/"], 
["s3://test_out/b/", "s3://test_out/c/"]), + # Multiple Files -> Output: 1 File + "test9": (["s3://test_in/a", "s3://test_in/c"], ["s3://test_out/b"]), + # Multiple Files -> Output: Multiple Files (Non-Matching number) + "test12": ( + ["s3://test_in/a", "s3://test_in/c"], + ["s3://test_out/b", "s3://test_out/d", "s3://test_out/e"], + ), + # Multiple Files -> Output: Multiple Folders + # (Matching or non-matching length) + # Non-matching + "test13a": ( + ["s3://test_in/a", "s3://test_in/c"], + ["s3://test_out/b/", "s3://test_out/d/", "s3://test_out/e/"], + ), + # matching + "test13b": ( + ["s3://test_in/a", "s3://test_in/c"], + ["s3://test_out/b/", "s3://test_out/d/"], + ), + # Multiple Folders -> Output: 1 File + "test15": (["s3://test_in/a/", "s3://test_in/c/"], ["s3://test_out/b"]), + # Multiple Folders -> Output: Multiple Files + # (Matching or non-matching length) + # Matching + "test17a": ( + ["s3://test_in/a/", "s3://test_in/c/"], + ["s3://test_out/b", "s3://test_out/d", "s3://test_out/e"], + ), + # Non Matching + "test17b": ( + ["s3://test_in/a/", "s3://test_in/c/"], + ["s3://test_out/b", "s3://test_out/d"], + ), + # Multiple Folders -> Output: Multiple Folders (Non-Matching number) + "test18": ( + ["s3://test_in/a/", "s3://test_in/c/"], + ["s3://test_out/b", "s3://test_out/d", "s3://test_out/e"], + ), + # Mix of Files and Folders -> Output: 1 File + "test20": (["s3://test_in/a", "s3://test_in/c/"], ["s3://test_out/b"]), + # Mix of Files and Folders -> Output: Multiple Files and Folders + "test21": ( + ["s3://test_in/a/", "s3://test_in/c"], + ["s3://test_out/b", "s3://test_out/d/"], + ), + # Multiple Folders -> Output: 1 Folder + "test22": (["s3://test_in/a/", "s3://test_in/c/"], ["s3://test_out/b/"]), + # Mix of Files and Folders -> Output: 1 Folder + "test23": (["s3://test_in/a", "s3://test_in/c/"], ["s3://test_out/b/"]), + # Multiple Files -> Output: 1 Folder + "test10": ( + ["s3://test_in/a.tiff", "s3://test_in/c.tiff"], + ["s3://test_out/b/"], + ), + } + for test_name, (source, dest) in non_accepted_pairs.items(): + with self.subTest(f"case: {test_name}"): + with self.assertRaises(ValueError): + validate_io_paths(source, dest)