Address first iteration of comments.

tneymanov committed Jan 14, 2020
1 parent 666fa4a commit 1dc6904
Showing 5 changed files with 45 additions and 56 deletions.

This file was deleted.

@@ -1,23 +1,23 @@
[
  {
    "test_name": "valid-4-2-gz-pysam",
    "table_name": "valid_4_2_gz_pysam",
    "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.vcf.gz",
    "runner": "DirectRunner",
    "vcf_parser": "PYSAM",
    "assertion_configs": [
      {
        "query": ["NUM_ROWS_QUERY"],
        "expected_result": {"num_rows": 13}
      },
      {
        "query": ["SUM_START_QUERY"],
        "expected_result": {"sum_start": 23031929}
      },
      {
        "query": ["SUM_END_QUERY"],
        "expected_result": {"sum_end": 23033052}
      }
    ]
  }
]
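
Each assertion_configs entry pairs a named canned query with the result it must return once the pipeline has loaded the table given by table_name. The integration-test runner executes each query against BigQuery and compares the returned row with expected_result. Below is a minimal sketch of that check; the template strings, column names, and helper are hypothetical stand-ins (the real runner and its query templates live under gcp_variant_transforms/testing/integration):

from google.cloud import bigquery

# Hypothetical templates keyed by the query names used in the config above.
QUERY_TEMPLATES = {
    'NUM_ROWS_QUERY': 'SELECT COUNT(*) AS num_rows FROM `{table}`',
    'SUM_START_QUERY': 'SELECT SUM(start_position) AS sum_start FROM `{table}`',
    'SUM_END_QUERY': 'SELECT SUM(end_position) AS sum_end FROM `{table}`',
}

def run_assertion(client, table, assertion):
  # client is a google.cloud.bigquery.Client; table is the fully qualified
  # output table name built from the config's table_name.
  query = QUERY_TEMPLATES[assertion['query'][0]].format(table=table)
  row = list(client.query(query).result())[0]
  for key, expected in assertion['expected_result'].items():
    if row[key] != expected:
      raise AssertionError(
          '%s: expected %s, got %s' % (key, expected, row[key]))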
14 changes: 10 additions & 4 deletions gcp_variant_transforms/transforms/sample_info_to_bigquery.py
@@ -24,10 +24,16 @@
 class ConvertSampleInfoToRow(beam.DoFn):
   """Extracts sample info from `VcfHeader` and converts it to a BigQuery row."""
 
-  def process(self, vcf_header, samples_span_multiple_files):
+  def __init__(self,
+               samples_span_multiple_files=False,  # type: bool
+              ):
+    self._samples_span_multiple_files = samples_span_multiple_files
+
+  def process(self, vcf_header):
     # type: (vcf_header_io.VcfHeader, bool) -> Dict[str, Union[int, str]]
     for sample in vcf_header.samples:
-      if samples_span_multiple_files:
+      if self._samples_span_multiple_files:
         sample_id = hashing_util.generate_unsigned_hash_code(
             [sample], max_hash_value=pow(2, 63))
       else:
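
Moving samples_span_multiple_files out of process() and into the constructor follows the usual Beam pattern: a DoFn is pickled once when the pipeline graph is built, so fixed per-pipeline configuration belongs in __init__, while process() receives only the elements of the input PCollection. The max_hash_value=pow(2, 63) bound keeps the hashed sample IDs inside BigQuery's signed 64-bit integer range. A minimal sketch of how the updated DoFn is applied (assumed wiring, not part of this diff; headers stands for a PCollection of VcfHeader values such as pipeline_common.read_headers produces):

import apache_beam as beam

def sample_info_rows(headers):
  # The flag is now fixed at construction time instead of being passed
  # alongside every element.
  return headers | 'ConvertSampleInfoToRow' >> beam.ParDo(
      ConvertSampleInfoToRow(samples_span_multiple_files=True))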
@@ -45,7 +51,7 @@ def process(self, vcf_header, samples_span_multiple_files):
 class SampleInfoToBigQuery(beam.PTransform):
   """Writes sample info to BigQuery."""
 
-  def __init__(self, output_table_prefix, append=False,
+  def __init__(self, output_table_prefix, temp_location, append=False,
                samples_span_multiple_files=False):
     # type: (str, Dict[str, str], bool, bool) -> None
     """Initializes the transform.
@@ -60,7 +66,7 @@ def __init__(self, output_table_prefix, append=False,
     self._output_table = sample_info_table_schema_generator.compose_table_name(
         output_table_prefix, sample_info_table_schema_generator.TABLE_SUFFIX)
     self._append = append
-    self.samples_span_multiple_files = samples_span_multiple_files
+    self._samples_span_multiple_files = samples_span_multiple_files
     self._schema = sample_info_table_schema_generator.generate_schema()
+    self._temp_location = temp_location

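The temp_location added to the constructor ends up as the staging directory for the BigQuery file-load writes the transform issues in its expand(). A rough sketch of that kind of sink, under the assumption that the transform uses Beam's standard WriteToBigQuery (simplified; the real transform derives the table name and schema from sample_info_table_schema_generator):

import apache_beam as beam

def write_sample_info(rows, table, schema, temp_location, append=False):
  # Load files are staged under temp_location before the BigQuery load
  # job imports them into the target table.
  return rows | beam.io.WriteToBigQuery(
      table,
      schema=schema,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=(
          beam.io.BigQueryDisposition.WRITE_APPEND if append
          else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
      custom_gcs_temp_location=temp_location)
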
15 changes: 10 additions & 5 deletions gcp_variant_transforms/vcf_to_bq.py
@@ -384,7 +384,7 @@ def _run_annotation_pipeline(known_args, pipeline_args):
 def _create_sample_info_table(pipeline,  # type: beam.Pipeline
                               pipeline_mode,  # type: PipelineModes
                               known_args,  # type: argparse.Namespace,
-                              pipeline_args,  # type: List[str]
+                              temp_directory,  # str
                              ):
   # type: (...) -> None
   headers = pipeline_common.read_headers(
@@ -395,6 +395,7 @@ def _create_sample_info_table(pipeline,  # type: beam.Pipeline
   _ = (headers | 'SampleInfoToBigQuery' >>
        sample_info_to_bigquery.SampleInfoToBigQuery(
            known_args.output_table,
+           temp_directory,
            known_args.append,
            known_args.samples_span_multiple_files))

@@ -405,6 +406,8 @@ def run(argv=None):
   logging.info('Command: %s', ' '.join(argv or sys.argv))
   known_args, pipeline_args = pipeline_common.parse_args(argv,
                                                          _COMMAND_LINE_OPTIONS)
+  if known_args.output_table and '--temp_location' not in pipeline_args:
+    raise ValueError('--temp_location is required for BigQuery imports.')
   if known_args.auto_flags_experiment:
     _get_input_dimensions(known_args, pipeline_args)
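
--temp_location is one of Beam's standard Google Cloud options, so the early check above can look for the raw flag in pipeline_args, while run() later reads the parsed value back through the options API. A small self-contained illustration of that read (example bucket path):

from apache_beam.options import pipeline_options

opts = pipeline_options.PipelineOptions(
    ['--temp_location', 'gs://example-bucket/tmp'])
print(opts.view_as(pipeline_options.GoogleCloudOptions).temp_location)
# prints: gs://example-bucket/tmp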

@@ -480,8 +483,10 @@ def run(argv=None):
     num_partitions = 1
 
   if known_args.output_table:
-    options = pipeline_options.PipelineOptions(pipeline_args)
-    google_cloud_options = options.view_as(pipeline_options.GoogleCloudOptions)
+    temp_directory = pipeline_options.PipelineOptions(pipeline_args).view_as(
+        pipeline_options.GoogleCloudOptions).temp_location
+    if not temp_directory:
+      raise ValueError('--temp_location must be set when writing to BigQuery.')
     for i in range(num_partitions):
       table_suffix = ''
       if partitioner and partitioner.get_partition_name(i):
@@ -491,7 +496,7 @@ def run(argv=None):
           variant_to_bigquery.VariantToBigQuery(
               table_name,
               header_fields,
-              google_cloud_options.temp_location,
+              temp_directory,
               variant_merger,
               processed_variant_factory,
               append=known_args.append,
@@ -502,7 +507,7 @@
                   known_args.null_numeric_value_replacement)))
     if known_args.generate_sample_info_table:
       _create_sample_info_table(
-          pipeline, pipeline_mode, known_args, pipeline_args)
+          pipeline, pipeline_mode, known_args, temp_directory)
 
   if known_args.output_avro_path:
     # TODO(bashir2): Add an integration test that outputs to Avro files and
3 changes: 2 additions & 1 deletion setup.py
@@ -42,7 +42,8 @@
     # Nucleus needs uptodate protocol buffer compiler (protoc).
     'protobuf>=3.6.1',
     'mmh3<2.6',
-    'google-cloud-storage',
+    # Refer to issue #528
+    'google-cloud-storage<1.23.0',
     'pyfarmhash',
     'pyyaml'
 ]
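
The pin keeps installs below 1.23.0, the release line flagged in issue #528. A quick way to verify that an environment satisfies the constraint, using the standard setuptools API (a sketch, not part of the commit):

import pkg_resources

# Raises VersionConflict (or DistributionNotFound) if the pin is not met.
pkg_resources.require('google-cloud-storage<1.23.0')
print(pkg_resources.get_distribution('google-cloud-storage').version)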
