Use new BigQuery sink and remove num_bigquery_write_shards flag usage. #499

Open · wants to merge 5 commits into base: master
Changes from all commits
29 changes: 4 additions & 25 deletions docs/large_inputs.md
@@ -14,7 +14,6 @@ Default settings:
--worker_machine_type <default n1-standard-1> \
--disk_size_gb <default 250> \
--worker_disk_type <default PD> \
--num_bigquery_write_shards <default 1> \
--partition_config_path <default None> \
```

@@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery
output rather than just being specified once as in the VCF header), you
typically need 3 to 4 times the total size of the raw VCF files.

In addition, if [merging](variant_merging.md) or
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may
In addition, if [merging](variant_merging.md) is enabled, you may
need more disk per worker (e.g. 500GB) as the same variants need to be
aggregated together on one machine.

@@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`),
they can reduce cost as they can avoid idle CPU cycles due to disk IOPS
limitations.

As a result, we recommend using SSDs if [merging](variant_merge.md) or
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these
operations require "shuffling" the data (i.e. redistributing the data among
workers), which require significant disk I/O.
As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
this operation requires "shuffling" the data (i.e. redistributing the data
among workers), which requires significant disk I/O.

Set
`--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd`
to use SSDs.

### `--num_bigquery_write_shards`

Currently, the write operation to BigQuery in Dataflow is performed as a
postprocessing step after the main transforms are done. As a workaround for
BigQuery write limitations (more details
[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)),
we have added "sharding" when writing to BigQuery. This makes the data load
to BigQuery significantly faster as it parallelizes the process and enables
loading large (>5TB) data to BigQuery at once.

As a result, we recommend setting `--num_bigquery_write_shards 20` when loading
any data that has more than 1 billion rows (after merging) or 1TB of final
output. You may use a smaller number of write shards (e.g. 5) when using
[sharded output](#--sharding_config_path) as each partition also acts as a
"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to
fail as there is a maximum limit on the number of concurrent writes per table.

### `--sharding_config_path`

Sharding the output can save significant query costs once the data is in
@@ -146,4 +126,3 @@ partition).
As a result, we recommend setting the partition config for very large data
where possible. Please see the [documentation](sharding.md) for more
details.
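
For orientation, the sketch below shows what a large-input invocation looks like once this change lands: SSD worker disks and a sharding config carry the load, and `--num_bigquery_write_shards` is simply absent. Every value (project, bucket, zone, table, config path) is a placeholder, not taken from this PR.

```
# Illustrative flag set only; every value below is a placeholder.
pipeline_args = [
    '--project', 'my-project',
    '--temp_location', 'gs://my-bucket/temp',  # required for BigQuery load jobs
    '--output_table', 'my-project:my_dataset.my_table',
    '--runner', 'DataflowRunner',
    '--setup_file', './setup.py',
    '--worker_machine_type', 'n1-standard-16',
    '--disk_size_gb', '500',
    '--worker_disk_type',
    'compute.googleapis.com/projects/my-project/zones/us-central1-b/diskTypes/pd-ssd',
    '--sharding_config_path', 'path/to/sharding_config.yaml',
    # '--num_bigquery_write_shards' is gone; the new sink shards internally.
]
```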

8 changes: 2 additions & 6 deletions gcp_variant_transforms/options/variant_transform_options.py
@@ -185,12 +185,8 @@ def add_arguments(self, parser):
parser.add_argument(
'--num_bigquery_write_shards',
type=int, default=1,
help=('Before writing the final result to output BigQuery, the data is '
'sharded to avoid a known failure for very large inputs (issue '
'#199). Setting this flag to 1 will avoid this extra sharding.'
'It is recommended to use 20 for loading large inputs without '
'merging. Use a smaller value (2 or 3) if both merging and '
'optimize_for_large_inputs are enabled.'))
help=('This flag is deprecated and will be removed in future '
'releases.'))
parser.add_argument(
'--null_numeric_value_replacement',
type=int,
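
The `--num_bigquery_write_shards` flag above is now a no-op kept only so existing command lines keep parsing. A common soft-deprecation pattern (a sketch, not code from this PR) is to accept the value and warn when it is set:

```
import argparse
import warnings

parser = argparse.ArgumentParser()
parser.add_argument(
    '--num_bigquery_write_shards', type=int, default=1,
    help='This flag is deprecated and will be removed in future releases.')

args = parser.parse_args(['--num_bigquery_write_shards', '20'])
if args.num_bigquery_write_shards != 1:
  # Accepted for backwards compatibility but ignored; the new sink shards internally.
  warnings.warn('--num_bigquery_write_shards is deprecated and has no effect.')
```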
19 changes: 16 additions & 3 deletions gcp_variant_transforms/pipeline_common.py
@@ -73,10 +73,15 @@ def parse_args(argv, command_line_options):
known_args, pipeline_args = parser.parse_known_args(argv)
for transform_options in options:
transform_options.validate(known_args)
_raise_error_on_invalid_flags(pipeline_args)
_raise_error_on_invalid_flags(
pipeline_args,
known_args.output_table if hasattr(known_args, 'output_table') else None)
if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'):
known_args.all_patterns = _get_all_patterns(
known_args.input_pattern, known_args.input_file)

# Enable new BQ sink experiment.
pipeline_args += ['--experiment', 'use_beam_bq_sink']
return known_args, pipeline_args
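
Appending `['--experiment', 'use_beam_bq_sink']` to `pipeline_args` is what routes every run onto the new BigQuery sink. A minimal sketch (illustrative, not part of the diff) of how Beam surfaces that flag:

```
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

pipeline_args = ['--runner', 'DataflowRunner', '--project', 'my-project']  # placeholders
pipeline_args += ['--experiment', 'use_beam_bq_sink']

# Repeated --experiment flags are collected into DebugOptions.experiments.
options = PipelineOptions(pipeline_args)
assert 'use_beam_bq_sink' in (options.view_as(DebugOptions).experiments or [])
```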


@@ -301,8 +306,8 @@ def write_headers(merged_header, file_path):
vcf_header_io.WriteVcfHeaders(file_path))


def _raise_error_on_invalid_flags(pipeline_args):
# type: (List[str]) -> None
def _raise_error_on_invalid_flags(pipeline_args, output_table):
# type: (List[str], Any) -> None
"""Raises an error if there are unrecognized flags."""
parser = argparse.ArgumentParser()
for cls in pipeline_options.PipelineOptions.__subclasses__():
@@ -315,6 +320,14 @@ def _raise_error_on_invalid_flags(pipeline_args):
not known_pipeline_args.setup_file):
raise ValueError('The --setup_file flag is required for DataflowRunner. '
'Please provide a path to the setup.py file.')
if output_table:
if (not hasattr(known_pipeline_args, 'temp_location') or
not known_pipeline_args.temp_location):
raise ValueError('--temp_location is required for BigQuery imports.')
if not known_pipeline_args.temp_location.startswith('gs://'):
raise ValueError(
'--temp_location must be valid GCS location for BigQuery imports')



def is_pipeline_direct_runner(pipeline):
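
The new checks above exist because the file-loads write path stages intermediate files under the pipeline's temp location before issuing BigQuery load jobs, so that location has to be a gs:// path. A small sketch of where Beam keeps that value (bucket name hypothetical):

```
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

options = PipelineOptions(['--temp_location', 'gs://my-bucket/temp'])  # hypothetical bucket
temp_location = options.view_as(GoogleCloudOptions).temp_location

# Mirrors the validation above: BigQuery file loads need a gs:// staging area.
assert temp_location and temp_location.startswith('gs://')
```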
18 changes: 14 additions & 4 deletions gcp_variant_transforms/pipeline_common_test.py
@@ -94,21 +94,31 @@ def test_fail_on_invalid_flags(self):
'gcp-variant-transforms-test',
'--staging_location',
'gs://integration_test_runs/staging']
pipeline_common._raise_error_on_invalid_flags(pipeline_args)
pipeline_common._raise_error_on_invalid_flags(pipeline_args, None)

# Add Dataflow runner (requires --setup_file).
pipeline_args.extend(['--runner', 'DataflowRunner'])
with self.assertRaisesRegexp(ValueError, 'setup_file'):
pipeline_common._raise_error_on_invalid_flags(pipeline_args)
pipeline_common._raise_error_on_invalid_flags(pipeline_args, None)

# Add setup.py (required for Variant Transforms run). This is now valid.
pipeline_args.extend(['--setup_file', 'setup.py'])
pipeline_common._raise_error_on_invalid_flags(pipeline_args)
pipeline_common._raise_error_on_invalid_flags(pipeline_args, None)

with self.assertRaisesRegexp(ValueError, '--temp_location is required*'):
pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output')

pipeline_args.extend(['--temp_location', 'wrong_gcs'])
with self.assertRaisesRegexp(ValueError, '--temp_location must be valid*'):
pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output')

pipeline_args = pipeline_args[:-1] + ['gs://valid_bucket/temp']
pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output')

# Add an unknown flag.
pipeline_args.extend(['--unknown_flag', 'somevalue'])
with self.assertRaisesRegexp(ValueError, 'Unrecognized.*unknown_flag'):
pipeline_common._raise_error_on_invalid_flags(pipeline_args)
pipeline_common._raise_error_on_invalid_flags(pipeline_args, 'output')

def test_get_compression_type(self):
vcf_metadata_list = [filesystem.FileMetadata(path, size) for
(diff of an integration test config; file name not captured)
@@ -10,7 +10,6 @@
"worker_machine_type": "n1-standard-64",
"max_num_workers": "64",
"num_workers": "20",
"num_bigquery_write_shards": "2",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
(diff of an integration test config; file name not captured)
@@ -10,7 +10,6 @@
"worker_machine_type": "n1-standard-16",
"max_num_workers": "20",
"num_workers": "20",
"num_bigquery_write_shards": "2",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],
@@ -69,4 +68,3 @@
]
}
]

(diff of an integration test config; file name not captured)
@@ -8,7 +8,6 @@
"worker_machine_type": "n1-standard-16",
"max_num_workers": "20",
"num_workers": "20",
"num_bigquery_write_shards": "20",
"assertion_configs": [
{
"query": ["NUM_ROWS_QUERY"],

This file was deleted.

57 changes: 0 additions & 57 deletions gcp_variant_transforms/transforms/limit_write.py

This file was deleted.

76 changes: 0 additions & 76 deletions gcp_variant_transforms/transforms/limit_write_test.py

This file was deleted.

10 changes: 4 additions & 6 deletions gcp_variant_transforms/transforms/sample_info_to_bigquery.py
@@ -73,13 +73,11 @@ def __init__(self, output_table_prefix, sample_name_encoding, append=False):
def expand(self, pcoll):
return (pcoll
| 'ConvertSampleInfoToBigQueryTableRow' >> beam.ParDo(
ConvertSampleInfoToRow(self._sample_name_encoding))
| 'WriteSampleInfoToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
ConvertSampleInfoToRow(self.sample_name_encoding))
| 'WriteSampleInfoToBigQuery' >> beam.io.WriteToBigQuery(
self._output_table,
schema=self._schema,
create_disposition=(
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
write_disposition=(
beam.io.BigQueryDisposition.WRITE_APPEND
if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
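
For reference, a self-contained sketch of the `WriteToBigQuery` file-loads pattern this transform now follows; the table name, schema, and bucket are placeholders rather than values from the repository:

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--temp_location', 'gs://my-bucket/temp'])  # staging for load jobs

with beam.Pipeline(options=options) as p:
  _ = (p
       | 'CreateRows' >> beam.Create([{'sample_id': 1, 'sample_name': 'NA12878'}])
       | 'WriteSampleInfo' >> beam.io.WriteToBigQuery(
           'my-project:my_dataset.sample_info',  # placeholder table spec
           schema='sample_id:INTEGER,sample_name:STRING',
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
```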