Skip to content

Commit

Permalink
Use new BigQuery sink.
Browse files Browse the repository at this point in the history
  • Loading branch information
tneymanov committed Jan 13, 2020
1 parent c5e1e0f commit 482151b
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 79 deletions.
3 changes: 0 additions & 3 deletions cloudbuild_CI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ steps:
- '--skip_build'
- '--project ${PROJECT_ID}'
- '--image_tag ${COMMIT_SHA}'
- '--run_unit_tests'
- '--run_preprocessor_tests'
- '--run_bq_to_vcf_tests'
- '--run_all_tests'
- '--test_name_prefix cloud-ci-'
id: 'test-gcp-variant-transforms-docker'
Expand Down
29 changes: 4 additions & 25 deletions docs/large_inputs.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ Default settings:
--worker_machine_type <default n1-standard-1> \
--disk_size_gb <default 250> \
--worker_disk_type <default PD> \
--num_bigquery_write_shards <default 1> \
--partition_config_path <default None> \
```

Expand Down Expand Up @@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery
output rather than just being specified once as in the VCF header), you
typically need 3 to 4 times the total size of the raw VCF files.

In addition, if [merging](variant_merging.md) or
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may
In addition, if [merging](variant_merging.md) is enabled, you may
need more disk per worker (e.g. 500GB) as the same variants need to be
aggregated together on one machine.

Expand All @@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`),
they can reduce cost as they can avoid idle CPU cycles due to disk IOPS
limitations.

As a result, we recommend using SSDs if [merging](variant_merge.md) or
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these
operations require "shuffling" the data (i.e. redistributing the data among
workers), which require significant disk I/O.
As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
this operation requires "shuffling" the data (i.e. redistributing the data
among workers), which requires significant disk I/O.

Set
`--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd`
to use SSDs.

### `--num_bigquery_write_shards`

Currently, the write operation to BigQuery in Dataflow is performed as a
postprocessing step after the main transforms are done. As a workaround for
BigQuery write limitations (more details
[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)),
we have added "sharding" when writing to BigQuery. This makes the data load
to BigQuery significantly faster as it parallelizes the process and enables
loading large (>5TB) data to BigQuery at once.

As a result, we recommend setting `--num_bigquery_write_shards 20` when loading
any data that has more than 1 billion rows (after merging) or 1TB of final
output. You may use a smaller number of write shards (e.g. 5) when using
[partitioned output](#--partition_config_path) as each partition also acts as a
"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to
fail as there is a maximum limit on the number of concurrent writes per table.

### `--partition_config_path`

Partitioning the output can save significant query costs once the data is in
Expand All @@ -146,4 +126,3 @@ partition).
As a result, we recommend setting the partition config for very large data
where possible. Please see the [documentation](partitioning.md) for more
details.

7 changes: 1 addition & 6 deletions gcp_variant_transforms/options/variant_transform_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,7 @@ def add_arguments(self, parser):
parser.add_argument(
'--num_bigquery_write_shards',
type=int, default=1,
help=('Before writing the final result to output BigQuery, the data is '
'sharded to avoid a known failure for very large inputs (issue '
'#199). Setting this flag to 1 will avoid this extra sharding.'
'It is recommended to use 20 for loading large inputs without '
'merging. Use a smaller value (2 or 3) if both merging and '
'optimize_for_large_inputs are enabled.'))
help=('This flag is deprecated and may be removed in future releases.'))
parser.add_argument(
'--null_numeric_value_replacement',
type=int,
Expand Down
3 changes: 3 additions & 0 deletions gcp_variant_transforms/pipeline_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def parse_args(argv, command_line_options):
if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'):
known_args.all_patterns = _get_all_patterns(
known_args.input_pattern, known_args.input_file)

# Enable new BQ sink experiment.
pipeline_args += ['--experiment', 'use_beam_bq_sink']
return known_args, pipeline_args


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"worker_machine_type": "n1-standard-64",
"max_num_workers": "64",
"num_workers": "20",
"num_bigquery_write_shards": "20",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"worker_machine_type": "n1-standard-16",
"max_num_workers": "20",
"num_workers": "20",
"num_bigquery_write_shards": "2",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
Expand Down Expand Up @@ -68,4 +67,3 @@
]
}
]

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"worker_machine_type": "n1-standard-16",
"max_num_workers": "20",
"num_workers": "20",
"num_bigquery_write_shards": "20",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
Expand Down
4 changes: 3 additions & 1 deletion gcp_variant_transforms/transforms/sample_info_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self, output_table_prefix, append=False,
self._append = append
self.samples_span_multiple_files = samples_span_multiple_files
self._schema = sample_info_table_schema_generator.generate_schema()
self._temp_location = temp_location

def expand(self, pcoll):
return (pcoll
Expand All @@ -76,4 +77,5 @@ def expand(self, pcoll):
beam.io.BigQueryDisposition.WRITE_APPEND
if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
custom_gcs_temp_location=self._temp_location))
45 changes: 11 additions & 34 deletions gcp_variant_transforms/transforms/variant_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from __future__ import absolute_import

import random
from typing import Dict, List # pylint: disable=unused-import

import apache_beam as beam
Expand All @@ -29,7 +28,6 @@
from gcp_variant_transforms.libs import processed_variant
from gcp_variant_transforms.libs import vcf_field_conflict_resolver
from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import
from gcp_variant_transforms.transforms import limit_write


# TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK,
Expand Down Expand Up @@ -67,6 +65,7 @@ def __init__(
self,
output_table, # type: str
header_fields, # type: vcf_header_io.VcfHeader
temp_location, # type: str
variant_merger=None, # type: variant_merge_strategy.VariantMergeStrategy
proc_var_factory=None, # type: processed_variant.ProcessedVariantFactory
# TODO(bashir2): proc_var_factory is a required argument and if `None` is
Expand All @@ -75,8 +74,8 @@ def __init__(
update_schema_on_append=False, # type: bool
allow_incompatible_records=False, # type: bool
omit_empty_sample_calls=False, # type: bool
num_bigquery_write_shards=1, # type: int
null_numeric_value_replacement=None # type: int
null_numeric_value_replacement=None # type: int,

):
# type: (...) -> None
"""Initializes the transform.
Expand All @@ -99,15 +98,14 @@ def __init__(
+ schema if there is a mismatch.
omit_empty_sample_calls: If true, samples that don't have a given call
will be omitted.
num_bigquery_write_shards: If > 1, we will limit number of sources which
are used for writing to the output BigQuery table.
null_numeric_value_replacement: the value to use instead of null for
numeric (float/int/long) lists. For instance, [0, None, 1] will become
[0, `null_numeric_value_replacement`, 1]. If not set, the value will set
to bigquery_util._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT.
"""
self._output_table = output_table
self._header_fields = header_fields
self._temp_location = temp_location
self._variant_merger = variant_merger
self._proc_var_factory = proc_var_factory
self._append = append
Expand All @@ -125,7 +123,6 @@ def __init__(

self._allow_incompatible_records = allow_incompatible_records
self._omit_empty_sample_calls = omit_empty_sample_calls
self._num_bigquery_write_shards = num_bigquery_write_shards
if update_schema_on_append:
bigquery_util.update_bigquery_schema_on_append(self._schema.fields,
self._output_table)
Expand All @@ -136,35 +133,15 @@ def expand(self, pcoll):
self._bigquery_row_generator,
self._allow_incompatible_records,
self._omit_empty_sample_calls))
if self._num_bigquery_write_shards > 1:
# We split data into self._num_bigquery_write_shards random partitions
# and then write each part to final BQ by appending them together.
# Combined with LimitWrite transform, this will avoid the BQ failure.
bq_row_partitions = bq_rows | beam.Partition(
lambda _, n: random.randint(0, n - 1),
self._num_bigquery_write_shards)
bq_writes = []
for i in range(self._num_bigquery_write_shards):
bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >>
limit_write.LimitWrite(_WRITE_SHARDS_LIMIT))
bq_writes.append(
bq_rows | 'WriteToBigQuery' + str(i) >>
beam.io.Write(beam.io.BigQuerySink(
return (bq_rows
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
self._output_table,
schema=self._schema,
create_disposition=(
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
write_disposition=(
beam.io.BigQueryDisposition.WRITE_APPEND))))
return bq_writes
else:
return (bq_rows
| 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
self._output_table,
schema=self._schema,
create_disposition=(
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
write_disposition=(
beam.io.BigQueryDisposition.WRITE_APPEND
if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
beam.io.BigQueryDisposition.WRITE_APPEND
if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
custom_gcs_temp_location=self._temp_location))
11 changes: 7 additions & 4 deletions gcp_variant_transforms/vcf_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ def _run_annotation_pipeline(known_args, pipeline_args):

def _create_sample_info_table(pipeline, # type: beam.Pipeline
pipeline_mode, # type: PipelineModes
known_args, # type: argparse.Namespace
known_args, # type: argparse.Namespace,
pipeline_args, # type: List[str]
):
# type: (...) -> None
headers = pipeline_common.read_headers(
Expand All @@ -404,7 +405,6 @@ def run(argv=None):
logging.info('Command: %s', ' '.join(argv or sys.argv))
known_args, pipeline_args = pipeline_common.parse_args(argv,
_COMMAND_LINE_OPTIONS)

if known_args.auto_flags_experiment:
_get_input_dimensions(known_args, pipeline_args)

Expand Down Expand Up @@ -480,6 +480,8 @@ def run(argv=None):
num_partitions = 1

if known_args.output_table:
options = pipeline_options.PipelineOptions(pipeline_args)
google_cloud_options = options.view_as(pipeline_options.GoogleCloudOptions)
for i in range(num_partitions):
table_suffix = ''
if partitioner and partitioner.get_partition_name(i):
Expand All @@ -489,17 +491,18 @@ def run(argv=None):
variant_to_bigquery.VariantToBigQuery(
table_name,
header_fields,
google_cloud_options.temp_location,
variant_merger,
processed_variant_factory,
append=known_args.append,
update_schema_on_append=known_args.update_schema_on_append,
allow_incompatible_records=known_args.allow_incompatible_records,
omit_empty_sample_calls=known_args.omit_empty_sample_calls,
num_bigquery_write_shards=known_args.num_bigquery_write_shards,
null_numeric_value_replacement=(
known_args.null_numeric_value_replacement)))
if known_args.generate_sample_info_table:
_create_sample_info_table(pipeline, pipeline_mode, known_args)
_create_sample_info_table(
pipeline, pipeline_mode, known_args, pipeline_args)

if known_args.output_avro_path:
# TODO(bashir2): Add an integration test that outputs to Avro files and
Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@
# Nucleus needs uptodate protocol buffer compiler (protoc).
'protobuf>=3.6.1',
'mmh3<2.6',
# Refer to issue #528
'google-cloud-storage<1.23.0',
'google-cloud-storage',
'pyfarmhash',
'pyyaml'
]
Expand Down

0 comments on commit 482151b

Please sign in to comment.