Harmonize ingest with pathogen repo guide #35

Merged 10 commits on Mar 19, 2024
19 changes: 11 additions & 8 deletions ingest/README.md
@@ -44,7 +44,12 @@ A pair of files for each dengue serotype (denv1 - denv4)
Run the complete ingest pipeline and upload results to AWS S3 with

```sh
nextstrain build ingest --configfiles config/config.yaml config/optional.yaml
nextstrain build \
--env AWS_ACCESS_KEY_ID \
--env AWS_SECRET_ACCESS_KEY \
ingest \
upload_all \
--configfile build-configs/nextstrain-automation/config.yaml
```
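The two `--env` options forward `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` from your shell into the build runtime, so the credentials never have to be written into a config file.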

### Adding new sequences not from GenBank
@@ -70,27 +75,25 @@ Do the following to include sequences from static FASTA files.
!ingest/data/{file-name}.ndjson
```

3. Add the `file-name` (without the `.ndjson` extension) as a source to `ingest/config/config.yaml`. This will tell the ingest pipeline to concatenate the records to the GenBank sequences and run them through the same transform pipeline.
3. Add the `file-name` (without the `.ndjson` extension) as a source to `ingest/defaults/config.yaml`. This tells the ingest pipeline to concatenate the records with the GenBank records and run them through the same curation pipeline, as sketched below.
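
For illustration, here is a minimal sketch of what that concatenation amounts to; the source names and the validation step are hypothetical, not the repo's actual code:

```python
import json
from pathlib import Path

# Hypothetical source list mirroring the `sources` entry in defaults/config.yaml;
# "my-local-sequences" is an illustrative placeholder for a static FASTA source.
sources = ["genbank", "my-local-sequences"]

with open("data/sequences.ndjson", "w") as combined:
    for source in sources:
        with Path(f"data/{source}.ndjson").open() as records:
            for line in records:
                json.loads(line)  # sanity-check each record parses as JSON
                combined.write(line)
```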

## Configuration

Configuration takes place in `config/config.yaml` by default.
Optional configs for uploading files and Slack notifications are in `config/optional.yaml`.
Configuration takes place in `defaults/config.yaml` by default.
Optional configs for uploading files are in `build-configs/nextstrain-automation/config.yaml`.

### Environment Variables

The complete ingest pipeline with AWS S3 uploads and Slack notifications uses the following environment variables:
The complete ingest pipeline with AWS S3 uploads uses the following environment variables:

#### Required

- `AWS_ACCESS_KEY_ID`
- `AWS_SECRET_ACCESS_KEY`
- `SLACK_TOKEN`
- `SLACK_CHANNELS`

#### Optional

These are optional environment variables used in our automated pipeline for providing detailed Slack notifications.
These are optional environment variables used in our automated pipeline.

- `GITHUB_RUN_ID` - provided via [`github.run_id` in a GitHub Action workflow](https://docs.github.com/en/actions/learn-github-actions/contexts#github-context)
- `AWS_BATCH_JOB_ID` - provided via [AWS Batch Job environment variables](https://docs.aws.amazon.com/batch/latest/userguide/job_env_vars.html)
62 changes: 10 additions & 52 deletions ingest/Snakefile
@@ -4,65 +4,23 @@ min_version(
"7.7.0"
) # Snakemake 7.7.0 introduced `retries` directive used in fetch-sequences

if not config:

configfile: "config/config.yaml"


send_slack_notifications = config.get("send_slack_notifications", False)
configfile: "defaults/config.yaml"
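
Values loaded by the `configfile:` directive act as defaults: any file passed on the command line via `--configfile` (such as the nextstrain-automation overlay) is merged on top and takes precedence, which is what lets the README's upload invocation work without editing the defaults.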

serotypes = ['all', 'denv1', 'denv2', 'denv3', 'denv4']

def _get_all_targets(wildcards):
# Default targets are the metadata TSV and sequences FASTA files
all_targets = expand(["results/sequences_{serotype}.fasta", "results/metadata_{serotype}.tsv"], serotype=serotypes)


# Add additional targets based on upload config
upload_config = config.get("upload", {})

for target, params in upload_config.items():
files_to_upload = params.get("files_to_upload", {})

if not params.get("dst"):
print(
f"Skipping file upload for {target!r} because the destination was not defined."
)
else:
all_targets.extend(
expand(
[f"data/upload/{target}/{{remote_file_name}}.done"],
zip,
remote_file_name=files_to_upload.keys(),
)
)

# Add additional targets for Nextstrain's internal Slack notifications
if send_slack_notifications:
all_targets.extend(
[
"data/notify/genbank-record-change.done",
"data/notify/metadata-diff.done",
]
)

if config.get("trigger_rebuild", False):
all_targets.append("data/trigger/rebuild.done")

return all_targets


rule all:
input:
_get_all_targets,

expand(["results/sequences_{serotype}.fasta", "results/metadata_{serotype}.tsv"], serotype=serotypes)
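
For readers newer to Snakemake: the `expand()` call above produces ten default targets, one sequences FASTA and one metadata TSV per serotype. A standalone sketch, assuming `snakemake` is installed:

```python
from snakemake.io import expand

serotypes = ["all", "denv1", "denv2", "denv3", "denv4"]
targets = expand(
    ["results/sequences_{serotype}.fasta", "results/metadata_{serotype}.tsv"],
    serotype=serotypes,
)
print(len(targets))                             # 10
print("results/metadata_denv4.tsv" in targets)  # True
```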

include: "workflow/snakemake_rules/fetch_sequences.smk"
include: "workflow/snakemake_rules/transform.smk"
include: "workflow/snakemake_rules/split_serotypes.smk"
include: "workflow/snakemake_rules/nextclade.smk"

include: "rules/fetch_from_ncbi.smk"
include: "rules/curate.smk"
include: "rules/split_serotypes.smk"
include: "rules/nextclade.smk"

if config.get("upload", False):
# Include custom rules defined in the config.
if "custom_rules" in config:
for rule_file in config["custom_rules"]:

include: "workflow/snakemake_rules/upload.smk"
include: rule_file
29 changes: 29 additions & 0 deletions ingest/build-configs/nextstrain-automation/config.yaml
@@ -0,0 +1,29 @@
# This configuration file should contain all required configuration parameters
# for the ingest workflow to run with additional Nextstrain automation rules.

# Custom rules to run as part of the Nextstrain automated workflow
# The paths should be relative to the ingest directory.
custom_rules:
- build-configs/nextstrain-automation/upload.smk

# Nextstrain CloudFront domain to ensure that we invalidate CloudFront after the S3 uploads
# This is required as long as we are using the AWS CLI for uploads
cloudfront_domain: "data.nextstrain.org"

# Nextstrain AWS S3 Bucket with pathogen prefix
s3_dst: "s3://nextstrain-data/files/workflows/dengue"

# Mapping of files to upload
files_to_upload:
genbank.ndjson.xz: data/genbank.ndjson
all_sequences.ndjson.xz: data/sequences.ndjson
metadata_all.tsv.zst: results/metadata_all.tsv
sequences_all.fasta.zst: results/sequences_all.fasta
metadata_denv1.tsv.zst: results/metadata_denv1.tsv
sequences_denv1.fasta.zst: results/sequences_denv1.fasta
metadata_denv2.tsv.zst: results/metadata_denv2.tsv
sequences_denv2.fasta.zst: results/sequences_denv2.fasta
metadata_denv3.tsv.zst: results/metadata_denv3.tsv
sequences_denv3.fasta.zst: results/sequences_denv3.fasta
metadata_denv4.tsv.zst: results/metadata_denv4.tsv
sequences_denv4.fasta.zst: results/sequences_denv4.fasta
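
To make the mapping semantics explicit, a small sketch using two entries from the config above: keys become S3 object names under `s3_dst`, values are local paths relative to the ingest directory.

```python
# Values copied from the config above; the loop prints the implied
# local -> remote pairing that the upload rule acts on.
s3_dst = "s3://nextstrain-data/files/workflows/dengue"
files_to_upload = {
    "metadata_all.tsv.zst": "results/metadata_all.tsv",
    "sequences_all.fasta.zst": "results/sequences_all.fasta",
}

for remote_file, local_path in files_to_upload.items():
    print(f"{local_path} -> {s3_dst}/{remote_file}")
```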
42 changes: 42 additions & 0 deletions ingest/build-configs/nextstrain-automation/upload.smk
@@ -0,0 +1,42 @@
"""
This part of the workflow handles uploading files to AWS S3.

Files to upload must be defined in the `files_to_upload` config param, where
the keys are the remote files and the values are the local filepaths
relative to the ingest directory.

Produces a single file for each uploaded file:
"results/upload/{remote_file}.upload"

The rule `upload_all` can be used as a target to upload all files.
"""
import os


rule upload_to_s3:
input:
file_to_upload=lambda wildcards: config["files_to_upload"][wildcards.remote_file],
output:
"results/upload/{remote_file}.upload",
params:
quiet="" if send_notifications else "--quiet",
s3_dst=config["s3_dst"],
cloudfront_domain=config["cloudfront_domain"],
shell:
"""
./vendored/upload-to-s3 \
{params.quiet} \
{input.file_to_upload:q} \
{params.s3_dst:q}/{wildcards.remote_file:q} \
{params.cloudfront_domain} 2>&1 | tee {output}
"""


rule upload_all:
input:
uploads=[
f"results/upload/{remote_file}.upload"
for remote_file in config["files_to_upload"].keys()
],
output:
touch("results/upload_all.done")
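
Since `upload_all` just aggregates the per-file sentinels, it can be named directly as a target, which is exactly what the `nextstrain build ... upload_all` invocation in the README does.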
25 changes: 0 additions & 25 deletions ingest/config/optional.yaml

This file was deleted.

File renamed without changes.
41 changes: 20 additions & 21 deletions ingest/config/config.yaml → ingest/defaults/config.yaml
@@ -23,28 +23,27 @@ ncbi_datasets_fields:
- submitter-names
- submitter-affiliation

# Params for the transform rule
transform:
# Params for the curate rule
curate:
# NCBI Fields to rename to Nextstrain field names.
# This is the first step in the pipeline, so any references to field names
# in the configs below should use the new field names
field_map: [
'accession=genbank_accession',
'accession-rev=genbank_accession_rev',
'isolate-lineage=strain',
'sourcedb=database', # necessary for applying geo location rules
'geo-region=region',
'geo-location=location',
'host-name=host',
'isolate-collection-date=date',
'release-date=release_date',
'update-date=update_date',
'virus-tax-id=virus_tax_id',
'virus-name=virus_name',
'sra-accs=sra_accessions',
'submitter-names=authors',
'submitter-affiliation=institution',
]
field_map:
accession: genbank_accession
accession-rev: genbank_accession_rev
isolate-lineage: strain
sourcedb: database
geo-region: region
geo-location: location
host-name: host
isolate-collection-date: date
release-date: release_date
update-date: update_date
virus-tax-id: virus_tax_id
virus-name: virus_name
sra-accs: sra_accessions
submitter-names: authors
submitter-affiliation: institution
# Standardized strain name regex
# Currently accepts any characters because we do not have a clear standard for strain names
strain_regex: '^.+$'
@@ -77,9 +76,9 @@ transform:
geolocation_rules_url: 'https://raw.githubusercontent.com/nextstrain/ncov-ingest/master/source-data/gisaid_geoLocationRules.tsv'
# Local geolocation rules that are only applicable to dengue data
# Local rules can override the general geolocation rules provided above
local_geolocation_rules: 'source-data/geolocation-rules.tsv'
local_geolocation_rules: 'defaults/geolocation-rules.tsv'
# User annotations file
annotations: 'source-data/annotations.tsv'
annotations: 'defaults/annotations.tsv'
# ID field used to merge annotations
annotations_id: 'genbank_accession'
# Field to use as the sequence ID in the FASTA file
@@ -1,5 +1,5 @@
"""
This part of the workflow handles transforming the data into standardized
This part of the workflow handles curating the data into standardized
formats and expects input file

sequences_ndjson = "data/sequences.ndjson"
@@ -9,15 +9,15 @@ This will produce output files as
metadata = "results/metadata_all.tsv"
sequences = "results/sequences_all.fasta"

Parameters are expected to be defined in `config.transform`.
Parameters are expected to be defined in `config.curate`.
"""


rule fetch_general_geolocation_rules:
output:
general_geolocation_rules="data/general-geolocation-rules.tsv",
params:
geolocation_rules_url=config["transform"]["geolocation_rules_url"],
geolocation_rules_url=config["curate"]["geolocation_rules_url"],
shell:
"""
curl {params.geolocation_rules_url} > {output.general_geolocation_rules}
@@ -27,7 +27,7 @@ rule fetch_general_geolocation_rules:
rule concat_geolocation_rules:
input:
general_geolocation_rules="data/general-geolocation-rules.tsv",
local_geolocation_rules=config["transform"]["local_geolocation_rules"],
local_geolocation_rules=config["curate"]["local_geolocation_rules"],
output:
all_geolocation_rules="data/all-geolocation-rules.tsv",
shell:
@@ -36,32 +36,39 @@ rule concat_geolocation_rules:
"""


rule transform:
def format_field_map(field_map: dict[str, str]) -> str:
"""
Format dict to `"key1"="value1" "key2"="value2"...` for use in shell commands.
"""
return " ".join([f'"{key}"="{value}"' for key, value in field_map.items()])
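
A quick sanity check of the helper's output, with two entries borrowed from `defaults/config.yaml`:

```python
# Reuses format_field_map() as defined above.
field_map = {"accession": "genbank_accession", "isolate-lineage": "strain"}
print(format_field_map(field_map))
# "accession"="genbank_accession" "isolate-lineage"="strain"
```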


rule curate:
input:
sequences_ndjson="data/sequences.ndjson",
all_geolocation_rules="data/all-geolocation-rules.tsv",
annotations=config["transform"]["annotations"],
annotations=config["curate"]["annotations"],
output:
metadata="data/metadata_all.tsv",
sequences="results/sequences_all.fasta",
log:
"logs/transform.txt",
"logs/curate.txt",
params:
field_map=config["transform"]["field_map"],
strain_regex=config["transform"]["strain_regex"],
strain_backup_fields=config["transform"]["strain_backup_fields"],
date_fields=config["transform"]["date_fields"],
expected_date_formats=config["transform"]["expected_date_formats"],
articles=config["transform"]["titlecase"]["articles"],
abbreviations=config["transform"]["titlecase"]["abbreviations"],
titlecase_fields=config["transform"]["titlecase"]["fields"],
authors_field=config["transform"]["authors_field"],
authors_default_value=config["transform"]["authors_default_value"],
abbr_authors_field=config["transform"]["abbr_authors_field"],
annotations_id=config["transform"]["annotations_id"],
metadata_columns=config["transform"]["metadata_columns"],
id_field=config["transform"]["id_field"],
sequence_field=config["transform"]["sequence_field"],
field_map=format_field_map(config["curate"]["field_map"]),
strain_regex=config["curate"]["strain_regex"],
strain_backup_fields=config["curate"]["strain_backup_fields"],
date_fields=config["curate"]["date_fields"],
expected_date_formats=config["curate"]["expected_date_formats"],
articles=config["curate"]["titlecase"]["articles"],
abbreviations=config["curate"]["titlecase"]["abbreviations"],
titlecase_fields=config["curate"]["titlecase"]["fields"],
authors_field=config["curate"]["authors_field"],
authors_default_value=config["curate"]["authors_default_value"],
abbr_authors_field=config["curate"]["abbr_authors_field"],
annotations_id=config["curate"]["annotations_id"],
metadata_columns=config["curate"]["metadata_columns"],
id_field=config["curate"]["id_field"],
sequence_field=config["curate"]["sequence_field"],
shell:
"""
(cat {input.sequences_ndjson} \
@@ -52,7 +52,7 @@ rule concat_nextclade_subtype_results:
output:
nextclade_subtypes="results/nextclade_subtypes.tsv",
params:
id_field=config["transform"]["id_field"],
id_field=config["curate"]["id_field"],
nextclade_field=config["nextclade"]["nextclade_field"],
shell:
"""
@@ -75,7 +75,7 @@ rule append_nextclade_columns:
output:
metadata_all="results/metadata_all.tsv",
params:
id_field=config["transform"]["id_field"],
id_field=config["curate"]["id_field"],
nextclade_field=config["nextclade"]["nextclade_field"],
shell:
"""