Modify BigQuery write mechanism to use the new WriteToBigQuery PTransform, and remove num_bigquery_write_shards flag.

Previously, issue #199 forced us to use a hack to shard the variants before they were written to BigQuery, which negatively affected the speed of the tool. With the implementation of the new sink, the flag is no longer needed.
tneymanov committed Jun 25, 2019
1 parent 8602ea7 commit bdb32b0
Showing 7 changed files with 12 additions and 64 deletions.
29 changes: 4 additions & 25 deletions docs/large_inputs.md
@@ -14,7 +14,6 @@ Default settings:
   --worker_machine_type <default n1-standard-1> \
   --disk_size_gb <default 250> \
   --worker_disk_type <default PD> \
-  --num_bigquery_write_shards <default 1> \
   --partition_config_path <default None> \
 ```
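The flags above are Dataflow pipeline options. As a point of reference, the sketch below shows one way such worker settings could be supplied programmatically through Beam's `PipelineOptions`. This is an illustration only, not part of this commit: the project name is a placeholder, and the pd-ssd disk-type URI is copied verbatim from the docs, empty segments included.

```python
# Illustrative sketch only: wiring the worker options discussed in
# docs/large_inputs.md into a Beam pipeline programmatically.
# '--project=my-project' is hypothetical.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',  # hypothetical project id
    '--worker_machine_type=n1-standard-16',
    '--disk_size_gb=500',
    '--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd',
])
```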

@@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery
 output rather than just being specified once as in the VCF header), you
 typically need 3 to 4 times the total size of the raw VCF files.
 
-In addition, if [merging](variant_merging.md) or
-[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may
+In addition, if [merging](variant_merging.md) is enabled, you may
 need more disk per worker (e.g. 500GB) as the same variants need to be
 aggregated together on one machine.
 
@@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`),
 they can reduce cost as they can avoid idle CPU cycles due to disk IOPS
 limitations.
 
-As a result, we recommend using SSDs if [merging](variant_merge.md) or
-[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these
-operations require "shuffling" the data (i.e. redistributing the data among
-workers), which require significant disk I/O.
+As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
+these operations require "shuffling" the data (i.e. redistributing the data
+among workers), which require significant disk I/O.
 
 Set
 `--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd`
 to use SSDs.
 
-### `--num_bigquery_write_shards`
-
-Currently, the write operation to BigQuery in Dataflow is performed as a
-postprocessing step after the main transforms are done. As a workaround for
-BigQuery write limitations (more details
-[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)),
-we have added "sharding" when writing to BigQuery. This makes the data load
-to BigQuery significantly faster as it parallelizes the process and enables
-loading large (>5TB) data to BigQuery at once.
-
-As a result, we recommend setting `--num_bigquery_write_shards 20` when loading
-any data that has more than 1 billion rows (after merging) or 1TB of final
-output. You may use a smaller number of write shards (e.g. 5) when using
-[partitioned output](#--partition_config_path) as each partition also acts as a
-"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to
-fail as there is a maximum limit on the number of concurrent writes per table.
-
 ### `--partition_config_path`
 
 Partitioning the output can save significant query costs once the data is in
@@ -146,4 +126,3 @@ partition).
 As a result, we recommend setting the partition config for very large data
 where possible. Please see the [documentation](partitioning.md) for more
 details.
-
3 changes: 2 additions & 1 deletion gcp_variant_transforms/options/variant_transform_options.py
@@ -173,7 +173,8 @@ def add_arguments(self, parser):
     parser.add_argument(
         '--num_bigquery_write_shards',
         type=int, default=1,
-        help=('Before writing the final result to output BigQuery, the data is '
+        help=('Note: This flag is now deprecated and should not be used! '
+              'Before writing the final result to output BigQuery, the data is '
               'sharded to avoid a known failure for very large inputs (issue '
               '#199). Setting this flag to 1 will avoid this extra sharding.'
               'It is recommended to use 20 for loading large inputs without '
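Since the flag is kept as a deprecated no-op rather than deleted outright, a pattern like the following could warn users who still pass it. This is a sketch of the idea only, not the repository's actual validation code.

```python
# Sketch (assumption): keeping a deprecated, now-ignored flag for backward
# compatibility and warning when a caller still sets it to a non-default value.
import argparse
import warnings

parser = argparse.ArgumentParser()
parser.add_argument(
    '--num_bigquery_write_shards',
    type=int, default=1,
    help='DEPRECATED. Sharding is handled by WriteToBigQuery; value is ignored.')

args = parser.parse_args(['--num_bigquery_write_shards', '20'])
if args.num_bigquery_write_shards != 1:
  warnings.warn(
      '--num_bigquery_write_shards is deprecated and has no effect.',
      DeprecationWarning)
```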
@@ -9,7 +9,6 @@
     "worker_machine_type": "n1-standard-64",
     "max_num_workers": "64",
     "num_workers": "20",
-    "num_bigquery_write_shards": "20",
     "assertion_configs": [
       {
         "query": ["NUM_ROWS_QUERY"],
@@ -9,7 +9,6 @@
     "worker_machine_type": "n1-standard-16",
     "max_num_workers": "20",
     "num_workers": "20",
-    "num_bigquery_write_shards": "2",
     "assertion_configs": [
       {
         "query": ["NUM_ROWS_QUERY"],
@@ -68,4 +67,3 @@
       ]
     }
   ]
-
@@ -7,7 +7,6 @@
     "worker_machine_type": "n1-standard-16",
     "max_num_workers": "20",
     "num_workers": "20",
-    "num_bigquery_write_shards": "20",
     "assertion_configs": [
       {
         "query": ["NUM_ROWS_QUERY"],
39 changes: 6 additions & 33 deletions gcp_variant_transforms/transforms/variant_to_bigquery.py
@@ -16,7 +16,6 @@
 
 from __future__ import absolute_import
 
-import random
 from typing import Dict, List  # pylint: disable=unused-import
 
 import apache_beam as beam
@@ -29,7 +28,6 @@
 from gcp_variant_transforms.libs import processed_variant
 from gcp_variant_transforms.libs import vcf_field_conflict_resolver
 from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy  # pylint: disable=unused-import
-from gcp_variant_transforms.transforms import limit_write
 
 
 # TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK,
@@ -75,7 +73,6 @@ def __init__(
       update_schema_on_append=False,  # type: bool
       allow_incompatible_records=False,  # type: bool
       omit_empty_sample_calls=False,  # type: bool
-      num_bigquery_write_shards=1,  # type: int
       null_numeric_value_replacement=None  # type: int
       ):
     # type: (...) -> None
@@ -99,8 +96,6 @@
         + schema if there is a mismatch.
       omit_empty_sample_calls: If true, samples that don't have a given call
         will be omitted.
-      num_bigquery_write_shards: If > 1, we will limit number of sources which
-        are used for writing to the output BigQuery table.
       null_numeric_value_replacement: the value to use instead of null for
         numeric (float/int/long) lists. For instance, [0, None, 1] will become
         [0, `null_numeric_value_replacement`, 1]. If not set, the value will set
@@ -125,7 +120,6 @@ def __init__(
 
     self._allow_incompatible_records = allow_incompatible_records
     self._omit_empty_sample_calls = omit_empty_sample_calls
-    self._num_bigquery_write_shards = num_bigquery_write_shards
     if update_schema_on_append:
       bigquery_util.update_bigquery_schema_on_append(self._schema.fields,
                                                      self._output_table)
@@ -136,35 +130,14 @@ def expand(self, pcoll):
             self._bigquery_row_generator,
             self._allow_incompatible_records,
             self._omit_empty_sample_calls))
-    if self._num_bigquery_write_shards > 1:
-      # We split data into self._num_bigquery_write_shards random partitions
-      # and then write each part to final BQ by appending them together.
-      # Combined with LimitWrite transform, this will avoid the BQ failure.
-      bq_row_partitions = bq_rows | beam.Partition(
-          lambda _, n: random.randint(0, n - 1),
-          self._num_bigquery_write_shards)
-      bq_writes = []
-      for i in range(self._num_bigquery_write_shards):
-        bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >>
-                   limit_write.LimitWrite(_WRITE_SHARDS_LIMIT))
-        bq_writes.append(
-            bq_rows | 'WriteToBigQuery' + str(i) >>
-            beam.io.Write(beam.io.BigQuerySink(
-                self._output_table,
-                schema=self._schema,
-                create_disposition=(
-                    beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
-                write_disposition=(
-                    beam.io.BigQueryDisposition.WRITE_APPEND))))
-      return bq_writes
-    else:
-      return (bq_rows
-              | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
-                  self._output_table,
-                  schema=self._schema,
-                  create_disposition=(
-                      beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
-                  write_disposition=(
-                      beam.io.BigQueryDisposition.WRITE_APPEND
-                      if self._append
-                      else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
+    return (bq_rows
+            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
+                self._output_table,
+                schema=self._schema,
+                create_disposition=(
+                    beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
+                write_disposition=(
+                    beam.io.BigQueryDisposition.WRITE_APPEND
+                    if self._append
+                    else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
+                method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))
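To see the new write path in isolation, here is a minimal, runnable sketch of a standalone pipeline using the same `beam.io.WriteToBigQuery` arguments as the code above. The table name, schema string, and toy input rows are hypothetical; this is an illustration, not the tool's actual pipeline.

```python
# Minimal sketch (not the repository's pipeline): writing a handful of toy
# rows with the native WriteToBigQuery transform this commit adopts.
# 'my-project:my_dataset.variants' and the schema string are hypothetical.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
  with beam.Pipeline(options=PipelineOptions()) as p:
    rows = p | 'CreateRows' >> beam.Create([
        {'reference_name': 'chr1', 'start_position': 100, 'end_position': 101},
        {'reference_name': 'chr2', 'start_position': 200, 'end_position': 201},
    ])
    _ = rows | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        'my-project:my_dataset.variants',
        schema='reference_name:STRING,start_position:INTEGER,end_position:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS)


if __name__ == '__main__':
  run()
```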
1 change: 0 additions & 1 deletion gcp_variant_transforms/vcf_to_bq.py
@@ -475,7 +475,6 @@ def run(argv=None):
             update_schema_on_append=known_args.update_schema_on_append,
             allow_incompatible_records=known_args.allow_incompatible_records,
             omit_empty_sample_calls=known_args.omit_empty_sample_calls,
-            num_bigquery_write_shards=known_args.num_bigquery_write_shards,
             null_numeric_value_replacement=(
                 known_args.null_numeric_value_replacement)))
 
