Merge pull request #13 from nextstrain/cp_ingest
The primary scope includes:

* Copying the Monkeypox ingest directory.
* Using git subrepo to copy and link the reusable scripts in an ingest/vendored subdirectory.
* Temporarily removing Nextclade-related rules, pending the compilation of a Nextclade dengue dataset and potential v3 changes.
* Pulling and processing one "sequences.fasta" and "metadata.tsv" file pair.

Future PRs will include:

* Splitting the fetch process into dengue serotypes (denv1-4).
* Adding dengue-specific annotations and data fixes.
* Integrating Nextclade-related rules and datasets.
j23414 authored Dec 5, 2023
2 parents a20a8e8 + 2684e24 commit f513d31
Showing 40 changed files with 2,059 additions and 0 deletions.
96 changes: 96 additions & 0 deletions ingest/README.md
@@ -0,0 +1,96 @@
# nextstrain.org/dengue/ingest

This is the ingest pipeline for dengue virus sequences.

## Software requirements

Follow the [standard installation instructions](https://docs.nextstrain.org/en/latest/install.html) for Nextstrain's suite of software tools.

## Usage

> NOTE: All command examples assume you are within the `ingest` directory.
> If running commands from the outer `dengue` directory, please replace the `.` with `ingest` (e.g. `nextstrain build ingest`).

Fetch sequences with

```sh
nextstrain build . data/sequences.ndjson
```

Run the complete ingest pipeline with

```sh
nextstrain build .
```

This will produce two files (within the `ingest` directory):

- `results/metadata.tsv`
- `results/sequences.fasta`

Run the complete ingest pipeline and upload results to AWS S3 with

```sh
nextstrain build . --configfiles config/config.yaml config/optional.yaml
```

### Adding new sequences not from GenBank

#### Static Files

Do the following to include sequences from static FASTA files.

1. Convert the FASTA files to NDJSON files with:

   ```sh
   ./ingest/bin/fasta-to-ndjson \
       --fasta {path-to-fasta-file} \
       --fields {fasta-header-field-names} \
       --separator {field-separator-in-header} \
       --exclude {fields-to-exclude-in-output} \
       > ingest/data/{file-name}.ndjson
   ```

2. Add the following to the `.gitignore` to allow the file to be included in the repo:

   ```gitignore
   !ingest/data/{file-name}.ndjson
   ```

3. Add the `file-name` (without the `.ndjson` extension) as a source to `ingest/config/config.yaml`. This will tell the ingest pipeline to concatenate the records to the GenBank sequences and run them through the same transform pipeline.
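
For example, if the new file is `ingest/data/my-local-sequences.ndjson`, the config entry might look like the following. This is a hypothetical sketch: the `sources` key and the `genbank` entry are assumptions based on how other Nextstrain ingest pipelines are configured, so verify against the actual `ingest/config/config.yaml`.

```yaml
# Hypothetical sketch — verify the key name against ingest/config/config.yaml.
sources: [genbank, my-local-sequences]
```

With such an entry, the pipeline would read `ingest/data/my-local-sequences.ndjson` and concatenate its records with the GenBank records before the transform steps.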

## Configuration

Configuration takes place in `config/config.yaml` by default.
Optional configs for uploading files and Slack notifications are in `config/optional.yaml`.

### Environment Variables

The complete ingest pipeline with AWS S3 uploads and Slack notifications uses the following environment variables:

#### Required

- `AWS_ACCESS_KEY_ID`
- `AWS_SECRET_ACCESS_KEY`
- `SLACK_TOKEN`
- `SLACK_CHANNELS`

#### Optional

These are optional environment variables used in our automated pipeline for providing detailed Slack notifications.

- `GITHUB_RUN_ID` - provided via [`github.run_id` in a GitHub Action workflow](https://docs.github.com/en/actions/learn-github-actions/contexts#github-context)
- `AWS_BATCH_JOB_ID` - provided via [AWS Batch Job environment variables](https://docs.aws.amazon.com/batch/latest/userguide/job_env_vars.html)
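
For a local run with uploads and notifications, these variables can be exported in the shell beforehand. A minimal sketch; all values are placeholders:

```sh
# Placeholder values — substitute your own credentials and channels.
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."
export SLACK_TOKEN="..."
export SLACK_CHANNELS="#my-channel"

nextstrain build . --configfiles config/config.yaml config/optional.yaml
```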

## Input data

### GenBank data

GenBank sequences and metadata are fetched via [NCBI datasets](https://www.ncbi.nlm.nih.gov/datasets/docs/v2/download-and-install/).
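
The fetch rules wrap NCBI's `datasets` command-line tool; the exact invocation lives in `workflow/snakemake_rules/fetch_sequences.smk`. As a rough sketch of the kind of command involved (the pipeline's real flags may differ):

```sh
# Sketch only — download all Dengue virus genomes as a zip archive.
datasets download virus genome taxon "Dengue virus" --filename dengue.zip
```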

## `ingest/vendored`

This repository uses [`git subrepo`](https://github.com/ingydotnet/git-subrepo) to manage copies of ingest scripts in [ingest/vendored](./vendored), from [nextstrain/ingest](https://github.com/nextstrain/ingest).

See [vendored/README.md](vendored/README.md#vendoring) for instructions on how to update
the vendored scripts.
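
Updating generally boils down to a `git subrepo pull` from the repository root, along the lines of the following sketch (see the vendored README for the authoritative steps):

```sh
# Pull the latest scripts from nextstrain/ingest into ingest/vendored.
git subrepo pull ingest/vendored
```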
74 changes: 74 additions & 0 deletions ingest/Snakefile
@@ -0,0 +1,74 @@
from snakemake.utils import min_version

min_version(
    "7.7.0"
)  # Snakemake 7.7.0 introduced `retries` directive used in fetch-sequences

if not config:

    configfile: "config/config.yaml"


send_slack_notifications = config.get("send_slack_notifications", False)


def _get_all_targets(wildcards):
    # Default targets are the metadata TSV and sequences FASTA files
    all_targets = ["results/sequences.fasta", "results/metadata.tsv"]

    # Add additional targets based on upload config
    upload_config = config.get("upload", {})

    for target, params in upload_config.items():
        files_to_upload = params.get("files_to_upload", {})

        if not params.get("dst"):
            print(
                f"Skipping file upload for {target!r} because the destination was not defined."
            )
        else:
            all_targets.extend(
                expand(
                    [f"data/upload/{target}/{{remote_file_name}}.done"],
                    zip,
                    remote_file_name=files_to_upload.keys(),
                )
            )

    # Add additional targets for Nextstrain's internal Slack notifications
    if send_slack_notifications:
        all_targets.extend(
            [
                "data/notify/genbank-record-change.done",
                "data/notify/metadata-diff.done",
            ]
        )

    if config.get("trigger_rebuild", False):
        all_targets.append("data/trigger/rebuild.done")

    return all_targets


rule all:
    input:
        _get_all_targets,


include: "workflow/snakemake_rules/fetch_sequences.smk"
include: "workflow/snakemake_rules/transform.smk"


if config.get("upload", False):

    include: "workflow/snakemake_rules/upload.smk"


if send_slack_notifications:

    include: "workflow/snakemake_rules/slack_notifications.smk"


if config.get("trigger_rebuild", False):

    include: "workflow/snakemake_rules/trigger_rebuild.smk"
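
For reference, the `_get_all_targets` function above expects the `upload` config to map each upload target to a destination and a set of files. A hypothetical sketch — the `s3` target name, destination URL, and file names are illustrative, not taken from this commit's config:

```yaml
# Hypothetical sketch: each target needs `dst` plus a map of
# remote file name -> local file path; targets without `dst` are skipped.
upload:
  s3:
    dst: s3://example-bucket/files/workflows/dengue
    files_to_upload:
      metadata.tsv.gz: results/metadata.tsv
      sequences.fasta.xz: results/sequences.fasta
```

Each entry in `files_to_upload` yields a `data/upload/{target}/{remote_file_name}.done` touch file that `rule all` depends on.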
86 changes: 86 additions & 0 deletions ingest/bin/fasta-to-ndjson
@@ -0,0 +1,86 @@
#!/usr/bin/env python3
"""
Parse delimited fields from FASTA header into NDJSON format to stdout.
The output NDJSON records are guaranteed to have at least two fields:
    1. strain
    2. sequence

Uses the `augur.io.read_sequences` function to read the FASTA file,
so `augur` must be installed in the environment running the script.
"""

import argparse
import json
import sys

from augur.io import read_sequences


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("--fasta", required=True,
        help="FASTA file to be transformed into NDJSON format")
    parser.add_argument("--fields", nargs="+",
        help="Fields in the FASTA header, listed in the same order as the header. " +
             "These will be used as the keys in the final NDJSON output. " +
             "One of the fields must be 'strain'. " +
             "These cannot include the field 'sequence' as this field is reserved for the genomic sequence.")
    parser.add_argument("--separator", default='|',
        help="Field separator in the FASTA header")
    parser.add_argument("--exclude", nargs="*",
        help="List of fields to exclude from final NDJSON record. "
             "These cannot include 'strain' or 'sequence'.")

    args = parser.parse_args()

    fasta_fields = [field.lower() for field in args.fields]

    exclude_fields = []
    if args.exclude:
        exclude_fields = [field.lower() for field in args.exclude]

    passed_checks = True

    if 'strain' not in fasta_fields:
        print("ERROR: FASTA fields must include a 'strain' field.", file=sys.stderr)
        passed_checks = False

    if 'sequence' in fasta_fields:
        print("ERROR: FASTA fields cannot include a 'sequence' field.", file=sys.stderr)
        passed_checks = False

    if 'strain' in exclude_fields:
        print("ERROR: The field 'strain' cannot be excluded from the output.", file=sys.stderr)
        passed_checks = False

    if 'sequence' in exclude_fields:
        print("ERROR: The field 'sequence' cannot be excluded from the output.", file=sys.stderr)
        passed_checks = False

    missing_fields = [field for field in exclude_fields if field not in fasta_fields]
    if missing_fields:
        print(f"ERROR: The following exclude fields do not match any FASTA fields: {missing_fields}", file=sys.stderr)
        passed_checks = False

    if not passed_checks:
        print("ERROR: Failed to parse FASTA file into NDJSON records.", "See detailed errors above.", file=sys.stderr)
        sys.exit(1)

    sequences = read_sequences(args.fasta)

    for sequence in sequences:
        field_values = [
            value.strip()
            for value in sequence.description.split(args.separator)
        ]
        record = dict(zip(fasta_fields, field_values))
        record['sequence'] = str(sequence.seq).upper()

        for field in exclude_fields:
            del record[field]

        json.dump(record, sys.stdout, allow_nan=False, indent=None, separators=(',', ':'))
        print()
67 changes: 67 additions & 0 deletions ingest/bin/ndjson-to-tsv-and-fasta
@@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""
Parses NDJSON records from stdin to two different files: a metadata TSV and a
sequences FASTA.
Records that do not have an ID or sequence will be excluded from the output files.
"""
import argparse
import csv
import json
from sys import stderr, stdin


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("--metadata", metavar="TSV", default="data/metadata.tsv",
        help="The output metadata TSV file")
    parser.add_argument("--fasta", metavar="FASTA", default="data/sequences.fasta",
        help="The output sequences FASTA file")
    parser.add_argument("--metadata-columns", nargs="+",
        help="List of fields from the NDJSON records to include as columns in the metadata TSV. " +
             "Metadata TSV columns will be in the order of the columns provided.")
    parser.add_argument("--id-field", default='strain',
        help="Field from the records to use as the sequence ID in the FASTA file.")
    parser.add_argument("--sequence-field", default='sequence',
        help="Field from the record that holds the genomic sequence for the FASTA file.")

    args = parser.parse_args()

    with open(args.metadata, 'wt') as metadata_output:
        with open(args.fasta, 'wt') as fasta_output:
            metadata_csv = csv.DictWriter(
                metadata_output,
                args.metadata_columns,
                restval="",
                extrasaction='ignore',
                delimiter='\t',
                lineterminator='\n',
            )
            metadata_csv.writeheader()

            for index, record in enumerate(stdin):
                record = json.loads(record)

                sequence_id = str(record.get(args.id_field, ''))
                sequence = str(record.get(args.sequence_field, ''))

                if not sequence_id:
                    print(
                        f"WARNING: Record number {index} does not have a sequence ID.",
                        "This record will be excluded from the output files.",
                        file=stderr
                    )
                elif not sequence:
                    print(
                        f"WARNING: Record number {index} does not have a sequence.",
                        "This record will be excluded from the output files.",
                        file=stderr
                    )
                else:
                    metadata_csv.writerow(record)

                    print(f">{sequence_id}", file=fasta_output)
                    print(sequence, file=fasta_output)