Add a PoC disk_estimator feature to the vcf_to_bq preprocessor. #335
base: master
Force-pushed from 892ddd7 to 08b8772.
Thank you so much for adding the estimator, John! This is so exciting! It will be very useful when processing large datasets! Before I do the review, can you make sure the unit tests pass? (You can run all unit tests by …) BTW, please reference Issue #67 in the PR comment (just add something like …).
Force-pushed from 08b8772 to 0415861.
Pull Request Test Coverage Report for Build 1703
💛 - Coveralls
Hi Allie, sorry about that, done; thanks!
Thanks John for updating the code! I made some comments here and there. Then I realized the pipeline process in vcf_to_bq_preprocess is a little hard to read. It might be easier to group those transforms:
- Read the file.
- Inside one new PTransform, for each file, get the raw file size, calculate the snippet size/encoded size, and ultimately the estimated size.
- Finally, combine globally.

If you agree with this structure, we may need to refactor some code in resourse_estimator. :)
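For concreteness, a minimal Python/Beam sketch of that suggested shape, with toy stand-ins for the matching and estimation logic (the real logic lives in the PR's vcf_file_size_io and estimator code; nothing below is the final API):

import glob
import os

import apache_beam as beam


def _match_files(pattern):
  # Toy stand-in: expand a local glob; the PR uses Beam's FileSystems.
  return glob.glob(pattern)


class _EstimateFileSizeFn(beam.DoFn):
  """One step per file: raw size, snippet sizes, then the estimate."""

  def process(self, file_name):
    raw_size = os.path.getsize(file_name)
    raw_sample = encoded_sample = 0
    with open(file_name) as vcf_file:
      for i, line in enumerate(vcf_file):
        if i >= 50:
          break
        raw_sample += len(line)
        encoded_sample += len(line)  # placeholder for the encoded size
    if raw_sample:
      # Per-file estimated encoded size, by simple ratio.
      yield raw_size * encoded_sample // raw_sample


def run(pipeline, input_pattern):
  return (pipeline
          | 'CreateFilePattern' >> beam.Create([input_pattern])
          | 'MatchFiles' >> beam.FlatMap(_match_files)
          | 'EstimateFileSizes' >> beam.ParDo(_EstimateFileSizeFn())
          | 'SumFileSizes' >> beam.CombineGlobally(sum))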
from apache_beam.io.iobase import Read
from apache_beam.transforms import PTransform

from gcp_variant_transforms.beam_io import vcf_parser, vcfio
nit: import only one module per line.
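That is, split the combined import so each module gets its own line:

from gcp_variant_transforms.beam_io import vcf_parser
from gcp_variant_transforms.beam_io import vcfio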
@@ -77,6 +80,7 @@ class _HeaderLine(object):
def generate_report(
    header_definitions,  # type: merge_header_definitions.VcfHeaderDefinitions
    file_path,  # type: str
    disk_usage_estimate,  # type: int
Please also add disk_usage_estimate in the Args section.
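Something along these lines, say (the wording is illustrative, not the PR's actual docstring; other args are elided):

def generate_report(header_definitions, file_path, disk_usage_estimate):
  # type: (merge_header_definitions.VcfHeaderDefinitions, str, int) -> None
  """Generates a report at file_path.

  Args:
    header_definitions: The merged VCF header definitions.
    file_path: The location where the report is saved.
    disk_usage_estimate: Estimated disk space, in bytes, that the pipeline
      would need.
  """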
class FileSizeInfo(object):
  def __init__(self, raw_file_size, encoded_file_size=None):
Do we need encoded_file_size in the argument?
Being able to set this in the constructor makes it more straightforward for FileSizeInfoSumFn, IMO.
class FileSizeInfo(object):
  def __init__(self, raw_file_size, encoded_file_size=None):
    # type: (int, int) -> (None)
    self.raw = raw_file_size
s/self.raw/self._raw_size
renamed to self.raw_size for now; since this is consumed by FileSizeInfoSumFn, though, I think this should remain public, WDYT?
def __init__(self, raw_file_size, encoded_file_size=None):
  # type: (int, int) -> (None)
  self.raw = raw_file_size
  self.encoded = encoded_file_size
s/self.encoded/self._encoded_size
ditto
Force-pushed from 5f8d066 to 770bdc2.
Thank you so much John for updating this. It looks great! I added a few comments.
p
| 'InputFilePattern' >> beam.Create([input_pattern])
| 'ReadFileSizeAndSampleVariants' >> vcf_file_size_io.EstimateVcfSize(
    input_pattern, _SNIPPET_READ_SIZE)
Will you consider taking a flag for _SNIPPET_READ_SIZE in the future? If not, I think it is fine to define _SNIPPET_READ_SIZE in vcf_file_size_io.
Hm, I could imagine changing this to a flag eventually; I added a TODO in the code to indicate that. Would you prefer that change to be in the same commit, though?
Yeah, I think it's better to move it to vcf_file_size_io, since the general consensus is that we want to reduce the number of flags where possible - I don't quite envision clients fidgeting with this number. It seems like we should instead figure out an optimal amount (which 50 seems to be, especially given that the difference between reading 1 line or 50 lines is negligible) and set it by default for all use cases.
sg, thanks! done
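The resolved shape might look roughly like this (a sketch following the discussion, not necessarily the final code):

import apache_beam as beam

# Module-level constant in vcf_file_size_io rather than a pipeline flag;
# 50 was judged a reasonable default above, since reading 1 vs. 50 lines
# from each file costs about the same.
# TODO: revisit exposing this as a flag if clients ever need to tune it.
_SNIPPET_READ_SIZE = 50


class EstimateVcfSize(beam.PTransform):
  """Constructor sketch only; expand() omitted."""

  def __init__(self, file_pattern, sample_size=_SNIPPET_READ_SIZE):
    # type: (str, int) -> None
    self._file_pattern = file_pattern
    self._sample_size = sample_size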
"""A source for estimating the encoded size of a VCF file in `vcf_to_bq`. | ||
|
||
This source first reads a limited number of variants from a set of VCF files, | ||
then |
Consider rephrasing here.
ah sorry, done
if count >= self._sample_size:
  break
if not isinstance(encoded_record, vcfio.Variant):
  logging.error(
Is it for malformed variants?
What I understand is that we have two iterators here, one for the encoded variant and one for the raw record. While we skip some encoded variants here, should we skip the raw record as well?
That's correct; I was concerned about ill-formed lines but didn't want to crash the estimator, so I decided to try skipping them (though if an enormous proportion of lines are malformed, this will tend to make the tool overestimate the size). I don't have a very specific idea of how often or why variants tend to be malformed, though, and I'm open to ideas here, especially around the condition I'm checking for.

Oh, this is a bug; I was intending to skip both raw and encoded. Sorry about that; I'm rewriting this to use itertools, which should be less error-prone.
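The lockstep iteration John mentions might look roughly like this (a sketch, not the PR's final code; the helper name and the encoded-size measure are assumptions):

import itertools
import logging

from gcp_variant_transforms.beam_io import vcfio


def _sample_sizes(raw_records, encoded_records, sample_size, file_name):
  # type: (Iterator, Iterator, int, str) -> (int, int)
  raw_size = encoded_size = 0
  # izip advances both iterators in lockstep, so `continue` skips a raw
  # line together with its encoded counterpart (Python 2, matching the
  # codebase; on Python 3 this would be the builtin zip).
  pairs = itertools.islice(
      itertools.izip(raw_records, encoded_records), sample_size)
  for raw_record, encoded_record in pairs:
    if not isinstance(encoded_record, vcfio.Variant):
      logging.error(
          'Skipping record that did not parse as a `vcfio.Variant` in '
          'file %s: %s', file_name, raw_record)
      continue  # Note: malformed lines still consume the sample budget.
    raw_size += len(raw_record)
    encoded_size += encoded_record.__sizeof__()  # stand-in size measure
  return raw_size, encoded_size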
"`vcfio.Variant` in file %s: %s", file_name, raw_record) | ||
continue | ||
|
||
raw_size += len(raw_record) |
Why is the raw size the length of raw_record?
Added a comment to explain this line: in the case of ASCII strings, the byte size of the full str object (via raw_record.__sizeof__()) adds 20+ bytes of overhead, which isn't representative of that line's size on disk; the length of the string does represent the size for ASCII strings, though.

Not sure if unicode occurs in VCF files, but I've now changed it to be more robust in that case.
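A quick illustration of the overhead in question (CPython; exact numbers vary by version and platform):

line = 'chr1\t100\t.\tA\tG\t50\tPASS\t.'  # a toy ASCII VCF data line
print(len(line))           # 24: the byte count of the line's characters
print(line.__sizeof__())   # noticeably larger: includes object overhead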
class FileSizeInfo(object):
  def __init__(self, name, raw_file_size, encoded_file_size=None):
    # type: (str, int, int) -> None
    self.name = name
Did you use the name anywhere?
I use this in the generate_report module as well; WDYT?
I'm sorry, I just cannot find where you are using it - in generate_report you call _append_disk_usage_estimate_to_report, which seems to only be getting the raw size and encoded size, no?
Sorry, this was my fault; I either cited some earlier revision incorrectly or misspoke/misread entirely. The self.name is only consumed when there's some issue reading the file in estimate_encoded_file_size, so that a more descriptive message can be logged:

logging.warning("File %s appears to have no valid Variant lines. File "
                "will be ignored for size estimation.", self.name)
We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for all the commit author(s) or Co-authors. If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google.
Let me know how this looks; if it looks good, then I can squash these commits and rebase onto the newest changes to master. Thanks again!
Thank you, John. It looks great!! Please rebase with master and I will take a final look. Thanks!
"will not be accurate.", file_metadata.path) | ||
file_sizes.append((file_metadata.path, file_metadata.size_in_bytes,)) | ||
return file_sizes | ||
match_result = filesystems.FileSystems.match([file_name])[0] |
Is it a file name? If it is, do you still need the pattern matching?
It also fetches the file metadata from the filesystem, e.g. the file size; I tweaked the code a little, so hopefully it's more readable now.
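For context, the metadata fetch boils down to something like this (the helper name is illustrative; FileSystems.match and FileMetadata.size_in_bytes are standard Beam APIs):

from apache_beam.io import filesystems


def _get_raw_file_size(file_name):
  # type: (str) -> int
  # match() works on a concrete file name too, and returns metadata
  # (including size_in_bytes) rather than just the matching paths.
  match_result = filesystems.FileSystems.match([file_name])[0]
  return match_result.metadata_list[0].size_in_bytes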
The pipeline uses the raw file size plus the raw and encoded sizes of a short snippet at the beginning of each VCF file to estimate the encoded size. The major blocking bug is that when the snippets are read from VCFs in an encoded format, lines are read more than once.
Rebased and applied the final fixes; thanks so much for your patience!
Hi John,
My name is Tural, I'm a relatively recent addition to the BioMed team and I'll be assisting Allie with the review of this PR.
Apologies for the late reply - now that I've better familiarized myself with the task at hand, I should be faster with my future reviews.
I've added several comments for you to look at.
Thanks
self.raw_size = raw_file_size
self.encoded_size = encoded_file_size  # Optional, useful for SumFn.

def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size):
If we are populating self.encoded_size, let's use the same name here (i.e. by dropping 'file_'), or vice versa.
done
This is a simple ratio problem, solving for encoded_file_size which is
the only unknown:
  encoded_sample_size / raw_sample_size = encoded_file_size / raw_file_size
Although I also like to be verbose, the general approach in the GVT codebase is to keep descriptive text short. This seems a bit too much. Since this method doesn't really use Variants, how about something like:

"""Given the sizes for the raw file, sample raw lines and sample encoded lines, estimate the encoded file size."""

WDYT?
sg, thanks! done
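In code form, the agreed-upon estimate is just this (a sketch; the PR's version is a method on FileSizeInfo):

def estimate_encoded_file_size(raw_file_size, raw_sample_size,
                               encoded_sample_size):
  # type: (int, int, int) -> int
  # encoded_sample_size / raw_sample_size
  #     = encoded_file_size / raw_file_size,
  # solved for encoded_file_size, the only unknown.
  if raw_sample_size == 0:  # e.g. no valid Variant lines were read
    return 0
  return raw_file_size * encoded_sample_size // raw_sample_size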
FileSizeInfo, so the input is a tuple with the FileSizeInfos as the second
field. The output strips out the str key which represents the file path.

Example: [FileSizeInfo(a, b), FileSizeInfo(c, d)] -> FileSizeInfo(a+c, b+d)
Again, it seems that everything past the first line is unnecessary. If you really want to include info about how the summing is done, how about """Combiner function, used to vector-sum FileSizeInfo objects.""" or something of that sort?
sg, done
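A minimal sketch of such a vector-sum combiner, assuming the raw_size/encoded_size fields discussed above (the FileSizeInfo stand-in and the placeholder output name are illustrative):

import apache_beam as beam


class FileSizeInfo(object):  # minimal stand-in for the class under review
  def __init__(self, name, raw_file_size, encoded_file_size=None):
    # type: (str, int, int) -> None
    self.name = name
    self.raw_size = raw_file_size
    self.encoded_size = encoded_file_size


class FileSizeInfoSumFn(beam.CombineFn):
  """Vector-sums (file_path, FileSizeInfo) inputs into one FileSizeInfo."""

  def create_accumulator(self):
    return 0, 0  # (raw_size, encoded_size)

  def add_input(self, accumulator, keyed_input):
    _, info = keyed_input  # strip the str file-path key
    raw, encoded = accumulator
    return raw + info.raw_size, encoded + info.encoded_size

  def merge_accumulators(self, accumulators):
    raw_sizes, encoded_sizes = zip(*accumulators)
    return sum(raw_sizes), sum(encoded_sizes)

  def extract_output(self, accumulator):
    raw, encoded = accumulator
    return FileSizeInfo('<all files>', raw, encoded)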
self.raw_size = raw_file_size
self.encoded_size = encoded_file_size  # Optional, useful for SumFn.

def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size):
Also, we seem to be missing the types for the args.
This comment might be from an earlier revision; there are type annotations at line 63 for this method. Are you referring to something else?
# type: (str, int, int) -> None
self.name = name
self.raw_size = raw_file_size
self.encoded_size = encoded_file_size  # Optional, useful for SumFn.
I'd argue that the names for these variables should be the same: either both raw_size/encoded_size or both raw_file_size/encoded_file_size.
done
return file_metadata.size_in_bytes


def _convert_variants_to_bytesize(variant):
Do we need a separate method for this? Seems to only be used in 1 place and is a one-liner.
done
| 'InputFilePattern' >> beam.Create([input_pattern])
| 'ReadFileSizeAndSampleVariants' >> vcf_file_size_io.EstimateVcfSize(
    input_pattern, _SNIPPET_READ_SIZE)
| 'SumFileSizeEstimates' >> beam.CombineGlobally(
How about moving this combine into EstimateVcfSize PTransform? Would we ever call EstimateVcfSize without summing the results?
sg; also inlined the rest of the code directly in run() since it's just a single pipe now
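The folded-in version might read roughly like this (a sketch; _EstimateVcfSizeFn and FileSizeInfoSumFn stand in for the PR's per-file estimation DoFn and combiner):

import apache_beam as beam


class EstimateVcfSize(beam.PTransform):
  """Per-file size estimation plus the global sum, in one transform."""

  def __init__(self, file_pattern, sample_size):
    # type: (str, int) -> None
    self._file_pattern = file_pattern
    self._sample_size = sample_size

  def expand(self, pcoll):
    return (pcoll
            # Placeholders for the PR's per-file DoFn and its combiner.
            | 'EstimateSizes' >> beam.ParDo(
                _EstimateVcfSizeFn(self._file_pattern, self._sample_size))
            | 'SumFileSizeEstimates' >> beam.CombineGlobally(
                FileSizeInfoSumFn()))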
Thanks for the comments! I had some trouble developing locally on Mac, but it seems to work again after a package update. Travis says it's failing, but I can reproduce that in an essentially clean client by creating a new PR; it seems like there's something wrong with the cbuild at the moment.
The disk_estimator pipeline uses, for each input VCF file, 1) the raw file size and the 2) raw and 3) encoded sizes of the first 50 Variants/lines to estimate the disk space that would be required for a shuffle step in the pipeline. This estimate is useful because a pipeline that is run with insufficient disk and with MergeVariants enabled as a step can fail hours or days after the Dataflow pipeline is kicked off, while the customer is still billed for the compute.

The estimated disk size is emitted in the preprocessor report file as well as in the PrintEstimate step of the pipeline.

At least three significant things are missing in this pull request, which is why the feature is currently disabled by default when running:
- a ReadAllFromVcf that can handle reading from a large set (> tens of thousands) of input files.

The disk estimator is triggered by adding the --estimate_disk_usage flag to the vcf_to_bq_preprocess pipeline invocation, e.g. python -m gcp_variant_transforms.vcf_to_bq_preprocess --input_pattern gs://genomics-public-data/1000-genomes/vcf/*.vcf --estimate_disk_usage --report_path report.log.

Output in the preprocessor report for the 1000-genomes VCFs (invocation above): "Estimated disk usage by Dataflow: 4847 GB. The total raw file sizes summed up to 1231 GB."

Issue #67