Skip to content

Commit

Permalink
Filtering with suffix and fixing root paths inclusion (#440)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktsitsi authored Jul 14, 2023
1 parent 1d6d92e commit fb18d7b
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/tiledb/cloud/bioimg/exportation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def export_tiff_udf(

# Get the list of all BioImg samples input/out
samples = get_uris(source, output, config, "tiff")
batch_size, max_workers = scale_calc(source, num_batches)
batch_size, max_workers = scale_calc(samples, num_batches)

# Build the task graph
dag_name = taskgraph_name or DEFAULT_DAG_NAME
Expand Down
22 changes: 9 additions & 13 deletions src/tiledb/cloud/bioimg/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ def get_uris(
vfs = tiledb.VFS(config=config)

def create_output_path(input_file, output_dir) -> str:
return os.path.join(output_dir, os.path.basename(input_file) + f".{output_ext}")
filename = os.path.splitext(os.path.basename(input_file))[0]
return os.path.join(output_dir, filename + f".{output_ext}")

def iter_paths(sequence) -> Iterator[Tuple]:
for uri in sequence:
yield uri, create_output_path(uri, output_dir)
if uri.endswith((".tiff", ".tif", ".tdb")):
yield uri, create_output_path(uri, output_dir)

if len(source) == 1 and vfs.is_dir(source[0]):
# Check if the dir is actually a tiledb group for exportation
with tiledb.scope_ctx(ctx_or_config=config):
if tiledb.object_type(source[0]) != "group":
# Folder like input
contents = vfs.ls(source[0])
if len(contents) == 1:
return tuple(iter_paths(contents[1:]))
if len(contents) != 0:
return tuple(iter_paths(contents))
else:
raise ValueError("Input bucket should contain images for ingestion")
else:
Expand All @@ -56,7 +58,7 @@ def batch(iterable, chunks):
yield iterable[ndx : min(ndx + chunks, length)]


def scale_calc(source, num_batches):
def scale_calc(samples, num_batches):
"""Calculate scaling settings for batch_size and max_workers
:param source: The source iterable containing files to be ingested/exported
Expand All @@ -65,12 +67,6 @@ def scale_calc(source, num_batches):
"""
# If num_batches is default create number of images nodes
# constraint node max_workers to 20 fully heuristic
if num_batches is None:
num_batches = len(source)
batch_size = 1
max_workers = 20
else:
batch_size = math.ceil(len(source) / num_batches)
max_workers = None

batch_size = 1 if num_batches is None else math.ceil(len(samples) / num_batches)
max_workers = 20 if num_batches is None else None
return batch_size, max_workers
2 changes: 1 addition & 1 deletion src/tiledb/cloud/bioimg/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def ingest_tiff_udf(

# Get the list of all BioImg samples input/out
samples = get_uris(source, output, config, "tdb")
batch_size, max_workers = scale_calc(source, num_batches)
batch_size, max_workers = scale_calc(samples, num_batches)

# Build the task graph
dag_name = taskgraph_name or DEFAULT_DAG_NAME
Expand Down
129 changes: 129 additions & 0 deletions tests/test_bioimg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import math
import unittest
from unittest import mock

from tiledb.cloud.bioimg.helpers import batch
from tiledb.cloud.bioimg.helpers import get_uris
from tiledb.cloud.bioimg.helpers import scale_calc
from tiledb.vfs import VFS


class BioimgTest(unittest.TestCase):
    """Unit tests for the bioimg helpers: get_uris, batch and scale_calc."""

    def setUp(self) -> None:
        # Common output prefix used by every get_uris test below.
        self.out_path = "out"
        return super().setUp()

    def test_get_uris_ingestion(self):
        ingest_uri_sample = {
            "test1": ["test1.tiff", "test2.tiff", "test3.tiff"],
            "test2": [],
            "test3": ["test1.tiff", "test2.tiff", "test3.zarr"],
        }

        # Case 1-1 Ingestion - Ingest mode has tdb as target suffix
        out_suffix = "tdb"
        test1 = ingest_uri_sample["test1"]
        with mock.patch.object(VFS, "ls", return_value=test1):
            result_1 = get_uris(test1, self.out_path, None, out_suffix)
            expected_1 = (
                ("test1.tiff", "out/test1.tdb"),
                ("test2.tiff", "out/test2.tdb"),
                ("test3.tiff", "out/test3.tdb"),
            )
            self.assertTupleEqual(result_1, expected_1)

        # Case 1-2 Ingestion - Empty input list raises an error.
        # NOTE(review): the original wrote self.assertRaises(get_uris(...)),
        # which calls get_uris first and passes its *result* as the exception
        # class, asserting nothing; assert the expected exception explicitly.
        # (ValueError per the "Input bucket should contain images" path in
        # helpers.get_uris — TODO confirm the empty-list branch matches.)
        with mock.patch.object(VFS, "ls", return_value=ingest_uri_sample["test2"]):
            with self.assertRaises(ValueError):
                get_uris(ingest_uri_sample["test2"], self.out_path, None, out_suffix)

        # Case 1-3 Ingestion - Files with different suffixes: entries that
        # are not .tiff/.tif/.tdb (e.g. .zarr) are filtered out.
        test3 = ingest_uri_sample["test3"]
        with mock.patch.object(VFS, "ls", return_value=test3):
            result_3 = get_uris(test3, self.out_path, None, out_suffix)
            expected_3 = (
                ("test1.tiff", "out/test1.tdb"),
                ("test2.tiff", "out/test2.tdb"),
            )
            self.assertTupleEqual(result_3, expected_3)

    def test_get_uris_exportation(self):
        export_uri_sample = {
            "test1": ["test1.tdb", "test2.tdb", "test3.tdb"],
            "test2": [],
            "test3": ["test1.tdb", "test2.tdb", "test3.zarr"],
        }

        # Case 2-1 Exportation normal mode has tiff as target suffix
        out_suffix = "tiff"
        test1 = export_uri_sample["test1"]
        with mock.patch.object(VFS, "ls", return_value=test1):
            result = get_uris(test1, self.out_path, None, out_suffix)
            expected = (
                ("test1.tdb", "out/test1.tiff"),
                ("test2.tdb", "out/test2.tiff"),
                ("test3.tdb", "out/test3.tiff"),
            )
            self.assertTupleEqual(result, expected)

        # Case 2-2 Exportation empty list raises an error (see the note in
        # test_get_uris_ingestion about the original assertRaises misuse).
        test2 = export_uri_sample["test2"]
        with mock.patch.object(VFS, "ls", return_value=test2):
            with self.assertRaises(ValueError):
                get_uris(test2, self.out_path, None, out_suffix)

        # Case 2-3 Exportation - Files with different suffixes: non-image
        # entries (e.g. .zarr) are filtered out.
        test3 = export_uri_sample["test3"]
        with mock.patch.object(VFS, "ls", return_value=test3):
            result_3 = get_uris(test3, self.out_path, None, out_suffix)
            expected_3 = (
                ("test1.tdb", "out/test1.tiff"),
                ("test2.tdb", "out/test2.tiff"),
            )
            self.assertTupleEqual(result_3, expected_3)

    def test_batch(self):
        samples = (
            ("test1.tiff", "out/test1.tdb"),
            ("test2.tiff", "out/test2.tdb"),
            ("test3.tiff", "out/test3.tdb"),
        )

        # Three nodes with 1 job are expected
        batch_size = 1
        result = batch(samples, batch_size)
        expected = 3
        self.assertEqual(len(tuple(result)), expected)

        # Two nodes with 2,1 jobs are expected
        batch_size = 2
        result = batch(samples, batch_size)
        expected = 2
        self.assertEqual(len(tuple(result)), expected)

        # One node with all jobs is expected
        batch_size = 3
        result = batch(samples, batch_size)
        expected = 1
        self.assertEqual(len(tuple(result)), expected)

    def test_scale_calc(self):
        samples = (
            ("test1.tiff", "out/test1.tdb"),
            ("test2.tiff", "out/test2.tdb"),
            ("test3.tiff", "out/test3.tdb"),
        )

        # Default (num_batches=None): one image per node, capped workers.
        result = scale_calc(samples, None)
        self.assertEqual(result, (1, 20))

        # Explicit num_batches: batch_size is ceil(len/num), workers unset.
        expected = math.ceil(len(samples) / 2)
        result = scale_calc(samples, 2)
        self.assertEqual(result, (expected, None))

        expected = math.ceil(len(samples) / 3)
        result = scale_calc(samples, 3)
        self.assertEqual(result, (expected, None))

0 comments on commit fb18d7b

Please sign in to comment.