Skip to content

Commit

Permalink
Registration upon ingestion (#508)
Browse files Browse the repository at this point in the history
* Support only list of files and targets as ingestor inputs

* Fixing format

* Adding logger

* Simplify to drop support for traverse

* Adding tests

* Adding a rename hashmap for user to provide new names of source ingested files

* Rename the argument to name_map

* Working version for renaming

* Adding tests

* Simplify validate logic - PR changes

* Local ingestion and use of run_dag

* Importing dag module - fix reference before assignment graph

* local feature only on realtime dag mode

* Changing naming to follow exporter's convention

* Initial commit for registering assets after ingestion

* Adding logging

* PR comments - Register default to True

* Changing f strings for performance

* PR changes

* Misc staging testing

* User converter minor suggestion
  • Loading branch information
ktsitsi authored Feb 7, 2024
1 parent 9c0faba commit ce36be3
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 42 deletions.
26 changes: 26 additions & 0 deletions src/tiledb/cloud/bioimg/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Sequence

import tiledb
from tiledb.cloud.utilities import get_logger
Expand Down Expand Up @@ -34,3 +35,28 @@ def serialize_filter(filter):
return filter_dict
else:
raise TypeError


def is_folder(path: str) -> bool:
return path.endswith("/")


def validate_io_paths(source: Sequence[str], output: Sequence[str]) -> None:
if len(source) == 0 or len(output) == 0:
raise ValueError("Source/Output list must not be empty.")

if len(source) == 1 and len(output) == 1:
if is_folder(source[0]) and not is_folder(output[0]):
raise ValueError("Invalid combination of source and output paths.")
else:
return
else:
if len(source) == len(output):
if all(not is_folder(s) for s in source) and all(
not is_folder(o) for o in output
):
return
else:
raise ValueError("Invalid combination of source and output paths.")
else:
raise ValueError("Invalid combination of source and output paths.")
193 changes: 151 additions & 42 deletions src/tiledb/cloud/bioimg/ingestion.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
from typing import Any, Dict, Iterator, Mapping, Optional, Sequence, Tuple, Union

import tiledb
from tiledb.cloud import dag
from tiledb.cloud.bioimg.helpers import get_logger_wrapper
from tiledb.cloud.bioimg.helpers import serialize_filter
from tiledb.cloud.bioimg.helpers import validate_io_paths
from tiledb.cloud.dag.mode import Mode
from tiledb.cloud.rest_api.models import RetryStrategy
from tiledb.cloud.utilities._common import run_dag
Expand All @@ -17,14 +19,15 @@

def ingest(
source: Union[Sequence[str], str],
output: str,
output: Union[Sequence[str], str],
config: Mapping[str, Any],
*args: Any,
taskgraph_name: Optional[str] = None,
num_batches: Optional[int] = None,
threads: Optional[int] = 8,
threads: Optional[int] = 0,
resources: Optional[Mapping[str, Any]] = None,
compute: bool = True,
register: bool = True,
mode: Optional[Mode] = Mode.BATCH,
namespace: Optional[str],
verbose: bool = False,
Expand All @@ -35,8 +38,10 @@ def ingest(
) -> tiledb.cloud.dag.DAG:
"""The function ingests microscopy images into TileDB arrays
:param source: uri / iterable of uris of input files
:param output: output dir for the ingested tiledb arrays
:param source: uri / iterable of uris of input files.
If the uri points to a directory of files make sure it ends with a trailing '/'
:param output: uri / iterable of uris of input files.
If the uri points to a directory of files make sure it ends with a trailing '/'
:param config: dict configuration to pass on tiledb.VFS
:param taskgraph_name: Optional name for taskgraph, defaults to None
:param num_batches: Number of graph nodes to spawn.
Expand All @@ -46,6 +51,8 @@ def ingest(
defaults to None
:param compute: When True the DAG returned will be computed inside the function
otherwise DAG will only be returned.
:param register: When True the ingested images are also being registered under the
namespace in which were ingested.
:param mode: By default runs Mode.Batch
:param namespace: The namespace where the DAG will run
:param verbose: verbose logging, defaults to False
Expand All @@ -62,12 +69,12 @@ def ingest(

def build_io_uris_ingestion(
source: Sequence[str],
output_dir: str,
output: Sequence[str],
output_ext: str,
supported_exts: Tuple[str],
logger: logging.Logger,
):
"""Match input uri/s with output destinations
:param source: A sequence of paths or path to input
:param output_dir: A path to the output directory
"""
Expand All @@ -79,45 +86,72 @@ def build_io_uris_ingestion(
# Even though a tuple by definition when passed through submit becomes list
supported_exts = tuple(supported_exts)

def create_output_path(input_file, output_dir) -> str:
filename = os.path.splitext(os.path.basename(input_file))[0]
output_filename = filename + f".{output_ext}" if output_ext else filename
return os.path.join(output_dir, output_filename)

def iter_paths(source: Sequence[str]) -> Iterator[Tuple]:
for uri in source:
if vfs.is_dir(uri):
# Folder for exploration
contents = vfs.ls(uri)
yield from iter_paths(contents)
elif uri.endswith(supported_exts):
yield uri, create_output_path(uri, output_dir)

if len(source) == 0:
raise ValueError("The source files cannot be empty")
return tuple(iter_paths(source))
def create_output_path(input_file: str, output: str) -> str:
# Check if output is dir
if output.endswith("/"):
filename = os.path.splitext(os.path.basename(input_file))[0]
output_filename = (
filename + f".{output_ext}" if output_ext else filename
)
return os.path.join(output, output_filename)
else:
# The output is considered a target file
return output

def iter_paths(source: Sequence[str], output: Sequence[str]) -> Iterator[Tuple]:
if len(output) != 1:
for s, o in zip(source, output):
logger.debug("Pair %s and %s", s, o)
yield s, create_output_path(s, o)
else:
logger.debug("Traverse source: %s", source)
for s in source:
if s.endswith("/"):
# Folder for exploration
contents = vfs.ls(s)
# Explore folders only at depth 1
filtered_contents = [c for c in contents if not vfs.is_dir(c)]
yield from iter_paths(filtered_contents, output)
elif s.endswith(supported_exts):
logger.debug("Pair %s and %s", s, output[0])
yield s, create_output_path(s, output[0])

logger.debug("Create pairs between %s and %s", source, output)
return tuple(iter_paths(source, output))

def build_input_batches(
source: Sequence[str],
output: str,
output: Sequence[str],
num_batches: int,
out_ext: str,
supported_exts: Tuple,
*,
verbose: bool,
):
logger = get_logger_wrapper(verbose)

"""Groups input URIs into batches."""
uri_pairs = build_io_uris_ingestion(source, output, out_ext, supported_exts)
uri_pairs = build_io_uris_ingestion(
source, output, out_ext, supported_exts, logger
)
logger.debug(f"Input batches:{uri_pairs}")
# If the user didn't specify a number of batches, run every import
# as its own task.
logger.debug(f"The io pairs for ingestion: {uri_pairs}")
my_num_batches = num_batches or len(uri_pairs)
# If they specified too many batches, don't create empty tasks.
my_num_batches = min(len(uri_pairs), my_num_batches)
return [uri_pairs[n::my_num_batches] for n in range(my_num_batches)]
logger.debug(f"Number of batches:{my_num_batches}")
split_batches = [uri_pairs[n::my_num_batches] for n in range(my_num_batches)]
logger.debug(f"Split batches:{split_batches}")
return split_batches

def ingest_tiff_udf(
io_uris: Sequence[Tuple],
config: Mapping[str, Any],
verbose: bool,
exclude_metadata: bool,
converter: str = "tiff",
*args: Any,
**kwargs,
):
Expand All @@ -132,14 +166,10 @@ def ingest_tiff_udf(
from tiledb.bioimg import Converters
from tiledb.bioimg import from_bioimg

converter = kwargs.get("converter", None)
user_converter = Converters.OMETIFF
if not converter or converter == "tiff":
user_converter = Converters.OMETIFF
elif converter == "zarr":
user_converter = Converters.OMEZARR
elif converter == "osd":
user_converter = Converters.OSD
user_converter = {
"zarr": Converters.OMEZARR,
"osd": Converters.OSD,
}.get(converter, Converters.OMETIFF)

compressor = kwargs.get("compressor", None)
if compressor:
Expand Down Expand Up @@ -169,10 +199,71 @@ def ingest_tiff_udf(
verbose=verbose,
**kwargs,
)
return io_uris

def register_dataset_udf(
io_uris: Sequence[Tuple],
*,
acn: str,
namespace: Optional[str] = None,
config: Optional[Mapping[str, Any]] = None,
verbose: bool = False,
) -> None:
"""
Register the dataset on TileDB Cloud.
The ingested assets are being registered with the register name matching
the ingested filename of the data
:param io_uris: Tuple of source and dest paths from ingestion UDF
:param namespace: TileDB Cloud namespace, defaults to the user's
default namespace
:param config: config dictionary, defaults to None
:param verbose: verbose logging, defaults to False
"""
import os

if isinstance(source, str):
# Handle only lists
source = [source]
logger = get_logger_wrapper(verbose)

namespace = namespace or tiledb.cloud.user_profile().default_namespace_charged
# Default registration based on the output filename
register_map = {}
tiledb_uris = {}
for _, output_path in io_uris:
register_name = os.path.splitext(os.path.basename(output_path))[0]
register_map[output_path] = register_name
tiledb_uris[output_path] = f"tiledb://{namespace}/{register_name}"

with tiledb.scope_ctx(config):
for dataset_uri, tiledb_uri in tiledb_uris.items():
found = False
try:
object_type = tiledb.object_type(tiledb_uri)
logger.debug("Object type of %s : %s", tiledb_uri, object_type)
if object_type == "group":
found = True
elif object_type is not None:
raise ValueError(
f"Another object is already registered at '{tiledb_uri}'."
)

except Exception:
# tiledb.object_type raises an exception
# if the namespace does not exist
logger.error(
"Error checking if %r is registered. Bad namespace?", tiledb_uri
)
raise

if found:
logger.info("Dataset already registered at %r.", tiledb_uri)
else:
logger.info("Registering dataset %s at %s", dataset_uri, tiledb_uri)
tiledb.cloud.groups.register(
dataset_uri,
name=register_name,
namespace=namespace,
credentials_name=acn,
)

# Default None the TIFF converter is used
# The Converters Enum is defined in the tiledb-bioimg package
Expand All @@ -183,8 +274,9 @@ def ingest_tiff_udf(
f"The selected converter is not supported please \
choose on of {_SUPPORTED_CONVERTERS}"
)

logger.debug("Ingesting files: %s", source)
source = [source] if isinstance(source, str) else source
output = [output] if isinstance(output, str) else output
validate_io_paths(source, output)

# Build the task graph
dag_name = taskgraph_name or DEFAULT_DAG_NAME
Expand All @@ -210,6 +302,8 @@ def ingest_tiff_udf(
num_batches,
output_ext,
_SUPPORTED_EXTENSIONS,
*args,
verbose=verbose,
access_credentials_name=kwargs.get("access_credentials_name"),
name=f"{dag_name} input collector",
result_format="json",
Expand All @@ -220,22 +314,37 @@ def ingest_tiff_udf(
logger.debug("Compressor: %r", compressor)
compressor_serial = serialize_filter(compressor) if compressor else None

graph.submit(
ingest_list_node = graph.submit(
ingest_tiff_udf,
input_list_node,
config,
verbose,
exclude_metadata,
threads,
converter,
*args,
name=f"{dag_name} ingestor ",
expand_node_output=input_list_node,
resources=DEFAULT_RESOURCES if resources is None else resources,
image_name=DEFAULT_IMG_NAME,
max_workers=threads,
compressor=compressor_serial,
converter=converter,
**kwargs,
)

if register:
graph.submit(
register_dataset_udf,
ingest_list_node,
config=config,
verbose=verbose,
acn=kwargs.get("access_credentials_name"),
namespace=namespace,
name=f"{dag_name} registrator ",
expand_node_output=ingest_list_node,
resources=DEFAULT_RESOURCES if resources is None else resources,
image_name=DEFAULT_IMG_NAME,
**kwargs,
)
if compute:
run_dag(graph, debug=verbose)
return graph
Expand Down
Loading

0 comments on commit ce36be3

Please sign in to comment.