Skip to content

Commit

Permalink
Creates PoC pipeline to estimate disk usage of vcf_to_bq on Dataflow.
Browse files Browse the repository at this point in the history
The pipeline uses raw file size and raw+encoded sizes of a short snippet at beginning of VCF files to
estimate the encoded size for a commit. The major blocking bug is that when the snippets are being read from VCFs in an
encoded format, lines are being read more than once.
  • Loading branch information
hanjohn committed Aug 17, 2018
1 parent 6420e96 commit 08b8772
Show file tree
Hide file tree
Showing 6 changed files with 372 additions and 0 deletions.
131 changes: 131 additions & 0 deletions gcp_variant_transforms/beam_io/vcf_snippet_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A source for reading VCF file headers."""

from __future__ import absolute_import

import logging

from apache_beam.io import filebasedsource
from apache_beam.io import range_trackers # pylint: disable=unused-import
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.iobase import Read
from apache_beam.transforms import PTransform
from typing import Iterable # pylint: disable=unused-import

from gcp_variant_transforms.beam_io import vcf_parser, vcfio


class _VcfSnippetSource(filebasedsource.FileBasedSource):
"""A source for reading a limited number of variants from a set of VCF files.
Lines that are malformed are skipped.
Parses VCF files (version 4) using PyVCF library.
"""

DEFAULT_VCF_READ_BUFFER_SIZE = 65536 # 64kB

def __init__(self,
file_pattern,
snippet_size,
compression_type=CompressionTypes.AUTO,
validate=True):
# type: (str, int, str, bool) -> None
super(_VcfSnippetSource, self).__init__(file_pattern,
compression_type=compression_type,
validate=validate,
splittable=False)
self._compression_type = compression_type
self._snippet_size = snippet_size

def read_records(
self,
file_name, # type: str
range_tracker # type: range_trackers.UnsplittableRangeTracker
):
# type: (...) -> Iterable[Tuple[str, str, vcfio.Variant]]
# Iterator to emit lines encoded as `Variant` objects.
record_iterator = vcf_parser.PyVcfParser(
file_name,
range_tracker,
self._pattern,
self._compression_type,
allow_malformed_records=True,
representative_header_lines=None,
buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE,
skip_header_lines=0)

# Open distinct channel to read lines as raw bytestrings.
with FileSystems.open(file_name, self._compression_type) as raw_reader:
line = raw_reader.readline()
while line and line.startswith('#'):
# Skip headers, assume header size is negligible.
line = raw_reader.readline()

count = 0
for encoded_record in record_iterator:
raw_record = raw_reader.readline()

if count >= self._snippet_size:
break
if not isinstance(encoded_record, vcfio.Variant):
continue

count += 1
yield file_name, raw_record, encoded_record


class ReadVcfSnippet(PTransform):
"""A PTransform for reading a limited number of lines from a set of VCF files.
Output will be a PTable mapping from `file names -> Tuple[(line, Variant)]`
objects. The list contains the first `snippet_size` number of lines that are
not malformed, first as a raw string and then encoded as a `Variant` class.
Parses VCF files (version 4) using PyVCF library.
"""

def __init__(
self,
file_pattern, # type: str
snippet_size, # type: int
compression_type=CompressionTypes.AUTO, # type: str
validate=True, # type: bool
**kwargs # type: **str
):
# type: (...) -> None
"""Initialize the :class:`ReadVcfHeaders` transform.
Args:
file_pattern: The file path to read from either as a single file or a glob
pattern.
snippet_size: The number of lines that should be read from the file.
compression_type: Used to handle compressed input files.
Typical value is :attr:`CompressionTypes.AUTO
<apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
underlying file_path's extension will be used to detect the compression.
validate: Flag to verify that the files exist during the pipeline creation
time.
"""
super(ReadVcfSnippet, self).__init__(**kwargs)
self._source = _VcfSnippetSource(
file_pattern, snippet_size, compression_type, validate=validate)

def expand(self, pvalue):
return pvalue.pipeline | Read(self._source)


Empty file.
14 changes: 14 additions & 0 deletions gcp_variant_transforms/libs/preprocess_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
TODO(allieychen): Eventually, it also contains the resource estimation.
Output example (assuming opening in spreedsheet):
Estimated disk usage by Dataflow: 4846.0 GB. The total raw file sizes summed up
to 1231.0 GB.
Header Conflicts
ID Category Conflicts File Paths Proposed Resolution
NS INFO num=1 type=Float file1 num=1 type=Float
Expand All @@ -43,6 +45,7 @@
File Path Variant Record Error Message
file 1 rs6 G A 29 PASS NS=3; invalid literal for int() with base 10.
"""
import math

from typing import Dict, List, Optional, Union # pylint: disable=unused-import

Expand Down Expand Up @@ -77,6 +80,7 @@ class _HeaderLine(object):
def generate_report(
header_definitions, # type: merge_header_definitions.VcfHeaderDefinitions
file_path, # type: str
disk_usage_estimate, # type: int
resolved_headers=None, # type: vcf_header_io.VcfHeader
inferred_headers=None, # type: vcf_header_io.VcfHeader
malformed_records=None # type: List[vcfio.MalformedVcfRecord]
Expand All @@ -97,6 +101,7 @@ def generate_report(
"""
resolved_headers = resolved_headers or vcf_header_io.VcfHeader()
with filesystems.FileSystems.create(file_path) as file_to_write:
_append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate)
_append_conflicting_headers_to_report(file_to_write, header_definitions,
resolved_headers)
_append_inferred_headers_to_report(file_to_write, inferred_headers)
Expand Down Expand Up @@ -272,6 +277,15 @@ def _format_definition(num_value, type_value):
]
return ' '.join(formatted_definition)

def _append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate):
# type: (file, FileSizeInfo) -> None
if disk_usage_estimate is None:
return
file_to_write.write(
'Estimated disk usage by Dataflow: {} GB. The total raw file sizes summed '
'up to {} GB.\n'.format(
int(math.ceil(disk_usage_estimate.encoded / 1e9)),
int(math.ceil(disk_usage_estimate.raw / 1e9))))

def _append_to_report(file_to_write, error_type, header, contents):
# type: (file, str, str, List[str]) -> None
Expand Down
179 changes: 179 additions & 0 deletions gcp_variant_transforms/libs/resource_estimator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for estimating the resources that a vcf_to_bq will require.
Currently, the resource estimator only estimates the disk usage that a Dataflow
pipeline will take along with the `MergeVariants` step, since this can cause
expensive pipeline failures late in the run.
"""
import logging

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems

from gcp_variant_transforms.beam_io import vcfio


# TODO(hanjohn): Add unit tests.

def _convert_variant_snippets_to_bytesize(variant):
# type: (vcfio.Variant -> int)
return coders.registry.get_coder(vcfio.Variant).estimate_size(variant)


class SnippetSizeInfo(object):
def __init__(self,
raw_snippet_size, # type: int
encoded_snippet_size, # type: int
):
# type: (...) -> (None)
self.raw = raw_snippet_size
self.encoded = encoded_snippet_size


class FileSizeInfo(object):
def __init__(self, raw_file_size, encoded_file_size=None):
# type: (int, int) -> (None)
self.raw = raw_file_size
self.encoded = encoded_file_size

def calculateEncodedFileSize(self, snippet_size_info):
# type: (SnippetSizeInfo) -> (None)
"""Estimate a VCF file's encoded size based on snippet analysis.
Given the raw_file_size and measurements of several VCF lines from the file,
estimate how much disk the file will take after expansion due to encoding
lines as `vcfio.Variant` objects. The encoded_snippet_size will be set as
`self.encoded`.
This is a simple ratio problem, solving for encoded_snippet_size which is
the only unknown:
encoded_snippet_size / raw_snippet_size = encoded_file_size / raw_file_size
"""
if snippet_size_info.raw == 0:
logging.error("VCF file {} reported with 0 well-formed variant lines; "
"its contribution to disk resource usage will be "
"ignored.".format(self.name))
self.encoded = 0
self.raw = 0
else:
self.encoded = (self.raw * snippet_size_info.encoded /
snippet_size_info.raw)


def measure_variant_size(element):
# type: (Tuple[str, str, vcfio.Variant]) -> (Tuple[str, SnippetSizeInfo])
"""Measure the lengths of the raw and encoded representations of a Variant.
Given a PTable mapping file_paths to the raw (bytestring) and vcfio.Variant-
encoded representations of a Variant line, have the output PTable instead map
from the file_paths to a Tuple with the (raw, encoded) representation sizes.
The file_path keys are not expected to be unique.
"""
file_path, raw_variant, encoded_variant = element
encoded_variant_size = _convert_variant_snippets_to_bytesize(encoded_variant)
raw_variant_size = len(raw_variant)
return file_path, SnippetSizeInfo(raw_variant_size, encoded_variant_size)


def estimate_file_encoded_size(element):
# type: (Tuple[str, Dict[str, Object]]) -> (Tuple[str, FileSizeInfo])
file_name, metrics = element
file_size_info = metrics['whole_file_raw_size'][0] # type: FileSizeInfo
snippet_size_info = metrics['snippet_stats'][0] # type: SnippetSizeInfo

# Assume that the ratio of encoded size to raw disk size is roughly the same
# throughout the file compared to the first several lines.
file_size_info.calculateEncodedFileSize(snippet_size_info)

logging.debug("File %s has raw file size %d, raw snippet size %d, encoded "
"size %d. Estimated encoded file size: %d",
file_name, file_size_info.raw, snippet_size_info.raw,
snippet_size_info.encoded, file_size_info.encoded)
return file_name, file_size_info

def get_file_sizes(input_pattern):
# type: (str) -> (List[FileSizeInfo])
match_results = FileSystems.match([input_pattern])
file_sizes = []
for match in match_results:
for file_metadata in match.metadata_list:
compression_type = CompressionTypes.detect_compression_type(
file_metadata.path)
if compression_type != CompressionTypes.UNCOMPRESSED:
logging.error("VCF file {} is compressed; disk requirement estimator "
"will not be accurate.",
file_metadata.path)

file_sizes.append((file_metadata.path,
FileSizeInfo(file_metadata.size_in_bytes),))
return file_sizes


class SnippetSizeInfoSumFn(beam.CombineFn):
"""Combiner Function to sum up the size fields of SnippetSizeInfos.
Example: [SnippetSizeInfo(a, b), SnippetSizeInfo(c, d)] ->
SnippetSizeInfo(a+c, b+d)
"""
def create_accumulator(self):
# type: (None) -> (Tuple[int, int])
return (0, 0) # (raw, encoded) sums

def add_input(self, sums, input):
# type: (Tuple[int, int], SnippetSizeInfo) -> (Tuple[int, int])
return sums[0] + input.raw, sums[1] + input.encoded

def merge_accumulators(self, accumulators):
# type: (Iterable[Tuple[int, int]]) -> (Tuple[int, int])
first, second = zip(*accumulators)
return sum(first), sum(second)

def extract_output(self, sums):
# type: (Tuple[int, int]) -> (SnippetSizeInfo)
return SnippetSizeInfo(*sums)


class FileSizeInfoSumFn(beam.CombineFn):
"""Combiner Function to sum up the size fields of Tuple[str, FileSizeInfo]s.
Unlike SnippetSizeInfoSumFn, the input is a PTable mapping str to
FileSizeInfo, so the input is a tuple with the FileSizeInfos as the second
field. The output strips out the str key which represents the file path.
Example: [('/path/a', FileSizeInfo(a, b)), ('/path/b;, FileSizeInfo(c, d))] ->
FileSizeInfo(a+c, b+d)
"""
def create_accumulator(self):
# type: (None) -> (Tuple[int, int])
return (0, 0) # (raw, encoded) sums

def add_input(self, raw_encoded, input):
# type: (Tuple[int, int], Tuple[str, FileSizeInfo]) -> (Tuple[int, int])
raw, encoded = raw_encoded
_, file_size_info = input
return raw + file_size_info.raw, encoded + file_size_info.encoded

def merge_accumulators(self, accumulators):
# type: (Iterable[Tuple[int, int]]) -> (Tuple[int, int])
raw, encoded = zip(*accumulators)
return sum(raw), sum(encoded)

def extract_output(self, raw_encoded):
# type: (Tuple[int, int]) -> (FileSizeInfo)
raw, encoded = raw_encoded
return FileSizeInfo(raw, encoded)
7 changes: 7 additions & 0 deletions gcp_variant_transforms/options/variant_transform_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,13 @@ def add_arguments(self, parser):
help=('The full path of the resolved headers. The file will not be'
'generated if unspecified. Otherwise, please provide a local '
'path if run locally, or a cloud path if run on Dataflow.'))
parser.add_argument(
'--estimate_disk_usage',
type='bool', default=False, nargs='?', const=True,
help=('By default, disk resource usage will not be estimated.'
'If true, the preprocessor will estimate the maximum disk usage '
'consumed at any step in the pipeline, which could lead to '
'out-of-disk errors at a shuffle step e.g. MergeVariants.'))

class PartitionOptions(VariantTransformsOptions):
"""Options for partitioning Variant records."""
Expand Down
Loading

0 comments on commit 08b8772

Please sign in to comment.