Consolidate VCF stats arrays at the end of ingestion (#432)
gspowley authored Jul 18, 2023
1 parent a5bd4f7 commit 14cca1e
Showing 2 changed files with 214 additions and 2 deletions.
14 changes: 14 additions & 0 deletions src/tiledb/cloud/vcf/__init__.py
@@ -1,11 +1,25 @@
from .allele_frequency import read_allele_frequency
from .ingestion import Contigs
from .ingestion import ingest
from .query import build_read_dag
from .query import read
from .utils import bgzip_and_index
from .utils import create_index_file
from .utils import find_index
from .utils import get_record_count
from .utils import get_sample_name
from .utils import is_bgzipped

__all__ = [
"Contigs",
"ingest",
"build_read_dag",
"read",
"read_allele_frequency",
"bgzip_and_index",
"create_index_file",
"find_index",
"get_sample_name",
"get_record_count",
"is_bgzipped",
]
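
With this change the VCF helper utilities are re-exported at the package level, so they can be imported directly (import only — call signatures are not shown in this diff):

from tiledb.cloud.vcf import get_record_count, get_sample_name, is_bgzipped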
202 changes: 200 additions & 2 deletions src/tiledb/cloud/vcf/ingestion.py
@@ -4,7 +4,10 @@
import sys
from collections import defaultdict
from fnmatch import fnmatch
from functools import partial
from math import ceil
from multiprocessing.pool import ThreadPool
from os.path import basename
from typing import Any, Mapping, Optional, Sequence, Tuple, Union

import numpy as np
@@ -41,13 +44,22 @@
VCF_WORKERS = 40
VCF_THREADS = 8
VCF_HEADER_MB = 50 # memory per sample per thread
TARGET_FRAGMENT_BYTES = 1 << 30 # 1 GiB
CONSOLIDATION_BUFFER_SIZE = 1 << 30 # 1 GiB
CONSOLIDATION_THREADS = 8

# Consolidation task resources
CONSOLIDATE_RESOURCES = {
"cpu": "4",
"memory": "16Gi",
}

# Consolidation fragment task resources
CONSOLIDATE_FRAGMENT_RESOURCES = {
"cpu": "8",
"memory": "32Gi",
}

# Load manifest task resources
MANIFEST_RESOURCES = {
"cpu": "1",
@@ -719,7 +731,7 @@ def consolidate_dataset_udf(
continue

# NOTE: REST currently only supports fragment_meta, commits, array_meta
modes = ["commits", "fragment_meta"]
modes = ["commits", "fragment_meta", "array_meta"]

# Consolidate fragments for selected arrays
if name in [LOG_ARRAY, MANIFEST_ARRAY, "vcf_headers"]:
@@ -740,6 +752,144 @@ def consolidate_dataset_udf(
print(e)


def consolidate_fragments_udf(
dataset_uri: str,
*,
group_member: str,
config: Optional[Mapping[str, Any]] = None,
id: str = "consolidate",
verbose: bool = False,
unique_dim_index: int = 0,
) -> None:
"""
Consolidate fragments for an array in the dataset using a bin packing consolidation
algorithm that packs fragments into bins based on their size and the target size of
the consolidated fragment.
Fragments are first grouped by the nonempty-domain of the dimension with index
`unique_dim_index`. This ensures that fragments with the same nonempty-domain are
consolidated together. For example, fragments with `contig=chr1` will be
consolidated together, and fragments with `contig=chr2` will be consolidated
together, but fragments with `contig=chr1` and `contig=chr2` will not be
consolidated together.
:param dataset_uri: dataset URI
:param group_member: array group member
:param config: config dictionary, defaults to None
:param id: profiler event id, defaults to "consolidate"
:param verbose: verbose logging, defaults to False
:param unique_dim_index: index of the dimension used to group fragments,
defaults to 0
"""

logger = get_logger_wrapper(verbose)

# Define a helper to pack and consolidate fragments.
def consolidate_fragments(
fragment_info_list: Sequence[tiledb.FragmentInfo],
config: Optional[Mapping[str, Any]] = None,
) -> None:
"""
Run a bin packing algorithm to select fragments to be consolidated together
into a single fragment.
:param fragment_info_list: list of fragment info objects
:param config: config dictionary, defaults to None
"""

# Nothing to consolidate with fewer than two fragments.
if len(fragment_info_list) < 2:
return

total_size = []
fragment_bins = []
vfs = tiledb.VFS(config=config)

for fi in fragment_info_list:
# Get the size of the fragment.
size = vfs.dir_size(fi.uri)
logger.debug("Packing fragment with size: %d", size)

packed = False
for i in range(len(total_size)):
# Add fragment to the first bin that has enough space.
if total_size[i] + size < TARGET_FRAGMENT_BYTES:
total_size[i] += size
fragment_bins[i].append(basename(fi.uri))
logger.debug(
" packed into bin %d, new size = %f MiB",
i,
total_size[i] / (1 << 20),
)
packed = True
break

# If fragment was not packed into any bin, create a new bin.
if not packed:
total_size.append(size)
fragment_bins.append([basename(fi.uri)])
logger.debug(
" packed into new bin, new size = %f MiB",
total_size[-1] / (1 << 20),
)

logger.debug("Total bins = %d", len(total_size))

# Consolidate the fragments in each bin.
for i, fragment_uris in enumerate(fragment_bins):
logger.debug(
" %d: num_uris = %d size = %.3f MiB",
i,
len(fragment_uris),
total_size[i] / (1 << 20),
)

# If there is only one fragment, skip consolidation.
if len(fragment_uris) > 1:
logger.debug("Consolidating %d fragments", len(fragment_uris))

# Set config to consolidate fragments.
config = tiledb.Config(config)
config["sm.consolidation.mode"] = "fragments"
config["sm.consolidation.buffer_size"] = f"{CONSOLIDATION_BUFFER_SIZE}"

# Consolidate fragments.
with tiledb.open(array_uri, "w", config=config) as array:
array.consolidate(fragment_uris=fragment_uris)

with tiledb.scope_ctx(config):
with Profiler(group_uri=dataset_uri, group_member=LOG_ARRAY, id=id):
with tiledb.Group(dataset_uri) as group:
array_uri = group[group_member].uri

# Build list of fragments to be consolidated for each unique nonempty-domain
# in the unique dimension.
fragment_info_lists = defaultdict(list)

fis = tiledb.FragmentInfoList(array_uri)
for fi in fis:
fragment_info_lists[fi.nonempty_domain[unique_dim_index]].append(fi)
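# e.g. fragments covering only contig "chr1" share a grouping key and are
# binned together, while "chr2"-only fragments form a separate group
# (hypothetical contig values).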

logger.debug(
"Consolidating %d fragments into %d fragments",
len(fis),
len(fragment_info_lists),
)

# Run consolidation on each list of fragments.
with ThreadPool(CONSOLIDATION_THREADS) as pool:
pool.map(
partial(consolidate_fragments, config=config),
fragment_info_lists.values(),
)

# Vacuum fragments.
logger.debug("Vacuuming fragments")
config = tiledb.Config(config)
config["sm.vacuum.mode"] = "fragments"
tiledb.vacuum(array_uri, config=config)


# --------------------------------------------------------------------
# DAGs
# --------------------------------------------------------------------
@@ -935,6 +1085,7 @@ def ingest_samples_dag(
batch_mode: bool = True,
access_credentials_name: Optional[str] = None,
compute: bool = True,
consolidate_stats: bool = True,
) -> Tuple[Optional[dag.DAG], Sequence[str]]:
"""
Create a DAG to ingest samples into the dataset.
@@ -957,6 +1108,7 @@ def ingest_samples_dag(
:param access_credentials_name: name of role in TileDB Cloud to use in tasks
:param compute: when True the DAG will be computed before it is returned,
defaults to True
:param consolidate_stats: consolidate the stats arrays, defaults to True
:return: sample ingestion DAG and list of sample URIs ingested
"""

@@ -1037,7 +1189,7 @@ def ingest_samples_dag(
vcf_memory_mb = 1024 * threads

if ingest_resources is None:
ingest_resources = {"cpu": f"{threads + 2}", "memory": f"{node_memory_mb}Mi"}
ingest_resources = {"cpu": f"{threads}", "memory": f"{node_memory_mb}Mi"}

logger.debug("partitions=%d, consolidates=%d", num_partitions, num_consolidates)
logger.debug("ingest_resources=%s", ingest_resources)
@@ -1089,6 +1241,49 @@
if consolidate:
consolidate.depends_on(ingest)

# Consolidate fragments in the stats arrays, if enabled
if consolidate_stats:
consolidate_ac = submit(
consolidate_fragments_udf,
dataset_uri,
group_member="allele_count",
config=config,
id="vcf-consol-ac",
verbose=verbose,
resources=CONSOLIDATE_FRAGMENT_RESOURCES,
name="Consolidate allele_count ",
**kwargs,
)

consolidate_vs = submit(
consolidate_fragments_udf,
dataset_uri,
group_member="variant_stats",
config=config,
id="vcf-consol-vs",
verbose=verbose,
resources=CONSOLIDATE_FRAGMENT_RESOURCES,
name="Consolidate variant stats ",
**kwargs,
)

consolidate_ac.depends_on(consolidate)
consolidate_vs.depends_on(consolidate)

consolidate = submit(
consolidate_dataset_udf,
dataset_uri,
config=config,
id="vcf-consol-final",
verbose=verbose,
resources=CONSOLIDATE_RESOURCES,
name="Consolidate VCF final ",
**kwargs,
)

consolidate.depends_on(consolidate_ac)
consolidate.depends_on(consolidate_vs)
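
# Resulting dependency chain (sketch): ingest -> consolidate ->
# {consolidate allele_count, consolidate variant_stats} -> final dataset
# consolidation.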

if compute:
logger.info("Ingesting %d samples.", len(sample_uris))
run_dag(graph, wait=local_ingest)
@@ -1136,6 +1331,7 @@ def ingest(
batch_mode: bool = True,
access_credentials_name: Optional[str] = None,
compute: bool = True,
consolidate_stats: bool = True,
aws_find_mode: bool = False,
) -> Tuple[Optional[dag.DAG], Sequence[str]]:
"""
@@ -1177,6 +1373,7 @@ def ingest(
:param access_credentials_name: name of role in TileDB Cloud to use in tasks
:param compute: when True the DAG will be computed before it is returned,
defaults to True
:param consolidate_stats: consolidate the stats arrays, defaults to True
:param aws_find_mode: use AWS CLI to find VCFs, defaults to False
:return: sample ingestion DAG and list of sample URIs ingested
"""
@@ -1240,6 +1437,7 @@ def ingest(
batch_mode=batch_mode,
access_credentials_name=access_credentials_name,
compute=compute,
consolidate_stats=consolidate_stats,
)

return dag, sample_uris
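
A hypothetical end-to-end call of the updated entry point (URIs are placeholders; `search_uri` is assumed from the surrounding API and is not part of this diff):

from tiledb.cloud.vcf import ingest

graph, sample_uris = ingest(
    "tiledb://my-namespace/my-vcf-dataset",  # placeholder dataset URI
    search_uri="s3://my-bucket/vcfs/",  # assumed parameter for sample discovery
    consolidate_stats=True,  # new flag: consolidate stats arrays after ingestion
)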
