Skip to content

Commit

Permalink
Add registered_name to SOMA ingestion. (#640)
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnMoutafis authored Sep 12, 2024
1 parent 77b3d6d commit c88608e
Showing 1 changed file with 87 additions and 3 deletions.
90 changes: 87 additions & 3 deletions src/tiledb/cloud/soma/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import os
import re
import warnings
from typing import ContextManager, Dict, Optional
from typing import Any, ContextManager, Dict, Mapping, Optional
from unittest import mock

import tiledb
from tiledb.cloud import dag
from tiledb.cloud._common import functions
from tiledb.cloud._common import utils
from tiledb.cloud.utilities import as_batch
from tiledb.cloud.utilities import get_logger_wrapper
from tiledb.cloud.utilities import run_dag
Expand All @@ -16,6 +17,60 @@
"""Default resource size; equivalent to a "large" UDF container."""


def register_dataset_udf(
dataset_uri: str,
*,
register_name: str,
acn: str,
namespace: Optional[str] = None,
config: Optional[Mapping[str, Any]] = None,
verbose: bool = False,
) -> None:
"""
Register the dataset on TileDB Cloud.
:param dataset_uri: dataset URI
:param register_name: name to register the dataset with on TileDB Cloud
:param namespace: TileDB Cloud namespace, defaults to the user's default namespace
:param config: config dictionary, defaults to None
:param verbose: verbose logging, defaults to False
"""

logger = get_logger_wrapper(verbose)

namespace = namespace or tiledb.cloud.user_profile().default_namespace_charged
tiledb_uri = f"tiledb://{namespace}/{register_name}"

with tiledb.scope_ctx(config):
found = False
try:
object_type = tiledb.object_type(tiledb_uri)
if object_type == "group":
found = True
elif object_type is not None:
raise ValueError(
f"Another object is already registered at '{tiledb_uri}'."
)

except Exception:
# tiledb.object_type raises an exception if the namespace does not exist
logger.error(
"Error checking if %r is registered. Bad namespace?", tiledb_uri
)
raise

if found:
logger.info("Dataset already registered at %r.", tiledb_uri)
else:
logger.info("Registering dataset at %r.", tiledb_uri)
tiledb.cloud.groups.register(
dataset_uri,
name=register_name,
namespace=namespace,
credentials_name=acn,
)


def run_ingest_workflow_udf(
*,
output_uri: str,
Expand Down Expand Up @@ -246,6 +301,7 @@ def run_ingest_workflow(
ingest_mode: str = "write",
resources: Optional[Dict[str, object]] = None,
namespace: Optional[str] = None,
register_name: Optional[str] = None,
acn: Optional[str] = None,
logging_level: int = logging.INFO,
dry_run: bool = False,
Expand Down Expand Up @@ -278,6 +334,7 @@ def run_ingest_workflow(
:param resources: A specification for the amount of resources to provide
to the UDF executing the ingestion process, to override the default.
:param namespace: An alternate namespace to run the ingestion process under.
:param register_name: name to register the dataset with on TileDB Cloud.
:param acn: The name of the credentials to pass to the executing UDF.
:param dry_run: If provided and set to ``True``, does the input-path
traversals without ingesting data.
Expand All @@ -303,6 +360,19 @@ def run_ingest_workflow(
)
)

try:
ns, dst = utils.split_uri(output_uri)
namespace = namespace or ns
# Ensure compatibility with "tiledb://<namespace>/<bucket-path>"
# style URIs
if "://" in dst:
output_uri = dst
except ValueError:
pass

if not register_name:
register_name = os.path.splitext(os.path.basename(output_uri))[0]

# Graph init
grf = dag.DAG(
name="ingest-h5ad-launcher",
Expand All @@ -317,7 +387,7 @@ def run_ingest_workflow(
"access_credentials_name": acn,
}

grf.submit(
ingest_workflow = grf.submit(
_run_ingest_workflow_udf_byval,
output_uri=output_uri,
input_uri=input_uri,
Expand All @@ -334,8 +404,21 @@ def run_ingest_workflow(
dry_run=dry_run,
)

# Start the ingestion process
verbose = logging_level == logging.DEBUG

# Register the SOMA result
if not dry_run:
grf.submit(
_register_dataset_udf_byval,
output_uri,
namespace=namespace,
register_name=register_name,
config=extra_tiledb_config,
verbose=verbose,
acn=acn,
).depends_on(ingest_workflow)

# Start the ingestion process
run_dag(grf, debug=verbose)

# Get the initial graph node UUID
Expand All @@ -355,6 +438,7 @@ def run_ingest_workflow(
_ingest_h5ad_byval = functions.to_register_by_value(ingest_h5ad)
_run_ingest_workflow_byval = functions.to_register_by_value(run_ingest_workflow)
_run_ingest_workflow_udf_byval = functions.to_register_by_value(run_ingest_workflow_udf)
_register_dataset_udf_byval = functions.to_register_by_value(register_dataset_udf)
_hack_patch_anndata_byval = functions.to_register_by_value(_hack_patch_anndata)

ingest = as_batch(_run_ingest_workflow_byval)

0 comments on commit c88608e

Please sign in to comment.