diff --git a/ingest/README.md b/ingest/README.md new file mode 100644 index 00000000..973c3458 --- /dev/null +++ b/ingest/README.md @@ -0,0 +1,96 @@ +# nextstrain.org/dengue/ingest + +This is the ingest pipeline for dengue virus sequences. + +## Software requirements + +Follow the [standard installation instructions](https://docs.nextstrain.org/en/latest/install.html) for Nextstrain's suite of software tools. + +## Usage + +> NOTE: All command examples assume you are within the `ingest` directory. +> If running commands from the outer `dengue` directory, please replace the `.` with `ingest` + +Fetch sequences with + +```sh +nextstrain build . data/sequences.ndjson +``` + +Run the complete ingest pipeline with + +```sh +nextstrain build . +``` + +This will produce two files (within the `ingest` directory): + +- `results/metadata.tsv` +- `results/sequences.fasta` + +Run the complete ingest pipeline and upload results to AWS S3 with + +```sh +nextstrain build . --configfiles config/config.yaml config/optional.yaml +``` + +### Adding new sequences not from GenBank + +#### Static Files + +Do the following to include sequences from static FASTA files. + +1. Convert the FASTA files to NDJSON files with: + + ```sh + ./ingest/bin/fasta-to-ndjson \ + --fasta {path-to-fasta-file} \ + --fields {fasta-header-field-names} \ + --separator {field-separator-in-header} \ + --exclude {fields-to-exclude-in-output} \ + > ingest/data/{file-name}.ndjson + ``` + +2. Add the following to the `.gitignore` to allow the file to be included in the repo: + + ```gitignore + !ingest/data/{file-name}.ndjson + ``` + +3. Add the `file-name` (without the `.ndjson` extension) as a source to `ingest/config/config.yaml`. This will tell the ingest pipeline to concatenate the records to the GenBank sequences and run them through the same transform pipeline. + +## Configuration + +Configuration takes place in `config/config.yaml` by default. +Optional configs for uploading files and Slack notifications are in `config/optional.yaml`. + +### Environment Variables + +The complete ingest pipeline with AWS S3 uploads and Slack notifications uses the following environment variables: + +#### Required + +- `AWS_ACCESS_KEY_ID` +- `AWS_SECRET_ACCESS_KEY` +- `SLACK_TOKEN` +- `SLACK_CHANNELS` + +#### Optional + +These are optional environment variables used in our automated pipeline for providing detailed Slack notifications. + +- `GITHUB_RUN_ID` - provided via [`github.run_id` in a GitHub Action workflow](https://docs.github.com/en/actions/learn-github-actions/contexts#github-context) +- `AWS_BATCH_JOB_ID` - provided via [AWS Batch Job environment variables](https://docs.aws.amazon.com/batch/latest/userguide/job_env_vars.html) + +## Input data + +### GenBank data + +GenBank sequences and metadata are fetched via [NCBI datasets](https://www.ncbi.nlm.nih.gov/datasets/docs/v2/download-and-install/). + +## `ingest/vendored` + +This repository uses [`git subrepo`](https://github.com/ingydotnet/git-subrepo) to manage copies of ingest scripts in [ingest/vendored](./vendored), from [nextstrain/ingest](https://github.com/nextstrain/ingest). + +See [vendored/README.md](vendored/README.md#vendoring) for instructions on how to update +the vendored scripts. 
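As a concrete illustration of the static-FASTA conversion described under "Adding new sequences not from GenBank" above, here is a hedged sketch of one possible invocation. The file name `example.fasta`, the `strain|serotype|date` header layout, and the header values are illustrative assumptions, not files or fields that exist in this repository; the only requirement imposed by the script itself is that one of the header fields is named `strain`.

```sh
# Hypothetical input: example.fasta with headers such as
#   >DENV1/Thailand/2019|denv1|2019-06-12
./ingest/bin/fasta-to-ndjson \
  --fasta example.fasta \
  --fields strain serotype date \
  --separator '|' \
  > ingest/data/example.ndjson

# Each output line is a JSON object holding the parsed header fields plus a
# "sequence" key, roughly:
# {"strain":"DENV1/Thailand/2019","serotype":"denv1","date":"2019-06-12","sequence":"ATG..."}
```

The optional `--exclude` flag can drop header fields (other than `strain`) that should not appear in the output records.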
diff --git a/ingest/Snakefile b/ingest/Snakefile new file mode 100644 index 00000000..bfc99b0d --- /dev/null +++ b/ingest/Snakefile @@ -0,0 +1,74 @@ +from snakemake.utils import min_version + +min_version( + "7.7.0" +) # Snakemake 7.7.0 introduced `retries` directive used in fetch-sequences + +if not config: + + configfile: "config/config.yaml" + + +send_slack_notifications = config.get("send_slack_notifications", False) + + +def _get_all_targets(wildcards): + # Default targets are the metadata TSV and sequences FASTA files + all_targets = ["results/sequences.fasta", "results/metadata.tsv"] + + # Add additional targets based on upload config + upload_config = config.get("upload", {}) + + for target, params in upload_config.items(): + files_to_upload = params.get("files_to_upload", {}) + + if not params.get("dst"): + print( + f"Skipping file upload for {target!r} because the destination was not defined." + ) + else: + all_targets.extend( + expand( + [f"data/upload/{target}/{{remote_file_name}}.done"], + zip, + remote_file_name=files_to_upload.keys(), + ) + ) + + # Add additional targets for Nextstrain's internal Slack notifications + if send_slack_notifications: + all_targets.extend( + [ + "data/notify/genbank-record-change.done", + "data/notify/metadata-diff.done", + ] + ) + + if config.get("trigger_rebuild", False): + all_targets.append("data/trigger/rebuild.done") + + return all_targets + + +rule all: + input: + _get_all_targets, + + +include: "workflow/snakemake_rules/fetch_sequences.smk" +include: "workflow/snakemake_rules/transform.smk" + + +if config.get("upload", False): + + include: "workflow/snakemake_rules/upload.smk" + + +if send_slack_notifications: + + include: "workflow/snakemake_rules/slack_notifications.smk" + + +if config.get("trigger_rebuild", False): + + include: "workflow/snakemake_rules/trigger_rebuild.smk" diff --git a/ingest/bin/fasta-to-ndjson b/ingest/bin/fasta-to-ndjson new file mode 100755 index 00000000..1ee9f8f6 --- /dev/null +++ b/ingest/bin/fasta-to-ndjson @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +""" +Parse delimited fields from FASTA header into NDJSON format to stdout. +The output NDJSON records are guaranteed to have at least two fields: + 1. strain + 2. sequence + +Uses the `augur.io.read_sequences` function to read the FASTA file, +so `augur` must be installed in the environment running the script. +""" + +import argparse +import json +import sys + +from augur.io import read_sequences + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--fasta", required=True, + help="FASTA file to be transformed into NDJSON format") + parser.add_argument("--fields", nargs="+", + help="Fields in the FASTA header, listed in the same order as the header. " + + "These will be used as the keys in the final NDJSON output. " + + "One of the fields must be 'strain'. " + + "These cannot include the field 'sequence' as this field is reserved for the genomic sequence.") + parser.add_argument("--separator", default='|', + help="Field separator in the FASTA header") + parser.add_argument("--exclude", nargs="*", + help="List of fields to exclude from final NDJSON record. 
" + "These cannot include 'strain' or 'sequence'.") + + args = parser.parse_args() + + fasta_fields = [field.lower() for field in args.fields] + + exclude_fields = [] + if args.exclude: + exclude_fields = [field.lower() for field in args.exclude] + + passed_checks = True + + if 'strain' not in fasta_fields: + print("ERROR: FASTA fields must include a 'strain' field.", file=sys.stderr) + passed_checks = False + + if 'sequence' in fasta_fields: + print("ERROR: FASTA fields cannot include a 'sequence' field.", file=sys.stderr) + passed_checks = False + + if 'strain' in exclude_fields: + print("ERROR: The field 'strain' cannot be excluded from the output.", file=sys.stderr) + passed_checks = False + + if 'sequence' in exclude_fields: + print("ERROR: The field 'sequence' cannot be excluded from the output.", file=sys.stderr) + passed_checks = False + + missing_fields = [field for field in exclude_fields if field not in fasta_fields] + if missing_fields: + print(f"ERROR: The following exclude fields do not match any FASTA fields: {missing_fields}", file=sys.stderr) + passed_checks = False + + if not passed_checks: + print("ERROR: Failed to parse FASTA file into NDJSON records.","See detailed errors above.", file=sys.stderr) + sys.exit(1) + + sequences = read_sequences(args.fasta) + + for sequence in sequences: + field_values = [ + value.strip() + for value in sequence.description.split(args.separator) + ] + record = dict(zip(fasta_fields, field_values)) + record['sequence'] = str(sequence.seq).upper() + + for field in exclude_fields: + del record[field] + + json.dump(record, sys.stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/bin/ndjson-to-tsv-and-fasta b/ingest/bin/ndjson-to-tsv-and-fasta new file mode 100755 index 00000000..d9d7331d --- /dev/null +++ b/ingest/bin/ndjson-to-tsv-and-fasta @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +""" +Parses NDJSON records from stdin to two different files: a metadata TSV and a +sequences FASTA. + +Records that do not have an ID or sequence will be excluded from the output files. +""" +import argparse +import csv +import json +from sys import stderr, stdin + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--metadata", metavar="TSV", default="data/metadata.tsv", + help="The output metadata TSV file") + parser.add_argument("--fasta", metavar="FASTA", default="data/sequences.fasta", + help="The output sequences FASTA file") + parser.add_argument("--metadata-columns", nargs="+", + help="List of fields from the NDJSON records to include as columns in the metadata TSV. 
" + + "Metadata TSV columns will be in the order of the columns provided.") + parser.add_argument("--id-field", default='strain', + help="Field from the records to use as the sequence ID in the FASTA file.") + parser.add_argument("--sequence-field", default='sequence', + help="Field from the record that holds the genomic sequence for the FASTA file.") + + args = parser.parse_args() + + with open(args.metadata, 'wt') as metadata_output: + with open(args.fasta, 'wt') as fasta_output: + metadata_csv = csv.DictWriter( + metadata_output, + args.metadata_columns, + restval="", + extrasaction='ignore', + delimiter='\t', + lineterminator='\n', + ) + metadata_csv.writeheader() + + for index, record in enumerate(stdin): + record = json.loads(record) + + sequence_id = str(record.get(args.id_field, '')) + sequence = str(record.get(args.sequence_field, '')) + + if not sequence_id: + print( + f"WARNING: Record number {index} does not have a sequence ID.", + "This record will be excluded from the output files.", + file=stderr + ) + elif not sequence: + print( + f"WARNING: Record number {index} does not have a sequence.", + "This record will be excluded from the output files.", + file=stderr + ) + else: + metadata_csv.writerow(record) + + print(f">{sequence_id}", file=fasta_output) + print(f"{sequence}" , file= fasta_output) diff --git a/ingest/config/config.yaml b/ingest/config/config.yaml new file mode 100644 index 00000000..231a5bb2 --- /dev/null +++ b/ingest/config/config.yaml @@ -0,0 +1,103 @@ +# Sources of sequences to include in the ingest run +sources: ['genbank'] +# Pathogen NCBI Taxonomy ID +ncbi_taxon_id: '12637' +# The list of NCBI Datasets fields to include from NCBI Datasets output +# These need to be the mneumonics of the NCBI Datasets fields, see docs for full list of fields +# https://www.ncbi.nlm.nih.gov/datasets/docs/v2/reference-docs/command-line/dataformat/tsv/dataformat_tsv_virus-genome/#fields +# Note: the "accession" field MUST be provided to match with the sequences +ncbi_datasets_fields: + - accession + - sourcedb + - isolate-lineage + - geo-region + - geo-location + - isolate-collection-date + - release-date + - update-date + - length + - host-name + - isolate-lineage-source + - submitter-names + - submitter-affiliation + +# Params for the transform rule +transform: + # NCBI Fields to rename to Nextstrain field names. 
+ # This is the first step in the pipeline, so any references to field names + # in the configs below should use the new field names + field_map: [ + 'accession=genbank_accession', + 'accession-rev=genbank_accession_rev', + 'isolate-lineage=strain', + 'sourcedb=database', # necessary for applying geo location rules + 'geo-region=region', + 'geo-location=location', + 'host-name=host', + 'isolate-collection-date=date', + 'release-date=release_date', + 'update-date=update_date', + 'sra-accs=sra_accessions', + 'submitter-names=authors', + 'submitter-affiliation=institution', + ] + # Standardized strain name regex + # Currently accepts any characters because we do not have a clear standard for strain names + strain_regex: '^.+$' + # Back up strain name field if 'strain' doesn't match regex above + strain_backup_fields: ['genbank_accession'] + # List of date fields to standardize + date_fields: ['date', 'release_date', 'update_date'] + # Expected date formats present in date fields + # These date formats should use directives expected by datetime + # See https://docs.python.org/3.9/library/datetime.html#strftime-and-strptime-format-codes + expected_date_formats: ['%Y', '%Y-%m', '%Y-%m-%d', '%Y-%m-%dT%H:%M:%SZ'] + # Titlecase rules + titlecase: + # Abbreviations not cast to titlecase, keeps uppercase + abbreviations: ['USA'] + # Articles that should not be cast to titlecase + articles: [ + 'and', 'd', 'de', 'del', 'des', 'di', 'do', 'en', 'l', 'la', 'las', 'le', + 'los', 'nad', 'of', 'op', 'sur', 'the', 'y' + ] + # List of string fields to titlecase + fields: ['region', 'country', 'division', 'location'] + # Authors field name + authors_field: 'authors' + # Authors default value if authors value is empty + authors_default_value: '?' + # Field name for the generated abbreviated authors + abbr_authors_field: 'abbr_authors' + # General geolocation rules to apply to geolocation fields + geolocation_rules_url: 'https://raw.githubusercontent.com/nextstrain/ncov-ingest/master/source-data/gisaid_geoLocationRules.tsv' + # Local geolocation rules that are only applicable to dengue data + # Local rules can overwrite the general geolocation rules provided above + local_geolocation_rules: 'source-data/geolocation-rules.tsv' + # User annotations file + annotations: 'source-data/annotations.tsv' + # ID field used to merge annotations + annotations_id: 'genbank_accession' + # Field to use as the sequence ID in the FASTA file + id_field: 'genbank_accession' + # Field to use as the sequence in the FASTA file + sequence_field: 'sequence' + # Final output columns for the metadata TSV + metadata_columns: [ + 'strain', + 'genbank_accession', + 'genbank_accession_rev', + 'date', + 'region', + 'country', + 'division', + 'location', + 'length', + 'host', + 'release_date', + 'update_date', + 'sra_accessions', + 'abbr_authors', + 'authors', + 'institution' + ] diff --git a/ingest/config/optional.yaml b/ingest/config/optional.yaml new file mode 100644 index 00000000..727206ef --- /dev/null +++ b/ingest/config/optional.yaml @@ -0,0 +1,25 @@ +# Optional configs used by Nextstrain team +# Params for uploads +upload: + # Upload params for AWS S3 + s3: + # AWS S3 Bucket with prefix + dst: 's3://nextstrain-data/files/workflows/dengue' + # Mapping of files to upload, with key as remote file name and the value + # the local file path relative to the ingest directory.
+ files_to_upload: + genbank.ndjson.xz: data/genbank.ndjson + all_sequences.ndjson.xz: data/sequences.ndjson + metadata.tsv.gz: results/metadata.tsv + sequences.fasta.xz: results/sequences.fasta + alignment.fasta.xz: data/alignment.fasta + insertions.csv.gz: data/insertions.csv + translations.zip: data/translations.zip + + cloudfront_domain: 'data.nextstrain.org' + +# Toggle for Slack notifications +send_slack_notifications: True + +# Toggle for triggering builds +trigger_rebuild: True diff --git a/ingest/profiles/default/config.yaml b/ingest/profiles/default/config.yaml new file mode 100644 index 00000000..c69390b8 --- /dev/null +++ b/ingest/profiles/default/config.yaml @@ -0,0 +1,4 @@ +cores: all +rerun-incomplete: true +printshellcmds: true +reason: true diff --git a/ingest/source-data/annotations.tsv b/ingest/source-data/annotations.tsv new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/ingest/source-data/annotations.tsv @@ -0,0 +1 @@ + diff --git a/ingest/source-data/geolocation-rules.tsv b/ingest/source-data/geolocation-rules.tsv new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/ingest/source-data/geolocation-rules.tsv @@ -0,0 +1 @@ + diff --git a/ingest/vendored/.cramrc b/ingest/vendored/.cramrc new file mode 100644 index 00000000..153d20f5 --- /dev/null +++ b/ingest/vendored/.cramrc @@ -0,0 +1,3 @@ +[cram] +shell = /bin/bash +indent = 2 diff --git a/ingest/vendored/.github/pull_request_template.md b/ingest/vendored/.github/pull_request_template.md new file mode 100644 index 00000000..ed4a5b27 --- /dev/null +++ b/ingest/vendored/.github/pull_request_template.md @@ -0,0 +1,16 @@ +### Description of proposed changes + + + +### Related issue(s) + + + +### Checklist + + + +- [ ] Checks pass +- [ ] If adding a script, add an entry for it in the README. + + diff --git a/ingest/vendored/.github/workflows/ci.yaml b/ingest/vendored/.github/workflows/ci.yaml new file mode 100644 index 00000000..c6a218a5 --- /dev/null +++ b/ingest/vendored/.github/workflows/ci.yaml @@ -0,0 +1,23 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + workflow_dispatch: + +jobs: + shellcheck: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: nextstrain/.github/actions/shellcheck@master + + cram: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + - run: pip install cram + - run: cram tests/ \ No newline at end of file diff --git a/ingest/vendored/.gitrepo b/ingest/vendored/.gitrepo new file mode 100644 index 00000000..3904b333 --- /dev/null +++ b/ingest/vendored/.gitrepo @@ -0,0 +1,12 @@ +; DO NOT EDIT (unless you know what you are doing) +; +; This subdirectory is a git "subrepo", and this file is maintained by the +; git-subrepo command. See https://github.com/ingydotnet/git-subrepo#readme +; +[subrepo] + remote = https://github.com/nextstrain/ingest + branch = main + commit = a0faef53a0c6e7cc4057209454ef0852875dc3a9 + parent = 11b67b1f837b53f25755db097e8bcceebb64e4f8 + method = merge + cmdver = 0.4.6 diff --git a/ingest/vendored/.shellcheckrc b/ingest/vendored/.shellcheckrc new file mode 100644 index 00000000..ebed438c --- /dev/null +++ b/ingest/vendored/.shellcheckrc @@ -0,0 +1,6 @@ +# Use of this file requires Shellcheck v0.7.0 or newer. +# +# SC2064 - We intentionally want variables to expand immediately within traps +# so the trap can not fail due to variable interpolation later. 
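# For example, notify-on-diff in this repo registers its cleanup as
#
#   trap "rm -f '$dst_local' '$diff'" EXIT
#
# where the double quotes expand the temp-file paths at the moment the trap
# is defined, so the cleanup still removes the right files even if those
# variables are later unset or reused.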
+# +disable=SC2064 diff --git a/ingest/vendored/README.md b/ingest/vendored/README.md new file mode 100644 index 00000000..0ad83f4c --- /dev/null +++ b/ingest/vendored/README.md @@ -0,0 +1,140 @@ +# ingest + +Shared internal tooling for pathogen data ingest. Used by our individual +pathogen repos which produce Nextstrain builds. Expected to be vendored by +each pathogen repo using `git subtree`. + +Some tools may only live here temporarily before finding a permanent home in +`augur curate` or Nextstrain CLI. Others may happily live out their days here. + +## Vendoring + +Nextstrain maintained pathogen repos will use [`git subrepo`](https://github.com/ingydotnet/git-subrepo) to vendor ingest scripts. +(See discussion on this decision in https://github.com/nextstrain/ingest/issues/3) + +For a list of Nextstrain repos that are currently using this method, use [this +GitHub code search](https://github.com/search?type=code&q=org%3Anextstrain+subrepo+%22remote+%3D+https%3A%2F%2Fgithub.com%2Fnextstrain%2Fingest%22). + +If you don't already have `git subrepo` installed, follow the [git subrepo installation instructions](https://github.com/ingydotnet/git-subrepo#installation). +Then add the latest ingest scripts to the pathogen repo by running: + +``` +git subrepo clone https://github.com/nextstrain/ingest ingest/vendored +``` + +Any future updates of ingest scripts can be pulled in with: + +``` +git subrepo pull ingest/vendored +``` + +If you run into merge conflicts and would like to pull in a fresh copy of the +latest ingest scripts, pull with the `--force` flag: + +``` +git subrepo pull ingest/vendored --force +``` + +> **Warning** +> Beware of rebasing/dropping the parent commit of a `git subrepo` update + +`git subrepo` relies on metadata in the `ingest/vendored/.gitrepo` file, +which includes the hash for the parent commit in the pathogen repos. +If this hash no longer exists in the commit history, there will be errors when +running future `git subrepo pull` commands. + +If you run into an error similar to the following: +``` +$ git subrepo pull ingest/vendored +git-subrepo: Command failed: 'git branch subrepo/ingest/vendored '. +fatal: not a valid object name: '' +``` +Check the parent commit hash in the `ingest/vendored/.gitrepo` file and make +sure the commit exists in the commit history. Update to the appropriate parent +commit hash if needed. + +## History + +Much of this tooling originated in +[ncov-ingest](https://github.com/nextstrain/ncov-ingest) and was passaged thru +[mpox's ingest/](https://github.com/nextstrain/mpox/tree/@/ingest/). It +subsequently proliferated from [mpox][] to other pathogen repos ([rsv][], +[zika][], [dengue][], [hepatitisB][], [forecasts-ncov][]) primarily thru +copying. To [counter that +proliferation](https://bedfordlab.slack.com/archives/C7SDVPBLZ/p1688577879947079), +this repo was made. 
+ +[mpox]: https://github.com/nextstrain/mpox +[rsv]: https://github.com/nextstrain/rsv +[zika]: https://github.com/nextstrain/zika/pull/24 +[dengue]: https://github.com/nextstrain/dengue/pull/10 +[hepatitisB]: https://github.com/nextstrain/hepatitisB +[forecasts-ncov]: https://github.com/nextstrain/forecasts-ncov + +## Elsewhere + +The creation of this repo, in both the abstract and concrete, and the general +approach to "ingest" has been discussed in various internal places, including: + +- https://github.com/nextstrain/private/issues/59 +- @joverlee521's [workflows document](https://docs.google.com/document/d/1rLWPvEuj0Ayc8MR0O1lfRJZfj9av53xU38f20g8nU_E/edit#heading=h.4g0d3mjvb89i) +- [5 July 2023 Slack thread](https://bedfordlab.slack.com/archives/C7SDVPBLZ/p1688577879947079) +- [6 July 2023 team meeting](https://docs.google.com/document/d/1FPfx-ON5RdqL2wyvODhkrCcjgOVX3nlXgBwCPhIEsco/edit) +- _…many others_ + +## Scripts + +Scripts for supporting ingest workflow automation that don’t really belong in any of our existing tools. + +- [notify-on-diff](notify-on-diff) - Send Slack message with diff of a local file and an S3 object +- [notify-on-job-fail](notify-on-job-fail) - Send Slack message with details about failed workflow job on GitHub Actions and/or AWS Batch +- [notify-on-job-start](notify-on-job-start) - Send Slack message with details about workflow job on GitHub Actions and/or AWS Batch +- [notify-on-record-change](notify-on-record-change) - Send Slack message with details about line count changes for a file compared to an S3 object's metadata `recordcount`. + If the S3 object's metadata does not have `recordcount`, then will attempt to download S3 object to count lines locally, which only supports `xz` compressed S3 objects. +- [notify-slack](notify-slack) - Send message or file to Slack +- [s3-object-exists](s3-object-exists) - Used to prevent 404 errors during S3 file comparisons in the notify-* scripts +- [trigger](trigger) - Triggers downstream GitHub Actions via the GitHub API using repository_dispatch events. +- [trigger-on-new-data](trigger-on-new-data) - Triggers downstream GitHub Actions if the provided `upload-to-s3` outputs do not contain the `identical_file_message` + A hacky way to ensure that we only trigger downstream phylogenetic builds if the S3 objects have been updated. + +NCBI interaction scripts that are useful for fetching public metadata and sequences. + +- [fetch-from-ncbi-entrez](fetch-from-ncbi-entrez) - Fetch metadata and nucleotide sequences from [NCBI Entrez](https://www.ncbi.nlm.nih.gov/books/NBK25501/) and output to a GenBank file. + Useful for pathogens with metadata and annotations in custom fields that are not part of the standard [NCBI Datasets](https://www.ncbi.nlm.nih.gov/datasets/) outputs. + +Historically, some pathogen repos used the undocumented NCBI Virus API through [fetch-from-ncbi-virus](https://github.com/nextstrain/ingest/blob/c97df238518171c2b1574bec0349a55855d1e7a7/fetch-from-ncbi-virus) to fetch data. However, we've opted to drop the NCBI Virus scripts due to https://github.com/nextstrain/ingest/issues/18. + +Potential Nextstrain CLI scripts + +- [sha256sum](sha256sum) - Used to check if files are identical in upload-to-s3 and download-from-s3 scripts. +- [cloudfront-invalidate](cloudfront-invalidate) - CloudFront invalidation is already supported in the [nextstrain remote command for S3 files](https://github.com/nextstrain/cli/blob/a5dda9c0579ece7acbd8e2c32a4bbe95df7c0bce/nextstrain/cli/remote/s3.py#L104).
+ This exists as a separate script to support CloudFront invalidation when using the upload-to-s3 script. +- [upload-to-s3](upload-to-s3) - Upload file to AWS S3 bucket with compression based on file extension in S3 URL. + Skips upload if the local file's hash is identical to the S3 object's metadata `sha256sum`. + Adds the following user defined metadata to uploaded S3 object: + - `sha256sum` - hash of the file generated by [sha256sum](sha256sum) + - `recordcount` - the line count of the file +- [download-from-s3](download-from-s3) - Download file from AWS S3 bucket with decompression based on file extension in S3 URL. + Skips download if the local file already exists and has a hash identical to the S3 object's metadata `sha256sum`. + +Potential augur curate scripts + +- [apply-geolocation-rules](apply-geolocation-rules) - Applies user curated geolocation rules to NDJSON records +- [merge-user-metadata](merge-user-metadata) - Merges user annotations with NDJSON records +- [transform-authors](transform-authors) - Abbreviates full author lists to ' et al.' +- [transform-field-names](transform-field-names) - Rename fields of NDJSON records +- [transform-genbank-location](transform-genbank-location) - Parses `location` field with the expected pattern `"[:][, ]"` based on [GenBank's country field](https://www.ncbi.nlm.nih.gov/genbank/collab/country/) +- [transform-strain-names](transform-strain-names) - Ordered search for strain names across several fields. + +## Software requirements + +Some scripts may require Bash ≥4. If you are running these scripts on macOS, the builtin Bash (`/bin/bash`) does not meet this requirement. You can install [Homebrew's Bash](https://formulae.brew.sh/formula/bash) which is more up to date. + +## Testing + +Most scripts are untested within this repo, relying on "testing in production". That is the only practical testing option for some scripts such as the ones interacting with S3 and Slack. + +For more locally testable scripts, Cram-style functional tests live in `tests` and are run as part of CI. To run these locally, + +1. Download Cram: `pip install cram` +2. Run the tests: `cram tests/` diff --git a/ingest/vendored/apply-geolocation-rules b/ingest/vendored/apply-geolocation-rules new file mode 100755 index 00000000..776cf16a --- /dev/null +++ b/ingest/vendored/apply-geolocation-rules @@ -0,0 +1,234 @@ +#!/usr/bin/env python3 +""" +Applies user curated geolocation rules to the geolocation fields in the NDJSON +records from stdin. The modified records are output to stdout. This does not do +any additional transformations on top of the user curations. +""" +import argparse +import json +from collections import defaultdict +from sys import exit, stderr, stdin, stdout + + +class CyclicGeolocationRulesError(Exception): + pass + + +def load_geolocation_rules(geolocation_rules_file): + """ + Loads the geolocation rules from the provided *geolocation_rules_file*. 
+ Returns the rules as a dict: + { + regions: { + countries: { + divisions: { + locations: corrected_geolocations_tuple + } + } + } + } + """ + geolocation_rules = defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) + with open(geolocation_rules_file, 'r') as rules_fh: + for line in rules_fh: + # ignore comments + if line.strip()=="" or line.lstrip()[0] == '#': + continue + + row = line.strip('\n').split('\t') + # Skip lines that cannot be split into raw and annotated geolocations + if len(row) != 2: + print( + f"WARNING: Could not decode geolocation rule {line!r}.", + "Please make sure rules are formatted as", + "'region/country/division/locationregion/country/division/location'.", + file=stderr) + continue + + # remove trailing comments + row[-1] = row[-1].partition('#')[0].rstrip() + raw , annot = tuple( row[0].split('/') ) , tuple( row[1].split('/') ) + + # Skip lines where raw or annotated geolocations cannot be split into 4 fields + if len(raw) != 4: + print( + f"WARNING: Could not decode the raw geolocation {row[0]!r}.", + "Please make sure it is formatted as 'region/country/division/location'.", + file=stderr + ) + continue + + if len(annot) != 4: + print( + f"WARNING: Could not decode the annotated geolocation {row[1]!r}.", + "Please make sure it is formatted as 'region/country/division/location'.", + file=stderr + ) + continue + + + geolocation_rules[raw[0]][raw[1]][raw[2]][raw[3]] = annot + + return geolocation_rules + + +def get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal = None): + """ + Gets the annotated geolocation for the *raw_geolocation* in the provided + *geolocation_rules*. + + Recursively traverses the *geolocation_rules* until we get the annotated + geolocation, which must be a Tuple. Returns `None` if there are no + applicable rules for the provided *raw_geolocation*. + + Rules are applied in the order of region, country, division, location. + First checks the provided raw values for geolocation fields, then if there + are not matches, tries to use general rules marked with '*'. + """ + # Always instantiate the rule traversal as an empty list if not provided, + # e.g. 
the first call of this recursive function + if rule_traversal is None: + rule_traversal = [] + + current_rules = geolocation_rules + # Traverse the geolocation rules based using the rule_traversal values + for field_value in rule_traversal: + current_rules = current_rules.get(field_value) + # If we hit `None`, then we know there are no matching rules, so stop the rule traversal + if current_rules is None: + break + + # We've found the tuple of the annotated geolocation + if isinstance(current_rules, tuple): + return current_rules + + # We've reach the next level of geolocation rules, + # so try to traverse the rules with the next target in raw_geolocation + if isinstance(current_rules, dict): + next_traversal_target = raw_geolocation[len(rule_traversal)] + rule_traversal.append(next_traversal_target) + return get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal) + + # We did not find any matching rule for the last traversal target + if current_rules is None: + # If we've used all general rules and we still haven't found a match, + # then there are no applicable rules for this geolocation + if all(value == '*' for value in rule_traversal): + return None + + # If we failed to find matching rule with a general rule as the last + # traversal target, then delete all trailing '*'s to reset rule_traversal + # to end with the last index that is currently NOT a '*' + # [A, *, B, *] => [A, *, B] + # [A, B, *, *] => [A, B] + # [A, *, *, *] => [A] + if rule_traversal[-1] == '*': + # Find the index of the first of the consecutive '*' from the + # end of the rule_traversal + # [A, *, B, *] => first_consecutive_general_rule_index = 3 + # [A, B, *, *] => first_consecutive_general_rule_index = 2 + # [A, *, *, *] => first_consecutive_general_rule_index = 1 + for index, field_value in reversed(list(enumerate(rule_traversal))): + if field_value == '*': + first_consecutive_general_rule_index = index + else: + break + + rule_traversal = rule_traversal[:first_consecutive_general_rule_index] + + # Set the final value to '*' in hopes that by moving to a general rule, + # we can find a matching rule. + rule_traversal[-1] = '*' + + return get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal) + + +def transform_geolocations(geolocation_rules, geolocation): + """ + Transform the provided *geolocation* by looking it up in the provided + *geolocation_rules*. + + This will use all rules that apply to the geolocation and rules will + be applied in the order of region, country, division, location. + + Returns the original geolocation if no geolocation rules apply. + + Raises a `CyclicGeolocationRulesError` if more than 1000 rules have + been applied to the raw geolocation. + """ + transformed_values = geolocation + rules_applied = 0 + continue_to_apply = True + + while continue_to_apply: + annotated_values = get_annotated_geolocation(geolocation_rules, transformed_values) + + # Stop applying rules if no annotated values were found + if annotated_values is None: + continue_to_apply = False + else: + rules_applied += 1 + + if rules_applied > 1000: + raise CyclicGeolocationRulesError( + "ERROR: More than 1000 geolocation rules applied on the same entry {geolocation!r}." 
+ ) + + # Create a new list of values for comparison to previous values + new_values = list(transformed_values) + for index, value in enumerate(annotated_values): + # Keep original value if annotated value is '*' + if value != '*': + new_values[index] = value + + # Stop applying rules if this rule did not change the values, + # since this means we've reach rules with '*' that no longer change values + if new_values == transformed_values: + continue_to_apply = False + + transformed_values = new_values + + return transformed_values + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--region-field", default="region", + help="Field that contains regions in NDJSON records.") + parser.add_argument("--country-field", default="country", + help="Field that contains countries in NDJSON records.") + parser.add_argument("--division-field", default="division", + help="Field that contains divisions in NDJSON records.") + parser.add_argument("--location-field", default="location", + help="Field that contains location in NDJSON records.") + parser.add_argument("--geolocation-rules", metavar="TSV", required=True, + help="TSV file of geolocation rules with the format: " + + "'' where the raw and annotated geolocations " + + "are formatted as '///'. " + + "If creating a general rule, then the raw field value can be substituted with '*'." + + "Lines starting with '#' will be ignored as comments." + + "Trailing '#' will be ignored as comments.") + + args = parser.parse_args() + + location_fields = [args.region_field, args.country_field, args.division_field, args.location_field] + + geolocation_rules = load_geolocation_rules(args.geolocation_rules) + + for record in stdin: + record = json.loads(record) + + try: + annotated_values = transform_geolocations(geolocation_rules, [record.get(field, '') for field in location_fields]) + except CyclicGeolocationRulesError as e: + print(e, file=stderr) + exit(1) + + for index, field in enumerate(location_fields): + record[field] = annotated_values[index] + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/cloudfront-invalidate b/ingest/vendored/cloudfront-invalidate new file mode 100755 index 00000000..dbea3981 --- /dev/null +++ b/ingest/vendored/cloudfront-invalidate @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# Originally from @tsibley's gist: https://gist.github.com/tsibley/a66262d341dedbea39b02f27e2837ea8 +set -euo pipefail + +main() { + local domain="$1" + shift + local paths=("$@") + local distribution invalidation + + echo "-> Finding CloudFront distribution" + distribution=$( + aws cloudfront list-distributions \ + --query "DistributionList.Items[?contains(Aliases.Items, \`$domain\`)] | [0].Id" \ + --output text + ) + + if [[ -z $distribution || $distribution == None ]]; then + exec >&2 + echo "Unable to find CloudFront distribution id for $domain" + echo + echo "Are your AWS CLI credentials for the right account?" + exit 1 + fi + + echo "-> Creating CloudFront invalidation for distribution $distribution" + invalidation=$( + aws cloudfront create-invalidation \ + --distribution-id "$distribution" \ + --paths "${paths[@]}" \ + --query Invalidation.Id \ + --output text + ) + + echo "-> Waiting for CloudFront invalidation $invalidation to complete" + echo " Ctrl-C to stop waiting." 
+ aws cloudfront wait invalidation-completed \ + --distribution-id "$distribution" \ + --id "$invalidation" +} + +main "$@" diff --git a/ingest/vendored/download-from-s3 b/ingest/vendored/download-from-s3 new file mode 100755 index 00000000..49811863 --- /dev/null +++ b/ingest/vendored/download-from-s3 @@ -0,0 +1,48 @@ +#!/usr/bin/env bash +set -euo pipefail + +bin="$(dirname "$0")" + +main() { + local src="${1:?A source s3:// URL is required as the first argument.}" + local dst="${2:?A destination file path is required as the second argument.}" + # How many lines to subsample to. 0 means no subsampling. Optional. + # It is not advised to use this for actual subsampling! This is intended to be + # used for debugging workflows with large datasets such as ncov-ingest as + # described in https://github.com/nextstrain/ncov-ingest/pull/367 + + # Uses `tsv-sample` to subsample, so it will not work as expected with files + # that have a single record split across multiple lines (i.e. FASTA sequences) + local n="${3:-0}" + + local s3path="${src#s3://}" + local bucket="${s3path%%/*}" + local key="${s3path#*/}" + + local src_hash dst_hash no_hash=0000000000000000000000000000000000000000000000000000000000000000 + dst_hash="$("$bin/sha256sum" < "$dst" || true)" + src_hash="$(aws s3api head-object --bucket "$bucket" --key "$key" --query Metadata.sha256sum --output text 2>/dev/null || echo "$no_hash")" + + echo "[ INFO] Downloading $src → $dst" + if [[ $src_hash != "$dst_hash" ]]; then + aws s3 cp --no-progress "$src" - | + if [[ "$src" == *.gz ]]; then + gunzip -cfq + elif [[ "$src" == *.xz ]]; then + xz -T0 -dcq + elif [[ "$src" == *.zst ]]; then + zstd -T0 -dcq + else + cat + fi | + if [[ "$n" -gt 0 ]]; then + tsv-sample -H -i -n "$n" + else + cat + fi >"$dst" + else + echo "[ INFO] Files are identical, skipping download" + fi +} + +main "$@" diff --git a/ingest/vendored/fetch-from-ncbi-entrez b/ingest/vendored/fetch-from-ncbi-entrez new file mode 100755 index 00000000..194a0c81 --- /dev/null +++ b/ingest/vendored/fetch-from-ncbi-entrez @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +""" +Fetch metadata and nucleotide sequences from NCBI Entrez and output to a GenBank file. +""" +import json +import argparse +from Bio import SeqIO, Entrez + +# To use the efetch API, the docs indicate only around 10,000 records should be fetched per request +# https://www.ncbi.nlm.nih.gov/books/NBK25499/#chapter4.EFetch +# However, in my testing with HepB, the max records returned was 9,999 +# - Jover, 16 August 2023 +BATCH_SIZE = 9999 + +Entrez.email = "hello@nextstrain.org" + +def parse_args(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--term', required=True, type=str, + help='Genbank search term. Replace spaces with "+", e.g. "Hepatitis+B+virus[All+Fields]complete+genome[All+Fields]"') + parser.add_argument('--output', required=True, type=str, help='Output file (Genbank)') + return parser.parse_args() + + +def get_esearch_history(term): + """ + Search for the provided *term* via ESearch and store the results using the + Entrez history server.¹ + + Returns the total count of returned records, query key, and web env needed + to access the records from the server. 
+ + ¹ https://www.ncbi.nlm.nih.gov/books/NBK25497/#chapter2.Using_the_Entrez_History_Server + """ + handle = Entrez.esearch(db="nucleotide", term=term, retmode="json", usehistory="y", retmax=0) + esearch_result = json.loads(handle.read())['esearchresult'] + print(f"Search term {term!r} returned {esearch_result['count']} IDs.") + return { + "count": int(esearch_result["count"]), + "query_key": esearch_result["querykey"], + "web_env": esearch_result["webenv"] + } + + +def fetch_from_esearch_history(count, query_key, web_env): + """ + Fetch records in batches from Entrez history server using the provided + *query_key* and *web_env* and yields them as a BioPython SeqRecord iterator. + """ + print(f"Fetching GenBank records in batches of n={BATCH_SIZE}") + + for start in range(0, count, BATCH_SIZE): + handle = Entrez.efetch( + db="nucleotide", + query_key=query_key, + webenv=web_env, + retstart=start, + retmax=BATCH_SIZE, + rettype="gb", + retmode="text") + + yield SeqIO.parse(handle, "genbank") + + +if __name__=="__main__": + args = parse_args() + + with open(args.output, "w") as output_handle: + for batch_results in fetch_from_esearch_history(**get_esearch_history(args.term)): + SeqIO.write(batch_results, output_handle, "genbank") diff --git a/ingest/vendored/merge-user-metadata b/ingest/vendored/merge-user-metadata new file mode 100755 index 00000000..341c2dfa --- /dev/null +++ b/ingest/vendored/merge-user-metadata @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +""" +Merges user curated annotations with the NDJSON records from stdin, with the user +curations overwriting the existing fields. The modified records are output +to stdout. This does not do any additional transformations on top of the user +curations. +""" +import argparse +import csv +import json +from collections import defaultdict +from sys import exit, stdin, stderr, stdout + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--annotations", metavar="TSV", required=True, + help="Manually curated annotations TSV file. " + + "The TSV should not have a header and should have exactly three columns: " + + "id to match existing metadata, field name, and field value. " + + "If there are multiple annotations for the same id and field, then the last value is used. " + + "Lines starting with '#' are treated as comments. 
" + + "Any '#' after the field value are treated as comments.") + parser.add_argument("--id-field", default="accession", + help="The ID field in the metadata to use to merge with the annotations.") + + args = parser.parse_args() + + annotations = defaultdict(dict) + with open(args.annotations, 'r') as annotations_fh: + csv_reader = csv.reader(annotations_fh, delimiter='\t') + for row in csv_reader: + if not row or row[0].lstrip()[0] == '#': + continue + elif len(row) != 3: + print("WARNING: Could not decode annotation line " + "\t".join(row), file=stderr) + continue + id, field, value = row + annotations[id][field] = value.partition('#')[0].rstrip() + + for record in stdin: + record = json.loads(record) + + record_id = record.get(args.id_field) + if record_id is None: + print(f"ERROR: ID field {args.id_field!r} does not exist in record", file=stderr) + exit(1) + + record.update(annotations.get(record_id, {})) + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/notify-on-diff b/ingest/vendored/notify-on-diff new file mode 100755 index 00000000..ddbe7da0 --- /dev/null +++ b/ingest/vendored/notify-on-diff @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +bin="$(dirname "$0")" + +src="${1:?A source file is required as the first argument.}" +dst="${2:?A destination s3:// URL is required as the second argument.}" + +dst_local="$(mktemp -t s3-file-XXXXXX)" +diff="$(mktemp -t diff-XXXXXX)" + +trap "rm -f '$dst_local' '$diff'" EXIT + +# if the file is not already present, just exit +"$bin"/s3-object-exists "$dst" || exit 0 + +"$bin"/download-from-s3 "$dst" "$dst_local" + +# diff's exit code is 0 for no differences, 1 for differences found, and >1 for errors +diff_exit_code=0 +diff "$dst_local" "$src" > "$diff" || diff_exit_code=$? + +if [[ "$diff_exit_code" -eq 1 ]]; then + echo "Notifying Slack about diff." + "$bin"/notify-slack --upload "$src.diff" < "$diff" +elif [[ "$diff_exit_code" -gt 1 ]]; then + echo "Notifying Slack about diff failure" + "$bin"/notify-slack "Diff failed for $src" +else + echo "No change in $src." +fi diff --git a/ingest/vendored/notify-on-job-fail b/ingest/vendored/notify-on-job-fail new file mode 100755 index 00000000..7dd24095 --- /dev/null +++ b/ingest/vendored/notify-on-job-fail @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +: "${AWS_BATCH_JOB_ID:=}" +: "${GITHUB_RUN_ID:=}" + +bin="$(dirname "$0")" +job_name="${1:?A job name is required as the first argument}" +github_repo="${2:?A GitHub repository with owner and repository name is required as the second argument}" + +echo "Notifying Slack about failed ${job_name} job." +message="❌ ${job_name} job has FAILED 😞 " + +if [[ -n "${AWS_BATCH_JOB_ID}" ]]; then + message+="See AWS Batch job \`${AWS_BATCH_JOB_ID}\` () for error details. " +elif [[ -n "${GITHUB_RUN_ID}" ]]; then + message+="See GitHub Action for error details. 
" +fi + +"$bin"/notify-slack "$message" diff --git a/ingest/vendored/notify-on-job-start b/ingest/vendored/notify-on-job-start new file mode 100755 index 00000000..1c8ce7d6 --- /dev/null +++ b/ingest/vendored/notify-on-job-start @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +: "${AWS_BATCH_JOB_ID:=}" +: "${GITHUB_RUN_ID:=}" + +bin="$(dirname "$0")" +job_name="${1:?A job name is required as the first argument}" +github_repo="${2:?A GitHub repository with owner and repository name is required as the second argument}" +build_dir="${3:-ingest}" + +echo "Notifying Slack about started ${job_name} job." +message="${job_name} job has started." + +if [[ -n "${GITHUB_RUN_ID}" ]]; then + message+=" The job was submitted by GitHub Action ." +fi + +if [[ -n "${AWS_BATCH_JOB_ID}" ]]; then + message+=" The job was launched as AWS Batch job \`${AWS_BATCH_JOB_ID}\` ()." + message+=" Follow along in your local clone of ${github_repo} with: "'```'"nextstrain build --aws-batch --no-download --attach ${AWS_BATCH_JOB_ID} ${build_dir}"'```' +fi + +"$bin"/notify-slack "$message" diff --git a/ingest/vendored/notify-on-record-change b/ingest/vendored/notify-on-record-change new file mode 100755 index 00000000..f424252c --- /dev/null +++ b/ingest/vendored/notify-on-record-change @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +bin="$(dirname "$0")" + +src="${1:?A source ndjson file is required as the first argument.}" +dst="${2:?A destination ndjson s3:// URL is required as the second argument.}" +source_name=${3:?A record source name is required as the third argument.} + +# if the file is not already present, just exit +"$bin"/s3-object-exists "$dst" || exit 0 + +s3path="${dst#s3://}" +bucket="${s3path%%/*}" +key="${s3path#*/}" + +src_record_count="$(wc -l < "$src")" + +# Try getting record count from S3 object metadata +dst_record_count="$(aws s3api head-object --bucket "$bucket" --key "$key" --query "Metadata.recordcount || ''" --output text 2>/dev/null || true)" +if [[ -z "$dst_record_count" ]]; then + # This object doesn't have the record count stored as metadata + # We have to download it and count the lines locally + dst_record_count="$(wc -l < <(aws s3 cp --no-progress "$dst" - | xz -T0 -dcfq))" +fi + +added_records="$(( src_record_count - dst_record_count ))" + +printf "%'4d %s\n" "$src_record_count" "$src" +printf "%'4d %s\n" "$dst_record_count" "$dst" +printf "%'4d added records\n" "$added_records" + +slack_message="" + +if [[ $added_records -gt 0 ]]; then + echo "Notifying Slack about added records (n=$added_records)" + slack_message="📈 New records (n=$added_records) found on $source_name." + +elif [[ $added_records -lt 0 ]]; then + echo "Notifying Slack about fewer records (n=$added_records)" + slack_message="📉 Fewer records (n=$added_records) found on $source_name." + +else + echo "Notifying Slack about same number of records" + slack_message="⛔ No new records found on $source_name." 
+fi + +slack_message+=" (Total record count: $src_record_count)" + +"$bin"/notify-slack "$slack_message" diff --git a/ingest/vendored/notify-slack b/ingest/vendored/notify-slack new file mode 100755 index 00000000..a343435f --- /dev/null +++ b/ingest/vendored/notify-slack @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +upload=0 +output=/dev/null +thread_ts="" +broadcast=0 +args=() + +for arg; do + case "$arg" in + --upload) + upload=1;; + --output=*) + output="${arg#*=}";; + --thread-ts=*) + thread_ts="${arg#*=}";; + --broadcast) + broadcast=1;; + *) + args+=("$arg");; + esac +done + +set -- "${args[@]}" + +text="${1:?Some message text is required.}" + +if [[ "$upload" == 1 ]]; then + echo "Uploading data to Slack with the message: $text" + curl https://slack.com/api/files.upload \ + --header "Authorization: Bearer $SLACK_TOKEN" \ + --form-string channels="$SLACK_CHANNELS" \ + --form-string title="$text" \ + --form-string filename="$text" \ + --form-string thread_ts="$thread_ts" \ + --form file=@/dev/stdin \ + --form filetype=text \ + --fail --silent --show-error \ + --http1.1 \ + --output "$output" +else + echo "Posting Slack message: $text" + curl https://slack.com/api/chat.postMessage \ + --header "Authorization: Bearer $SLACK_TOKEN" \ + --form-string channel="$SLACK_CHANNELS" \ + --form-string text="$text" \ + --form-string thread_ts="$thread_ts" \ + --form-string reply_broadcast="$broadcast" \ + --fail --silent --show-error \ + --http1.1 \ + --output "$output" +fi diff --git a/ingest/vendored/s3-object-exists b/ingest/vendored/s3-object-exists new file mode 100755 index 00000000..679c20a3 --- /dev/null +++ b/ingest/vendored/s3-object-exists @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +set -euo pipefail + +url="${1#s3://}" +bucket="${url%%/*}" +key="${url#*/}" + +aws s3api head-object --bucket "$bucket" --key "$key" &>/dev/null diff --git a/ingest/vendored/sha256sum b/ingest/vendored/sha256sum new file mode 100755 index 00000000..32d7ef82 --- /dev/null +++ b/ingest/vendored/sha256sum @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +""" +Portable sha256sum utility. +""" +from hashlib import sha256 +from sys import stdin + +chunk_size = 5 * 1024**2 # 5 MiB + +h = sha256() + +for chunk in iter(lambda: stdin.buffer.read(chunk_size), b""): + h.update(chunk) + +print(h.hexdigest()) diff --git a/ingest/vendored/tests/transform-strain-names/transform-strain-names.t b/ingest/vendored/tests/transform-strain-names/transform-strain-names.t new file mode 100644 index 00000000..1c05df7b --- /dev/null +++ b/ingest/vendored/tests/transform-strain-names/transform-strain-names.t @@ -0,0 +1,17 @@ +Look for strain name in "strain" or a list of backup fields. + +If strain entry exists, do not do anything. 
+ + $ echo '{"strain": "i/am/a/strain", "strain_s": "other"}' \ + > | $TESTDIR/../../transform-strain-names \ + > --strain-regex '^.+$' \ + > --backup-fields strain_s accession + {"strain":"i/am/a/strain","strain_s":"other"} + +If strain entry does not exists, search the backup fields + + $ echo '{"strain_s": "other"}' \ + > | $TESTDIR/../../transform-strain-names \ + > --strain-regex '^.+$' \ + > --backup-fields accession strain_s + {"strain_s":"other","strain":"other"} \ No newline at end of file diff --git a/ingest/vendored/transform-authors b/ingest/vendored/transform-authors new file mode 100755 index 00000000..0bade20e --- /dev/null +++ b/ingest/vendored/transform-authors @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +""" +Abbreviates a full list of authors to be ' et al.' of the NDJSON +record from stdin and outputs modified records to stdout. + +Note: This is a "best effort" approach and can potentially mangle the author name. +""" +import argparse +import json +import re +from sys import stderr, stdin, stdout + + +def parse_authors(record: dict, authors_field: str, default_value: str, + index: int, abbr_authors_field: str = None) -> dict: + # Strip and normalize whitespace + new_authors = re.sub(r'\s+', ' ', record[authors_field]) + + if new_authors == "": + new_authors = default_value + else: + # Split authors list on comma/semicolon + # OR "and"/"&" with at least one space before and after + new_authors = re.split(r'(?:\s*[,,;;]\s*|\s+(?:and|&)\s+)', new_authors)[0] + + # if it does not already end with " et al.", add it + if not new_authors.strip('. ').endswith(" et al"): + new_authors += ' et al' + + if abbr_authors_field: + if record.get(abbr_authors_field): + print( + f"WARNING: the {abbr_authors_field!r} field already exists", + f"in record {index} and will be overwritten!", + file=stderr + ) + + record[abbr_authors_field] = new_authors + else: + record[authors_field] = new_authors + + return record + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--authors-field", default="authors", + help="The field containing list of authors.") + parser.add_argument("--default-value", default="?", + help="Default value to use if authors list is empty.") + parser.add_argument("--abbr-authors-field", + help="The field for the generated abbreviated authors. " + + "If not provided, the original authors field will be modified.") + + args = parser.parse_args() + + for index, record in enumerate(stdin): + record = json.loads(record) + + parse_authors(record, args.authors_field, args.default_value, index, args.abbr_authors_field) + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/transform-field-names b/ingest/vendored/transform-field-names new file mode 100755 index 00000000..fde223fc --- /dev/null +++ b/ingest/vendored/transform-field-names @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +""" +Renames fields of the NDJSON record from stdin and outputs modified records +to stdout. +""" +import argparse +import json +from sys import stderr, stdin, stdout + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--field-map", nargs="+", + help="Fields names in the NDJSON record mapped to new field names, " + + "formatted as '{old_field_name}={new_field_name}'. 
" + + "If the old field does not exist in record, the new field will be added with an empty string value." + + "If the new field already exists in record, then the renaming of the old field will be skipped.") + parser.add_argument("--force", action="store_true", + help="Force renaming of old field even if the new field already exists. " + + "Please keep in mind this will overwrite the value of the new field.") + + args = parser.parse_args() + + field_map = {} + for field in args.field_map: + old_name, new_name = field.split('=') + field_map[old_name] = new_name + + for record in stdin: + record = json.loads(record) + + for old_field, new_field in field_map.items(): + + if record.get(new_field) and not args.force: + print( + f"WARNING: skipping rename of {old_field} because record", + f"already has a field named {new_field}.", + file=stderr + ) + continue + + record[new_field] = record.pop(old_field, '') + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/transform-genbank-location b/ingest/vendored/transform-genbank-location new file mode 100755 index 00000000..70ba56fb --- /dev/null +++ b/ingest/vendored/transform-genbank-location @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +""" +Parses GenBank's 'location' field of the NDJSON record from stdin to 3 separate +fields: 'country', 'division', and 'location'. Checks that a record is from +GenBank by verifying that the 'database' field has a value of "GenBank" or "RefSeq". + +Outputs the modified record to stdout. +""" +import json +from sys import stdin, stdout + + +def parse_location(record: dict) -> dict: + # Expected pattern for the location field is "[:][, ]" + # See GenBank docs for their "country" field: + # https://www.ncbi.nlm.nih.gov/genbank/collab/country/ + geographic_data = record['location'].split(':') + + country = geographic_data[0] + division = '' + location = '' + + if len(geographic_data) == 2: + division , _ , location = geographic_data[1].partition(',') + + record['country'] = country.strip() + record['division'] = division.strip() + record['location'] = location.strip() + + return record + + +if __name__ == '__main__': + + for record in stdin: + record = json.loads(record) + + database = record.get('database', '') + if database in {'GenBank', 'RefSeq'}: + parse_location(record) + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/transform-strain-names b/ingest/vendored/transform-strain-names new file mode 100755 index 00000000..d86c0e40 --- /dev/null +++ b/ingest/vendored/transform-strain-names @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +""" +Verifies strain name pattern in the 'strain' field of the NDJSON record from +stdin. Adds a 'strain' field to the record if it does not already exist. + +Outputs the modified records to stdout. +""" +import argparse +import json +import re +from sys import stderr, stdin, stdout + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--strain-regex", default="^.+$", + help="Regex pattern for strain names. " + + "Strain names that do not match the pattern will be dropped.") + parser.add_argument("--backup-fields", nargs="*", + help="List of backup fields to use as strain name if the value in 'strain' " + + "does not match the strain regex pattern. 
" + + "If multiple fields are provided, will use the first field that has a non-empty string.") + + args = parser.parse_args() + + strain_name_pattern = re.compile(args.strain_regex) + + for index, record in enumerate(stdin): + record = json.loads(record) + + # Verify strain name matches the strain regex pattern + if strain_name_pattern.match(record.get('strain', '')) is None: + # Default to empty string if not matching pattern + record['strain'] = '' + # Use non-empty value of backup fields if provided + if args.backup_fields: + for field in args.backup_fields: + if record.get(field): + record['strain'] = str(record[field]) + break + + if record['strain'] == '': + print(f"WARNING: Record number {index} has an empty string as the strain name.", file=stderr) + + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/trigger b/ingest/vendored/trigger new file mode 100755 index 00000000..586f9cc0 --- /dev/null +++ b/ingest/vendored/trigger @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${PAT_GITHUB_DISPATCH:=}" + +github_repo="${1:?A GitHub repository with owner and repository name is required as the first argument.}" +event_type="${2:?An event type is required as the second argument.}" +shift 2 + +if [[ $# -eq 0 && -z $PAT_GITHUB_DISPATCH ]]; then + cat >&2 <<. +You must specify options to curl for your GitHub credentials. For example, you +can specify your GitHub username, and will be prompted for your password: + + $0 $github_repo $event_type --user + +Be sure to enter a personal access token¹ as your password since GitHub has +discontinued password authentication to the API starting on November 13, 2020². + +You can also store your credentials or a personal access token in a netrc +file³: + + machine api.github.com + login + password + +and then tell curl to use it: + + $0 $github_repo $event_type --netrc + +which will then not require you to type your password every time. + +¹ https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line +² https://docs.github.com/en/rest/overview/other-authentication-methods#via-username-and-password +³ https://ec.haxx.se/usingcurl/usingcurl-netrc +. + exit 1 +fi + +auth=':' +if [[ -n $PAT_GITHUB_DISPATCH ]]; then + auth="Authorization: Bearer ${PAT_GITHUB_DISPATCH}" +fi + +if curl -fsS "https://api.github.com/repos/${github_repo}/dispatches" \ + -H 'Accept: application/vnd.github.v3+json' \ + -H 'Content-Type: application/json' \ + -H "$auth" \ + -d '{"event_type":"'"$event_type"'"}' \ + "$@" +then + echo "Successfully triggered $event_type" +else + echo "Request failed" >&2 + exit 1 +fi diff --git a/ingest/vendored/trigger-on-new-data b/ingest/vendored/trigger-on-new-data new file mode 100755 index 00000000..470d2f4f --- /dev/null +++ b/ingest/vendored/trigger-on-new-data @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${PAT_GITHUB_DISPATCH:?The PAT_GITHUB_DISPATCH environment variable is required.}" + +bin="$(dirname "$0")" + +github_repo="${1:?A GitHub repository with owner and repository name is required as the first argument.}" +event_type="${2:?An event type is required as the second argument.}" +metadata="${3:?A metadata upload output file is required as the third argument.}" +sequences="${4:?An sequence FASTA upload output file is required as the fourth argument.}" +identical_file_message="${5:-files are identical}" + +new_metadata=$(grep "$identical_file_message" "$metadata" >/dev/null; echo $?) 
+new_sequences=$(grep "$identical_file_message" "$sequences" >/dev/null; echo $?) + +slack_message="" + +# grep exit status 0 for found match, 1 for no match, 2 if an error occurred +if [[ $new_metadata -eq 1 || $new_sequences -eq 1 ]]; then + slack_message="Triggering new builds due to updated metadata and/or sequences" + "$bin"/trigger "$github_repo" "$event_type" +elif [[ $new_metadata -eq 0 && $new_sequences -eq 0 ]]; then + slack_message="Skipping trigger of rebuild: Both metadata TSV and sequences FASTA are identical to S3 files." +else + slack_message="Skipping trigger of rebuild: Unable to determine if data has been updated." +fi + + +if ! "$bin"/notify-slack "$slack_message"; then + echo "Notifying Slack failed, but exiting with success anyway." +fi diff --git a/ingest/vendored/upload-to-s3 b/ingest/vendored/upload-to-s3 new file mode 100755 index 00000000..36d171c6 --- /dev/null +++ b/ingest/vendored/upload-to-s3 @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +set -euo pipefail + +bin="$(dirname "$0")" + +main() { + local quiet=0 + + for arg; do + case "$arg" in + --quiet) + quiet=1 + shift;; + *) + break;; + esac + done + + local src="${1:?A source file is required as the first argument.}" + local dst="${2:?A destination s3:// URL is required as the second argument.}" + local cloudfront_domain="${3:-}" + + local s3path="${dst#s3://}" + local bucket="${s3path%%/*}" + local key="${s3path#*/}" + + local src_hash dst_hash no_hash=0000000000000000000000000000000000000000000000000000000000000000 + src_hash="$("$bin/sha256sum" < "$src")" + dst_hash="$(aws s3api head-object --bucket "$bucket" --key "$key" --query Metadata.sha256sum --output text 2>/dev/null || echo "$no_hash")" + + if [[ $src_hash != "$dst_hash" ]]; then + # The record count may have changed + src_record_count="$(wc -l < "$src")" + + echo "Uploading $src → $dst" + if [[ "$dst" == *.gz ]]; then + gzip -c "$src" + elif [[ "$dst" == *.xz ]]; then + xz -2 -T0 -c "$src" + elif [[ "$dst" == *.zst ]]; then + zstd -T0 -c "$src" + else + cat "$src" + fi | aws s3 cp --no-progress - "$dst" --metadata sha256sum="$src_hash",recordcount="$src_record_count" "$(content-type "$dst")" + + if [[ -n $cloudfront_domain ]]; then + echo "Creating CloudFront invalidation for $cloudfront_domain/$key" + if ! "$bin"/cloudfront-invalidate "$cloudfront_domain" "/$key"; then + echo "CloudFront invalidation failed, but exiting with success anyway." + fi + fi + + if [[ $quiet == 1 ]]; then + echo "Quiet mode. No Slack notification sent." + exit 0 + fi + + if ! "$bin"/notify-slack "Updated $dst available."; then + echo "Notifying Slack failed, but exiting with success anyway." + fi + else + echo "Uploading $src → $dst: files are identical, skipping upload" + fi +} + +content-type() { + case "$1" in + *.tsv) echo --content-type=text/tab-separated-values;; + *.csv) echo --content-type=text/comma-separated-values;; + *.ndjson) echo --content-type=application/x-ndjson;; + *.gz) echo --content-type=application/gzip;; + *.xz) echo --content-type=application/x-xz;; + *.zst) echo --content-type=application/zstd;; + *) echo --content-type=text/plain;; + esac +} + +main "$@" diff --git a/ingest/workflow/snakemake_rules/fetch_sequences.smk b/ingest/workflow/snakemake_rules/fetch_sequences.smk new file mode 100644 index 00000000..efd9fbb7 --- /dev/null +++ b/ingest/workflow/snakemake_rules/fetch_sequences.smk @@ -0,0 +1,108 @@ +""" +This part of the workflow handles fetching sequences from various sources. 
+Uses `config.sources` to determine which sequences to include in final output. + +Currently only fetches sequences from GenBank, but other sources can be +defined in the config. If adding other sources, add a new rule upstream +of rule `fetch_all_sequences` to create the file `data/{source}.ndjson` or the +file must exist as a static file in the repo. + +Produces final output as + + sequences_ndjson = "data/sequences.ndjson" + +""" + + +rule fetch_ncbi_dataset_package: + output: + dataset_package=temp("data/ncbi_dataset.zip"), + retries: 5 # Requires snakemake 7.7.0 or later + benchmark: + "benchmarks/fetch_ncbi_dataset_package.txt" + params: + ncbi_taxon_id=config["ncbi_taxon_id"], + shell: + """ + datasets download virus genome taxon {params.ncbi_taxon_id} \ + --no-progressbar \ + --filename {output.dataset_package} + """ + + +rule extract_ncbi_dataset_sequences: + input: + dataset_package="data/ncbi_dataset.zip", + output: + ncbi_dataset_sequences=temp("data/ncbi_dataset_sequences.fasta"), + benchmark: + "benchmarks/extract_ncbi_dataset_sequences.txt" + shell: + """ + unzip -jp {input.dataset_package} \ + ncbi_dataset/data/genomic.fna > {output.ncbi_dataset_sequences} + """ + + +rule format_ncbi_dataset_report: + # Formats the headers to match the NCBI mnemonic names + input: + dataset_package="data/ncbi_dataset.zip", + output: + ncbi_dataset_tsv=temp("data/ncbi_dataset_report.tsv"), + params: + ncbi_datasets_fields=",".join(config["ncbi_datasets_fields"]), + benchmark: + "benchmarks/format_ncbi_dataset_report.txt" + shell: + """ + dataformat tsv virus-genome \ + --package {input.dataset_package} \ + --fields {params.ncbi_datasets_fields:q} \ + --elide-header \ + | csvtk add-header -t -l -n {params.ncbi_datasets_fields:q} \ + | csvtk rename -t -f accession -n accession-rev \ + | csvtk -tl mutate -f accession-rev -n accession -p "^(.+?)\." \ + | tsv-select -H -f accession --rest last \ + > {output.ncbi_dataset_tsv} + """ + + +rule format_ncbi_datasets_ndjson: + input: + ncbi_dataset_sequences="data/ncbi_dataset_sequences.fasta", + ncbi_dataset_tsv="data/ncbi_dataset_report.tsv", + output: + ndjson="data/genbank.ndjson", + params: + ncbi_datasets_fields=",".join(config["ncbi_datasets_fields"]), + log: + "logs/format_ncbi_datasets_ndjson.txt", + benchmark: + "benchmarks/format_ncbi_datasets_ndjson.txt" + shell: + """ + augur curate passthru \ + --metadata {input.ncbi_dataset_tsv} \ + --fasta {input.ncbi_dataset_sequences} \ + --seq-id-column accession-rev \ + --seq-field sequence \ + --unmatched-reporting warn \ + --duplicate-reporting warn \ + 2> {log} > {output.ndjson} + """ + + +def _get_all_sources(wildcards): + return [f"data/{source}.ndjson" for source in config["sources"]] + + +rule fetch_all_sequences: + input: + all_sources=_get_all_sources, + output: + sequences_ndjson="data/sequences.ndjson", + shell: + """ + cat {input.all_sources} > {output.sequences_ndjson} + """ diff --git a/ingest/workflow/snakemake_rules/slack_notifications.smk b/ingest/workflow/snakemake_rules/slack_notifications.smk new file mode 100644 index 00000000..2cf86abb --- /dev/null +++ b/ingest/workflow/snakemake_rules/slack_notifications.smk @@ -0,0 +1,55 @@ +""" +This part of the workflow handles various Slack notifications. +Designed to be used internally by the Nextstrain team with hard-coded paths +to files on AWS S3. 
+ +All rules here require two environment variables: + * SLACK_TOKEN + * SLACK_CHANNELS +""" +import os +import sys + +slack_envvars_defined = "SLACK_CHANNELS" in os.environ and "SLACK_TOKEN" in os.environ +if not slack_envvars_defined: + print( + "ERROR: Slack notifications require two environment variables: 'SLACK_CHANNELS' and 'SLACK_TOKEN'.", + file=sys.stderr, + ) + sys.exit(1) + +S3_SRC = "s3://nextstrain-data/files/workflows/dengue" + + +rule notify_on_genbank_record_change: + input: + genbank_ndjson="data/genbank.ndjson", + output: + touch("data/notify/genbank-record-change.done"), + params: + s3_src=S3_SRC, + shell: + """ + ./vendored/notify-on-record-change {input.genbank_ndjson} {params.s3_src:q}/genbank.ndjson.xz Genbank + """ + + +rule notify_on_metadata_diff: + input: + metadata="results/metadata.tsv", + output: + touch("data/notify/metadata-diff.done"), + params: + s3_src=S3_SRC, + shell: + """ + ./vendored/notify-on-diff {input.metadata} {params.s3_src:q}/metadata.tsv.gz + """ + + +onstart: + shell("./vendored/notify-on-job-start Ingest nextstrain/dengue") + + +onerror: + shell("./vendored/notify-on-job-fail Ingest nextstrain/dengue") diff --git a/ingest/workflow/snakemake_rules/transform.smk b/ingest/workflow/snakemake_rules/transform.smk new file mode 100644 index 00000000..ec63d00f --- /dev/null +++ b/ingest/workflow/snakemake_rules/transform.smk @@ -0,0 +1,97 @@ +""" +This part of the workflow handles transforming the data into standardized +formats and expects input file + + sequences_ndjson = "data/sequences.ndjson" + +This will produce output files as + + metadata = "results/metadata.tsv" + sequences = "results/sequences.fasta" + +Parameters are expected to be defined in `config.transform`. +""" + + +rule fetch_general_geolocation_rules: + output: + general_geolocation_rules="data/general-geolocation-rules.tsv", + params: + geolocation_rules_url=config["transform"]["geolocation_rules_url"], + shell: + """ + curl {params.geolocation_rules_url} > {output.general_geolocation_rules} + """ + + +rule concat_geolocation_rules: + input: + general_geolocation_rules="data/general-geolocation-rules.tsv", + local_geolocation_rules=config["transform"]["local_geolocation_rules"], + output: + all_geolocation_rules="data/all-geolocation-rules.tsv", + shell: + """ + cat {input.general_geolocation_rules} {input.local_geolocation_rules} >> {output.all_geolocation_rules} + """ + + +rule transform: + input: + sequences_ndjson="data/sequences.ndjson", + all_geolocation_rules="data/all-geolocation-rules.tsv", + annotations=config["transform"]["annotations"], + output: + metadata="results/metadata.tsv", + sequences="results/sequences.fasta", + log: + "logs/transform.txt", + params: + field_map=config["transform"]["field_map"], + strain_regex=config["transform"]["strain_regex"], + strain_backup_fields=config["transform"]["strain_backup_fields"], + date_fields=config["transform"]["date_fields"], + expected_date_formats=config["transform"]["expected_date_formats"], + articles=config["transform"]["titlecase"]["articles"], + abbreviations=config["transform"]["titlecase"]["abbreviations"], + titlecase_fields=config["transform"]["titlecase"]["fields"], + authors_field=config["transform"]["authors_field"], + authors_default_value=config["transform"]["authors_default_value"], + abbr_authors_field=config["transform"]["abbr_authors_field"], + annotations_id=config["transform"]["annotations_id"], + metadata_columns=config["transform"]["metadata_columns"], + id_field=config["transform"]["id_field"], + 
        sequence_field=config["transform"]["sequence_field"],
+    shell:
+        """
+        (cat {input.sequences_ndjson} \
+            | ./vendored/transform-field-names \
+                --field-map {params.field_map} \
+            | augur curate normalize-strings \
+            | ./vendored/transform-strain-names \
+                --strain-regex {params.strain_regex} \
+                --backup-fields {params.strain_backup_fields} \
+            | augur curate format-dates \
+                --date-fields {params.date_fields} \
+                --expected-date-formats {params.expected_date_formats} \
+            | ./vendored/transform-genbank-location \
+            | augur curate titlecase \
+                --titlecase-fields {params.titlecase_fields} \
+                --articles {params.articles} \
+                --abbreviations {params.abbreviations} \
+            | ./vendored/transform-authors \
+                --authors-field {params.authors_field} \
+                --default-value {params.authors_default_value} \
+                --abbr-authors-field {params.abbr_authors_field} \
+            | ./vendored/apply-geolocation-rules \
+                --geolocation-rules {input.all_geolocation_rules} \
+            | ./vendored/merge-user-metadata \
+                --annotations {input.annotations} \
+                --id-field {params.annotations_id} \
+            | ./bin/ndjson-to-tsv-and-fasta \
+                --metadata-columns {params.metadata_columns} \
+                --metadata {output.metadata} \
+                --fasta {output.sequences} \
+                --id-field {params.id_field} \
+                --sequence-field {params.sequence_field} ) 2>> {log}
+        """
diff --git a/ingest/workflow/snakemake_rules/trigger_rebuild.smk b/ingest/workflow/snakemake_rules/trigger_rebuild.smk
new file mode 100644
index 00000000..77a13530
--- /dev/null
+++ b/ingest/workflow/snakemake_rules/trigger_rebuild.smk
@@ -0,0 +1,22 @@
+"""
+This part of the workflow handles triggering new dengue builds after the
+latest metadata TSV and sequence FASTA files have been uploaded to S3.
+
+Designed to be used internally by the Nextstrain team with hard-coded paths
+to expected upload flag files.
+"""
+
+
+rule trigger_build:
+    """
+    Triggering dengue builds via repository action type `rebuild`.
+    """
+    input:
+        metadata_upload="data/upload/s3/metadata.tsv.gz.done",
+        fasta_upload="data/upload/s3/sequences.fasta.xz.done",
+    output:
+        touch("data/trigger/rebuild.done"),
+    shell:
+        """
+        ./vendored/trigger-on-new-data nextstrain/dengue rebuild {input.metadata_upload} {input.fasta_upload}
+        """
diff --git a/ingest/workflow/snakemake_rules/upload.smk b/ingest/workflow/snakemake_rules/upload.smk
new file mode 100644
index 00000000..60c5c9b7
--- /dev/null
+++ b/ingest/workflow/snakemake_rules/upload.smk
@@ -0,0 +1,64 @@
+"""
+This part of the workflow handles uploading files to a specified destination.
+
+Uses the predefined input `file_to_upload` to determine the local file to upload and the
+predefined wildcard `remote_file_name` as the remote file name in the specified destination.
+
+Produces output files as `data/upload/{upload_target_name}/{remote_file_name}.done`.
+
+Currently only supports uploads to AWS S3, but additional upload rules can
+be easily added as long as they follow the output pattern described above.
+"""
+import os
+
+slack_envvars_defined = "SLACK_CHANNELS" in os.environ and "SLACK_TOKEN" in os.environ
+send_notifications = (
+    config.get("send_slack_notifications", False) and slack_envvars_defined
+)
+
+
+def _get_upload_inputs(wildcards):
+    """
+    If the file_to_upload has Slack notifications that depend on diffs with S3 files,
+    then we want the upload rule to run after the notification rule.
+
+    This function is mostly to keep track of which flag files to expect for
+    the rules in `slack_notifications.smk`, so it only includes flag files if
+    `send_notifications` is True.
+    """
+    inputs = {
+        "file_to_upload": config["upload"]["s3"]["files_to_upload"][
+            wildcards.remote_file_name
+        ],
+    }
+
+    if send_notifications:
+        flag_file = []
+
+        if inputs["file_to_upload"] == "data/genbank.ndjson":
+            flag_file = "data/notify/genbank-record-change.done"
+        elif inputs["file_to_upload"] == "results/metadata.tsv":
+            flag_file = "data/notify/metadata-diff.done"
+
+        inputs["notify_flag_file"] = flag_file
+
+    return inputs
+
+
+rule upload_to_s3:
+    input:
+        unpack(_get_upload_inputs),
+    output:
+        "data/upload/s3/{remote_file_name}.done",
+    params:
+        quiet="" if send_notifications else "--quiet",
+        s3_dst=config["upload"].get("s3", {}).get("dst", ""),
+        cloudfront_domain=config["upload"].get("s3", {}).get("cloudfront_domain", ""),
+    shell:
+        """
+        ./vendored/upload-to-s3 \
+            {params.quiet} \
+            {input.file_to_upload:q} \
+            {params.s3_dst:q}/{wildcards.remote_file_name:q} \
+            {params.cloudfront_domain} 2>&1 | tee {output}
+        """