diff --git a/gcp_variant_transforms/beam_io/vcf_snippet_io.py b/gcp_variant_transforms/beam_io/vcf_snippet_io.py new file mode 100644 index 000000000..1571020ae --- /dev/null +++ b/gcp_variant_transforms/beam_io/vcf_snippet_io.py @@ -0,0 +1,131 @@ +# Copyright 2017 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A source for reading VCF file headers.""" + +from __future__ import absolute_import + +import logging + +from apache_beam.io import filebasedsource +from apache_beam.io import range_trackers # pylint: disable=unused-import +from apache_beam.io.filesystem import CompressionTypes +from apache_beam.io.filesystems import FileSystems +from apache_beam.io.iobase import Read +from apache_beam.transforms import PTransform +from typing import Iterable # pylint: disable=unused-import + +from gcp_variant_transforms.beam_io import vcf_parser, vcfio + + +class _VcfSnippetSource(filebasedsource.FileBasedSource): + """A source for reading a limited number of variants from a set of VCF files. + + Lines that are malformed are skipped. + + Parses VCF files (version 4) using PyVCF library. + """ + + DEFAULT_VCF_READ_BUFFER_SIZE = 65536 # 64kB + + def __init__(self, + file_pattern, + snippet_size, + compression_type=CompressionTypes.AUTO, + validate=True): + # type: (str, int, str, bool) -> None + super(_VcfSnippetSource, self).__init__(file_pattern, + compression_type=compression_type, + validate=validate, + splittable=False) + self._compression_type = compression_type + self._snippet_size = snippet_size + + def read_records( + self, + file_name, # type: str + range_tracker # type: range_trackers.UnsplittableRangeTracker + ): + # type: (...) -> Iterable[Tuple[str, str, vcfio.Variant]] + # Iterator to emit lines encoded as `Variant` objects. + record_iterator = vcf_parser.PyVcfParser( + file_name, + range_tracker, + self._pattern, + self._compression_type, + allow_malformed_records=True, + representative_header_lines=None, + buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE, + skip_header_lines=0) + + # Open distinct channel to read lines as raw bytestrings. + with FileSystems.open(file_name, self._compression_type) as raw_reader: + line = raw_reader.readline() + while line and line.startswith('#'): + # Skip headers, assume header size is negligible. + line = raw_reader.readline() + + count = 0 + for encoded_record in record_iterator: + raw_record = raw_reader.readline() + + if count >= self._snippet_size: + break + if not isinstance(encoded_record, vcfio.Variant): + continue + + count += 1 + yield file_name, raw_record, encoded_record + + +class ReadVcfSnippet(PTransform): + """A PTransform for reading a limited number of lines from a set of VCF files. + + Output will be a PTable mapping from `file names -> Tuple[(line, Variant)]` + objects. The list contains the first `snippet_size` number of lines that are + not malformed, first as a raw string and then encoded as a `Variant` class. + + Parses VCF files (version 4) using PyVCF library. + """ + + def __init__( + self, + file_pattern, # type: str + snippet_size, # type: int + compression_type=CompressionTypes.AUTO, # type: str + validate=True, # type: bool + **kwargs # type: **str + ): + # type: (...) -> None + """Initialize the :class:`ReadVcfHeaders` transform. + + Args: + file_pattern: The file path to read from either as a single file or a glob + pattern. + snippet_size: The number of lines that should be read from the file. + compression_type: Used to handle compressed input files. + Typical value is :attr:`CompressionTypes.AUTO + `, in which case the + underlying file_path's extension will be used to detect the compression. + validate: Flag to verify that the files exist during the pipeline creation + time. + """ + super(ReadVcfSnippet, self).__init__(**kwargs) + self._source = _VcfSnippetSource( + file_pattern, snippet_size, compression_type, validate=validate) + + def expand(self, pvalue): + return pvalue.pipeline | Read(self._source) + + diff --git a/gcp_variant_transforms/beam_io/vcf_snippet_io_test.py b/gcp_variant_transforms/beam_io/vcf_snippet_io_test.py new file mode 100644 index 000000000..e69de29bb diff --git a/gcp_variant_transforms/libs/preprocess_reporter.py b/gcp_variant_transforms/libs/preprocess_reporter.py index dd38ec6e1..625b6a0ee 100644 --- a/gcp_variant_transforms/libs/preprocess_reporter.py +++ b/gcp_variant_transforms/libs/preprocess_reporter.py @@ -26,6 +26,8 @@ TODO(allieychen): Eventually, it also contains the resource estimation. Output example (assuming opening in spreedsheet): +Estimated disk usage by Dataflow: 4846.0 GB. The total raw file sizes summed up +to 1231.0 GB. Header Conflicts ID Category Conflicts File Paths Proposed Resolution NS INFO num=1 type=Float file1 num=1 type=Float @@ -43,6 +45,7 @@ File Path Variant Record Error Message file 1 rs6 G A 29 PASS NS=3; invalid literal for int() with base 10. """ +import math from typing import Dict, List, Optional, Union # pylint: disable=unused-import @@ -77,6 +80,7 @@ class _HeaderLine(object): def generate_report( header_definitions, # type: merge_header_definitions.VcfHeaderDefinitions file_path, # type: str + disk_usage_estimate, # type: int resolved_headers=None, # type: vcf_header_io.VcfHeader inferred_headers=None, # type: vcf_header_io.VcfHeader malformed_records=None # type: List[vcfio.MalformedVcfRecord] @@ -97,6 +101,7 @@ def generate_report( """ resolved_headers = resolved_headers or vcf_header_io.VcfHeader() with filesystems.FileSystems.create(file_path) as file_to_write: + _append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate) _append_conflicting_headers_to_report(file_to_write, header_definitions, resolved_headers) _append_inferred_headers_to_report(file_to_write, inferred_headers) @@ -272,6 +277,15 @@ def _format_definition(num_value, type_value): ] return ' '.join(formatted_definition) +def _append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate): + # type: (file, FileSizeInfo) -> None + if disk_usage_estimate is None: + return + file_to_write.write( + 'Estimated disk usage by Dataflow: {} GB. The total raw file sizes summed ' + 'up to {} GB.\n'.format( + int(math.ceil(disk_usage_estimate.encoded / 1e9)), + int(math.ceil(disk_usage_estimate.raw / 1e9)))) def _append_to_report(file_to_write, error_type, header, contents): # type: (file, str, str, List[str]) -> None diff --git a/gcp_variant_transforms/libs/resource_estimator.py b/gcp_variant_transforms/libs/resource_estimator.py new file mode 100644 index 000000000..eb88dfb21 --- /dev/null +++ b/gcp_variant_transforms/libs/resource_estimator.py @@ -0,0 +1,179 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Helper functions for estimating the resources that a vcf_to_bq will require. + +Currently, the resource estimator only estimates the disk usage that a Dataflow +pipeline will take along with the `MergeVariants` step, since this can cause +expensive pipeline failures late in the run. +""" +import logging + +import apache_beam as beam +from apache_beam import coders +from apache_beam.io.filesystem import CompressionTypes +from apache_beam.io.filesystems import FileSystems + +from gcp_variant_transforms.beam_io import vcfio + + +# TODO(hanjohn): Add unit tests. + +def _convert_variant_snippets_to_bytesize(variant): + # type: (vcfio.Variant -> int) + return coders.registry.get_coder(vcfio.Variant).estimate_size(variant) + + +class SnippetSizeInfo(object): + def __init__(self, + raw_snippet_size, # type: int + encoded_snippet_size, # type: int + ): + # type: (...) -> (None) + self.raw = raw_snippet_size + self.encoded = encoded_snippet_size + + +class FileSizeInfo(object): + def __init__(self, raw_file_size, encoded_file_size=None): + # type: (int, int) -> (None) + self.raw = raw_file_size + self.encoded = encoded_file_size + + def calculateEncodedFileSize(self, snippet_size_info): + # type: (SnippetSizeInfo) -> (None) + """Estimate a VCF file's encoded size based on snippet analysis. + + Given the raw_file_size and measurements of several VCF lines from the file, + estimate how much disk the file will take after expansion due to encoding + lines as `vcfio.Variant` objects. The encoded_snippet_size will be set as + `self.encoded`. + + This is a simple ratio problem, solving for encoded_snippet_size which is + the only unknown: + encoded_snippet_size / raw_snippet_size = encoded_file_size / raw_file_size + """ + if snippet_size_info.raw == 0: + logging.error("VCF file {} reported with 0 well-formed variant lines; " + "its contribution to disk resource usage will be " + "ignored.".format(self.name)) + self.encoded = 0 + self.raw = 0 + else: + self.encoded = (self.raw * snippet_size_info.encoded / + snippet_size_info.raw) + + +def measure_variant_size(element): + # type: (Tuple[str, str, vcfio.Variant]) -> (Tuple[str, SnippetSizeInfo]) + """Measure the lengths of the raw and encoded representations of a Variant. + + Given a PTable mapping file_paths to the raw (bytestring) and vcfio.Variant- + encoded representations of a Variant line, have the output PTable instead map + from the file_paths to a Tuple with the (raw, encoded) representation sizes. + + The file_path keys are not expected to be unique. + """ + file_path, raw_variant, encoded_variant = element + encoded_variant_size = _convert_variant_snippets_to_bytesize(encoded_variant) + raw_variant_size = len(raw_variant) + return file_path, SnippetSizeInfo(raw_variant_size, encoded_variant_size) + + +def estimate_file_encoded_size(element): + # type: (Tuple[str, Dict[str, Object]]) -> (Tuple[str, FileSizeInfo]) + file_name, metrics = element + file_size_info = metrics['whole_file_raw_size'][0] # type: FileSizeInfo + snippet_size_info = metrics['snippet_stats'][0] # type: SnippetSizeInfo + + # Assume that the ratio of encoded size to raw disk size is roughly the same + # throughout the file compared to the first several lines. + file_size_info.calculateEncodedFileSize(snippet_size_info) + + logging.debug("File %s has raw file size %d, raw snippet size %d, encoded " + "size %d. Estimated encoded file size: %d", + file_name, file_size_info.raw, snippet_size_info.raw, + snippet_size_info.encoded, file_size_info.encoded) + return file_name, file_size_info + +def get_file_sizes(input_pattern): + # type: (str) -> (List[FileSizeInfo]) + match_results = FileSystems.match([input_pattern]) + file_sizes = [] + for match in match_results: + for file_metadata in match.metadata_list: + compression_type = CompressionTypes.detect_compression_type( + file_metadata.path) + if compression_type != CompressionTypes.UNCOMPRESSED: + logging.error("VCF file {} is compressed; disk requirement estimator " + "will not be accurate.", + file_metadata.path) + + file_sizes.append((file_metadata.path, + FileSizeInfo(file_metadata.size_in_bytes),)) + return file_sizes + + +class SnippetSizeInfoSumFn(beam.CombineFn): + """Combiner Function to sum up the size fields of SnippetSizeInfos. + + Example: [SnippetSizeInfo(a, b), SnippetSizeInfo(c, d)] -> + SnippetSizeInfo(a+c, b+d) + """ + def create_accumulator(self): + # type: (None) -> (Tuple[int, int]) + return (0, 0) # (raw, encoded) sums + + def add_input(self, sums, input): + # type: (Tuple[int, int], SnippetSizeInfo) -> (Tuple[int, int]) + return sums[0] + input.raw, sums[1] + input.encoded + + def merge_accumulators(self, accumulators): + # type: (Iterable[Tuple[int, int]]) -> (Tuple[int, int]) + first, second = zip(*accumulators) + return sum(first), sum(second) + + def extract_output(self, sums): + # type: (Tuple[int, int]) -> (SnippetSizeInfo) + return SnippetSizeInfo(*sums) + + +class FileSizeInfoSumFn(beam.CombineFn): + """Combiner Function to sum up the size fields of Tuple[str, FileSizeInfo]s. + + Unlike SnippetSizeInfoSumFn, the input is a PTable mapping str to + FileSizeInfo, so the input is a tuple with the FileSizeInfos as the second + field. The output strips out the str key which represents the file path. + + Example: [('/path/a', FileSizeInfo(a, b)), ('/path/b;, FileSizeInfo(c, d))] -> + FileSizeInfo(a+c, b+d) + """ + def create_accumulator(self): + # type: (None) -> (Tuple[int, int]) + return (0, 0) # (raw, encoded) sums + + def add_input(self, raw_encoded, input): + # type: (Tuple[int, int], Tuple[str, FileSizeInfo]) -> (Tuple[int, int]) + raw, encoded = raw_encoded + _, file_size_info = input + return raw + file_size_info.raw, encoded + file_size_info.encoded + + def merge_accumulators(self, accumulators): + # type: (Iterable[Tuple[int, int]]) -> (Tuple[int, int]) + raw, encoded = zip(*accumulators) + return sum(raw), sum(encoded) + + def extract_output(self, raw_encoded): + # type: (Tuple[int, int]) -> (FileSizeInfo) + raw, encoded = raw_encoded + return FileSizeInfo(raw, encoded) diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 5f38f16a5..63c3d9cf8 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -424,6 +424,13 @@ def add_arguments(self, parser): help=('The full path of the resolved headers. The file will not be' 'generated if unspecified. Otherwise, please provide a local ' 'path if run locally, or a cloud path if run on Dataflow.')) + parser.add_argument( + '--estimate_disk_usage', + type='bool', default=False, nargs='?', const=True, + help=('By default, disk resource usage will not be estimated.' + 'If true, the preprocessor will estimate the maximum disk usage ' + 'consumed at any step in the pipeline, which could lead to ' + 'out-of-disk errors at a shuffle step e.g. MergeVariants.')) class PartitionOptions(VariantTransformsOptions): """Options for partitioning Variant records.""" diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py index 04e489453..6aa5b4c1d 100644 --- a/gcp_variant_transforms/vcf_to_bq_preprocess.py +++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py @@ -56,8 +56,10 @@ from apache_beam.options import pipeline_options from gcp_variant_transforms import vcf_to_bq_common +from gcp_variant_transforms.beam_io import vcf_snippet_io from gcp_variant_transforms.beam_io import vcfio from gcp_variant_transforms.libs import preprocess_reporter +from gcp_variant_transforms.libs import resource_estimator from gcp_variant_transforms.options import variant_transform_options from gcp_variant_transforms.transforms import filter_variants from gcp_variant_transforms.transforms import infer_headers @@ -66,6 +68,8 @@ _COMMAND_LINE_OPTIONS = [variant_transform_options.PreprocessOptions] +# Number of lines from each VCF that should be read when estimating disk usage. +SNIPPET_READ_SIZE = 50 def _get_inferred_headers(variants, # type: pvalue.PCollection merged_header # type: pvalue.PCollection @@ -86,6 +90,36 @@ def _get_inferred_headers(variants, # type: pvalue.PCollection return inferred_headers, merged_header +# TODO(hanjohn): Add an e2e test +def _estimateDiskResources(p, # type: pvalue.PCollection + input_pattern # type: str + ): + # type: (...) -> (pvalue.PCollection) + raw_file_sizes = (p | 'InputFilePattern' >> beam.Create([input_pattern]) + | 'GetRawFileSizes' >> beam.FlatMap( + lambda pattern: resource_estimator.get_file_sizes(pattern))) + # TODO(hanjohn): Add support for `ReadAll` pattern for inputs with very large + # number of files. + vcf_snippet_sizes = (p + | 'GetVcfSnippets' >> vcf_snippet_io.ReadVcfSnippet( + input_pattern, SNIPPET_READ_SIZE) + | 'FindSnippetSizes' >> beam.Map( + resource_estimator.measure_variant_size) + | 'AggregateSnippetSizePerFile' >> beam.CombinePerKey( + resource_estimator.SnippetSizeInfoSumFn())) + + result = ({'whole_file_raw_size': raw_file_sizes, + 'snippet_stats': vcf_snippet_sizes} + | 'ConsolidateFileStats' >> beam.CoGroupByKey() + | 'EstimateEncodedFileSizes' >> beam.Map( + resource_estimator.estimate_file_encoded_size) + | 'SumFileSizeEstimates' >> beam.CombineGlobally( + resource_estimator.FileSizeInfoSumFn())) + result | 'PrintEstimate' >> beam.Map(lambda x: logging.info( + "Final estimate of encoded size: %d GB", x.encoded / 1e9)) + return result + + def run(argv=None): # type: (List[str]) -> (str, str) """Runs preprocess pipeline.""" @@ -101,6 +135,11 @@ def run(argv=None): merged_definitions = (headers | 'MergeDefinitions' >> merge_header_definitions.MergeDefinitions()) + + disk_usage_estimate = None + if known_args.estimate_disk_usage: + disk_usage_estimate = beam.pvalue.AsSingleton( + _estimateDiskResources(p, known_args.input_pattern)) if known_args.report_all_conflicts: variants = p | 'ReadFromVcf' >> vcfio.ReadFromVcf( known_args.input_pattern, allow_malformed_records=True) @@ -111,6 +150,7 @@ def run(argv=None): | 'GenerateConflictsReport' >> beam.ParDo(preprocess_reporter.generate_report, known_args.report_path, + disk_usage_estimate, beam.pvalue.AsSingleton(merged_headers), beam.pvalue.AsSingleton(inferred_headers), beam.pvalue.AsIter(malformed_records))) @@ -119,6 +159,7 @@ def run(argv=None): | 'GenerateConflictsReport' >> beam.ParDo(preprocess_reporter.generate_report, known_args.report_path, + disk_usage_estimate, beam.pvalue.AsSingleton(merged_headers))) if known_args.resolved_headers_path: