From fec5ab6d86daf3faadf1a12b7502b1ba1c17dc19 Mon Sep 17 00:00:00 2001
From: james hadfield
Date: Mon, 4 Nov 2024 17:17:43 +1300
Subject: [PATCH] Add INRB-specific build pipeline

This includes one modification to the shared snakemake rules: if a config
defines both 'private_sequences' and 'private_metadata' then private data
will be merged in. The script which merges them should be replaced by
`augur merge` in the future.
---
 .../build-configs/inrb/auspice_config.json    |  87 ++++++
 phylogenetic/build-configs/inrb/config.yaml   |  15 +
 .../build-configs/inrb/curate_private_data.py | 265 ++++++++++++++++++
 .../build-configs/inrb/description.md         |  38 +++
 phylogenetic/rules/prepare_sequences.smk      |  48 +++-
 phylogenetic/scripts/combine_data_sources.py  |  91 ++++++
 6 files changed, 541 insertions(+), 3 deletions(-)
 create mode 100644 phylogenetic/build-configs/inrb/auspice_config.json
 create mode 100644 phylogenetic/build-configs/inrb/config.yaml
 create mode 100644 phylogenetic/build-configs/inrb/curate_private_data.py
 create mode 100644 phylogenetic/build-configs/inrb/description.md
 create mode 100644 phylogenetic/scripts/combine_data_sources.py

diff --git a/phylogenetic/build-configs/inrb/auspice_config.json b/phylogenetic/build-configs/inrb/auspice_config.json
new file mode 100644
index 0000000..4c711e6
--- /dev/null
+++ b/phylogenetic/build-configs/inrb/auspice_config.json
@@ -0,0 +1,87 @@
+{
+  "title": "Genomic epidemiology of mpox clade I viruses: INRB build",
+  "maintainers": [
+    {"name": "INRB", "url": "https://inrb.net/"},
+    {"name": "Nextstrain team", "url": "https://nextstrain.org"}
+  ],
+  "data_provenance": [
+    {
+      "name": "INRB, DRC"
+    },
+    {
+      "name": "GenBank",
+      "url": "https://www.ncbi.nlm.nih.gov/genbank/"
+    }
+  ],
+  "build_url": "https://github.com/nextstrain/mpox",
+  "colorings": [
+    {
+      "key": "region",
+      "title": "Region",
+      "type": "categorical"
+    },
+    {
+      "key": "country",
+      "title": "Country",
+      "type": "categorical"
+    },
+    {
+      "key": "division",
+      "title": "Province",
+      "type": "categorical"
+    },
+    {
+      "key": "location",
+      "title": "Health Zone",
+      "type": "categorical"
+    },
+    {
+      "key": "source_private",
+      "title": "INRB (private data)",
+      "type": "boolean"
+    },
+    {
+      "key": "host",
+      "title": "Host",
+      "type": "categorical"
+    },
+    {
+      "key": "GA_CT_fraction",
+      "title": "G→A or C→T fraction",
+      "type": "continuous"
+    },
+    {
+      "key": "dinuc_context_fraction",
+      "title": "NGA/TCN context of G→A/C→T mutations",
+      "type": "continuous"
+    },
+    {
+      "key": "recency",
+      "title": "Submission Recency",
+      "type": "categorical"
+    },
+    {
+      "key": "date_submitted",
+      "title": "Release Date",
+      "type": "categorical"
+    },
+    {
+      "key": "date",
+      "title": "Collection date",
+      "type": "categorical"
+    }
+  ],
+  "geo_resolutions": [
+    "region",
+    "country",
+    "division",
+    "location"
+  ],
+  "display_defaults": {
+    "geo_resolution": "location",
+    "color_by": "num_date",
+    "map_triplicate": false,
+    "distance_measure": "div",
+    "transmission_lines": true
+  }
+}
diff --git a/phylogenetic/build-configs/inrb/config.yaml b/phylogenetic/build-configs/inrb/config.yaml
new file mode 100644
index 0000000..91513fe
--- /dev/null
+++ b/phylogenetic/build-configs/inrb/config.yaml
@@ -0,0 +1,15 @@
+# Custom config file for INRB Clade-I builds.
+# This should be used as an additional config on top of the clade-i config, i.e.
+# --configfile defaults/clade-i/config.yaml build-configs/inrb/config.yaml
+
+# Custom INRB footer contents & auspice config
+description: "build-configs/inrb/description.md"
+auspice_config: "build-configs/inrb/auspice_config.json"
+
+# INRB builds inject private data, which is itself created by calling `curate_private_data.py`
+private_sequences: "data/sequences-private.fasta"
+private_metadata: "data/metadata-private.tsv"
+
+traits:
+  columns: region country division location
+  sampling_bias_correction: 3
diff --git a/phylogenetic/build-configs/inrb/curate_private_data.py b/phylogenetic/build-configs/inrb/curate_private_data.py
new file mode 100644
index 0000000..d4febcf
--- /dev/null
+++ b/phylogenetic/build-configs/inrb/curate_private_data.py
@@ -0,0 +1,265 @@
+from typing import Any
+from Bio import SeqIO
+import argparse
+from openpyxl import load_workbook
+from datetime import datetime
+from os import path, mkdir
+from sys import exit
+
+Sequences = dict[str, SeqIO.SeqRecord]
+Metadata = dict[str, dict[str, str]]
+MetadataHeader = list[str]
+
+# The following seem reasonable to hardcode, as they're central to the current mpox workflows
+DATE_COLUMN = 'date'
+ACCESSION_COLUMN = 'accession'
+REQUIRED_COLUMNS = [ACCESSION_COLUMN, DATE_COLUMN, 'strain']
+LAT_LONG_FNAME = path.join(path.dirname(path.realpath(__file__)), "..", "..", "defaults", "lat_longs.tsv")
+GEOGRAPHIC_RENAMINGS = {
+    "division": {
+        "Sud Ubangi": "Sud-Ubangi",
+        "Nord Ubangi": "Nord-Ubangi",
+        "Mai ndombe": "Maindombe",
+        "Mai-Ndombe": "Maindombe",
+        "South Kivu": "Sud Kivu",
+        "Sud-Kivu": "Sud Kivu",
+        "Nord-Kivu": "Nord Kivu",
+        "Mongala ": "Mongala",  # note trailing space
+    },
+    "location": {
+        "Miti-muresa": "Miti-Murhesa",
+        "Ototi": "Ototo",  # Not sure which is correct, but Ototo is in lat-longs file from Eddy
+        # "Yelenge", - now included in lat-longs file
+        "Lilanga": "Lilanga-Bobangi",
+        "Lilanga Bobangi": "Lilanga-Bobangi",
+        "Nyiragongo": "Nyirangongo",
+        "Bunyakiru": "Bunyakiri",
+        "Nyatende": "Nyantende",
+        "Nyagezi": "Nyangezi",
+        "Kalamu I": "Kalamu-I",
+        "Kalamu II": "Kalamu-II",
+        "Masina I": "Masina-I",
+        "Masina II": "Masina-II",
+        "Omendjadji": "Omendjadi",
+        "Police": "Kokolo",  # Eddy WhatsApp 2024-09-13. 24MPX2706V
+        "NYIRAGONGO": "Nyirangongo",
+        "KARISIMBI": "Karisimbi",
+        "GOMA": "Goma",
+    }
+}
+
+# The following could be better provided via parsing the (S3) metadata here, but I want to keep this script isolated
+# from the Snakemake workflow as much as possible
+RECOMMENDED_COLUMNS = ['country', 'division', 'location']
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description="Parse metadata (xlsx format) and sequences (FASTA) for integration into our canonical mpox pipelines",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    )
+    parser.add_argument('--sequences', type=str, required=True, nargs='+', metavar='FASTA', help="input sequences")
+    parser.add_argument('--fasta-header-idx', type=int,
+                        help='If FASTA headers are "|" separated, this index (1-based) is the accession')
+    parser.add_argument('--xlsx', type=str, required=True, help="Metadata file (Excel .xlsx format)")
+    parser.add_argument('--remap-columns', type=str, nargs='+', default=[], metavar='old:new', required=False,
+                        help="Change column names. Note all column names are converted to lower case.")
+    return parser.parse_args()
+
+
+def convert(accession: str, k: str, v: Any) -> str:
+    if k==DATE_COLUMN:
+        # If we need to attempt to parse a string as a date see
+
+        assert type(v)==datetime, f"The provided {DATE_COLUMN!r} for {accession!r} must be encoded as a date within Excel"
+        return f"{v.year}-{v.month:02}-{v.day:02}"
+    return str(v)
+
+
+def column_map(names: tuple[str, ...], remap: list[tuple[str, str]]) -> list[tuple[str, str]]:
+    remap_idx_used = []
+    columns = []
+    for name in names:
+        # any matching renames / duplications?
+        changes = [(idx, name_map) for idx, name_map in enumerate(remap) if name_map[0]==name.lower()]
+        if len(changes):
+            for idx, name_map in changes:
+                remap_idx_used.append(idx)
+                columns.append((name, name_map[1]))
+        else:
+            columns.append((name, name.lower()))
+
+    assert len(set([n[1] for n in columns]))==len(columns), "Requested column names aren't unique!"
+
+    for i,name_map in enumerate(remap):
+        if i not in remap_idx_used:
+            print(f"WARNING: You asked to remap column {name_map[0]!r} but that column doesn't exist!")
+    return columns
+
+def parse_excel(fname: str, remap: list[tuple[str, str]]) -> tuple[Metadata, MetadataHeader]:
+    workbook = load_workbook(filename=fname)
+    worksheet = workbook.active
+    n_rows = 0
+
+    rows = worksheet.values  # type: ignore
+    assert rows is not None, f"The metadata file {fname!r} seemed to be empty!"
+
+    existing_column_names: tuple[str, ...] = next(rows)  # type: ignore
+    column_names = column_map(existing_column_names, remap)
+
+    for name in REQUIRED_COLUMNS:
+        assert name in [c[1] for c in column_names], f"Metadata didn't have a {name!r} column (after column names were remapped)"
+    for name in RECOMMENDED_COLUMNS:
+        if name not in [c[1] for c in column_names]:
+            print(f"Warning: Metadata didn't have a {name!r} column (after column names were remapped), which is recommended")
+
+    accession_idx = [c[1] for c in column_names].index(ACCESSION_COLUMN)
+
+    metadata: Metadata = {}
+    for row in rows:
+        n_rows+=1
+        accession = str(row[accession_idx])
+        metadata[accession] = {new_name:convert(accession, new_name, row[existing_column_names.index(old_name)]) for old_name,new_name in column_names}
+
+    print(f"Parsed {n_rows} metadata rows (excluding header) from xlsx file")
+    return (metadata, [c[1] for c in column_names])
+
+
+def compare_ids(sequences: Sequences, metadata: Metadata) -> tuple[Sequences, Metadata]:
+
+    acc_meta = set(metadata.keys())
+    acc_seqs = set(sequences.keys())
+
+    meta_not_seqs = acc_meta - acc_seqs
+    seqs_not_meta = acc_seqs - acc_meta
+
+    if meta_not_seqs:
+        print(f"WARNING! Metadata contained entries for {meta_not_seqs!r} but these are not present in the provided sequences and will be removed")
+        metadata = {k:v for k,v in metadata.items() if k not in meta_not_seqs}
+
+    if seqs_not_meta:
+        print(f"WARNING! Sequences provided for {seqs_not_meta!r} but there is no corresponding metadata. These will be removed")
+        sequences = {k:v for k,v in sequences.items() if k not in seqs_not_meta}
+
+    return (sequences, metadata)
+
+def read_lat_longs():
+    """Adapted from augur.utils to avoid this script needing augur to be installed"""
+    coordinates = {}
+    # TODO: make parsing of tsv files more robust while allowing whitespace delimiting for backwards compatibility
+    def add_line_to_coordinates(line):
+        if line.startswith('#') or line.strip() == "":
+            return
+        fields = line.strip().split() if not '\t' in line else line.strip().split('\t')
+        if len(fields) == 4:
+            geo_field, loc = fields[0].lower(), fields[1].lower()
+            lat, long = float(fields[2]), float(fields[3])
+            coordinates[(geo_field, loc)] = {
+                "latitude": lat,
+                "longitude": long
+            }
+        else:
+            print("WARNING: geo-coordinate file contains invalid line. Please make sure not to mix tabs and spaces as delimiters (use only tabs):",line)
+    with open(LAT_LONG_FNAME) as fh:
+        for line in fh:
+            add_line_to_coordinates(line)
+    return coordinates
+
+
+def curate_metadata(metadata: Metadata) -> Metadata:
+    lat_longs = read_lat_longs()
+    logs = set()
+    errors = set()
+
+    for resolution in ["division", "location"]:
+        for strain, data in metadata.items():
+            original_deme = data[resolution]
+            if original_deme in GEOGRAPHIC_RENAMINGS[resolution]:
+                deme = GEOGRAPHIC_RENAMINGS[resolution][original_deme]
+                logs.add(f"\t'{strain}' {resolution} '{original_deme}' -> '{deme}'")
+                data[resolution] = deme
+
+            if (resolution, data[resolution].lower()) not in lat_longs:
+                errors.add(f"\t{resolution} {data[resolution]}")
+
+    if len(logs):
+        print("\nThe following metadata changes were made:")
+        for l in logs:
+            print(l)
+    if len(errors):
+        print("\nERROR! The following geographic locations did not have associated lat-longs. "
+              "We cannot proceed until this is fixed. There are two potential ways to solve this: "
+              "\n(1) Within the phylogenetic/defaults/lat_longs.tsv file we can add coordinates"
+              "\n(2) We can change the value in the metadata to match a lat-long which already exists in that file."
+              "\nHere are the problematic values:"
+              )
+        for l in errors:
+            print(l)
+        exit(2)
+
+    return metadata
+
+
+def parse_sequences(fnames: list[str], fasta_header_idx: int|None) -> Sequences:
+    sequences = {}
+    errors = False
+    seq_count = 0
+    for fname in fnames:
+        for seq_record in SeqIO.parse(fname, "fasta"):
+            seq_count+=1
+            name = seq_record.id
+            if fasta_header_idx is not None:
+                try:
+                    name = name.split('|')[fasta_header_idx-1]  # convert 1-based to 0-based
+                except IndexError:
+                    print(f"Sequence name {name!r}, when split by '|', did not have enough fields")
+            seq_record.id = name
+            seq_record.description = seq_record.id
+            if name in sequences:
+                print(f"ERROR - the sequence {name!r} (from {fname!r}) has already been seen!")
+                errors = True
+            sequences[name] = seq_record
+
+    assert errors is not True, "Please remove those duplicate sequences!"
+    print(f"Parsed {seq_count} sequences from FASTA file(s)")
+    return sequences
+
+def fname_in_data_dir(fname: str) -> str:
+    # This assumes the folder structure used in mpox doesn't change...
+    data_dir = path.normpath(path.join(path.dirname(path.realpath(__file__)), "..", "..", "data"))
+    if not path.isdir(data_dir):
+        mkdir(data_dir)
+    return path.join(data_dir, fname)
+
+def write_sequences(sequences: Sequences) -> None:
+    fname = fname_in_data_dir("sequences-private.fasta")
+    print(f"Writing sequences to {fname}")
+    SeqIO.write([x for x in sequences.values()], fname, "fasta")
+
+def write_metadata(metadata: Metadata, header: MetadataHeader) -> None:
+    fname = fname_in_data_dir("metadata-private.tsv")
+    print(f"Writing metadata to {fname}")
+    with open(fname, "w") as fh:
+        print("\t".join(header), file=fh)
+        for _, value in metadata.items():
+            print("\t".join([value[field] for field in header]), file=fh)
+
+def parse_remap_columns(arg: list[str]) -> list[tuple[str, str]]:
+    try:
+        return [(x[0].lower(),x[1].lower()) for x in [a.split(':') for a in arg]]
+    except IndexError:
+        print("Error while parsing the remap-columns argument. Each entry must be two column names with a ':' between them.")
+        print("For instance: \"--remap-columns 'collection date:date' 'province:division'\"")
+        exit(2)
+
+if __name__=="__main__":
+    args = parse_args()
+    print()
+    metadata, header = parse_excel(args.xlsx, parse_remap_columns(args.remap_columns))
+    sequences = parse_sequences(args.sequences, args.fasta_header_idx)
+    sequences, metadata = compare_ids(sequences, metadata)
+
+    metadata = curate_metadata(metadata)
+
+    print()
+    write_sequences(sequences)
+    write_metadata(metadata, header)
diff --git a/phylogenetic/build-configs/inrb/description.md b/phylogenetic/build-configs/inrb/description.md
new file mode 100644
index 0000000..9fddaba
--- /dev/null
+++ b/phylogenetic/build-configs/inrb/description.md
@@ -0,0 +1,38 @@
+
+### Laboratories involved in genomic surveillance of mpox in the DRC
+Institut National de Recherche Biomédicale (INRB) through:
+* Pathogen Genomics Laboratory, Epidemiology and Global Health Department, INRB, Kinshasa
+* Rodolphe Mérieux INRB Goma Laboratory, Goma
+* Mpox National Reference Laboratory, Virology Department, INRB, Kinshasa
+
+### Genomic surveillance of mpox in the DRC
+This work is made possible by the open sharing of genetic data among research groups and constitutes a part of the genomic surveillance of mpox in the DRC, led by the Ministry of Public Health, Hygiene and Prevention through INRB, DSE, PNLFHMPX and INSP. The provincial health divisions and health zones affected by the mpox outbreak are also involved in genomic surveillance efforts.
+
+Data will be shared via virological.org and [Nextstrain](https://nextstrain.org/), and will be continuously updated as new mpox sequences are generated. If you intend to use these sequences before their publication, please contact Profs. Placide Mbala and Steve Ahuka to request access and coordinate.
+
+### Partner institutions
+These sequences were generated by the Pathogen Genomics Laboratory at INRB in the DRC in partnership with:
+* Africa-CDC/ASLM
+* AFROSCREEN consortium
+* Institute of Tropical Medicine, Antwerp, Belgium
+* ARTIC Network
+* Biosurv International, UK
+* TransVIHMI: Université de Montpellier, Institut de Recherche pour le Développement (IRD), Institut National de la Santé et de la Recherche Médicale (INSERM); Montpellier; France
+* UCLA
+* USDA
+* WHO
+* U.S. Centers for Disease Control and Prevention (CDC)
+
+### Acknowledgements
+This work was funded by:
+* Africa Pathogen Genomics Initiative through CARES grants;
+* Agence Française de Développement through the AFROSCREEN project (grant agreement CZZ3209, coordinated by ANRS-MIE Maladies infectieuses émergentes in partnership with Institut de Recherche pour le Développement (IRD) and Pasteur Institute) and the PANAFPOX project funded by ANRS-MIE;
+* Belgian Directorate-General for Development Cooperation and Humanitarian Aid and the Research Foundation - Flanders (FWO, grant number G096222N);
+* Department of Defense, Defense Threat Reduction Agency, Monkeypox Threat Reduction Network;
+* USDA Non-Assistance Cooperative Agreement #20230048;
+* International Mpox Research Consortium (IMReC) through funding from the Canadian Institutes of Health Research and International Development Research Centre (grant no. MRR-184813);
+* US NIAID/NIH grant number U01AI151799 through the Center for Research in Emerging Infectious Disease-East and Central Africa (CREID-ECA)
+
+
+#### List of abbreviations:
+**DRC**: Democratic Republic of the Congo; **DSE**: Direction de la Surveillance Épidémiologique; **INRB**: Institut National de Recherche Biomédicale; **INSP**: Institut National de Santé Publique; **PNLFHMPX**: Programme National de Lutte contre les Fièvres Hémorragiques et le Mpox; **UCLA**: University of California, Los Angeles; **WHO**: World Health Organization; **USDA**: United States Department of Agriculture
diff --git a/phylogenetic/rules/prepare_sequences.smk b/phylogenetic/rules/prepare_sequences.smk
index 2fc7ce8..28f9d71 100644
--- a/phylogenetic/rules/prepare_sequences.smk
+++ b/phylogenetic/rules/prepare_sequences.smk
@@ -87,9 +87,43 @@ rule filter:
         """


-rule subsample:
+# Basic config sanity checking in lieu of a proper schema
+if any([k in config for k in ["private_sequences", "private_metadata"]]):
+    assert all(
+        [k in config for k in ["private_sequences", "private_metadata"]]
+    ), "Your config defined one of ['private_sequences', 'private_metadata'] but both must be supplied together"
+
+
+# At this point we merge in private data (iff requested)
+rule add_private_data:
+    """
+    This rule is conditionally added to the DAG if a config defines 'private_sequences' and 'private_metadata'
+    """
     input:
+        sequences=build_dir + "/{build_name}/good_sequences.fasta",
         metadata=build_dir + "/{build_name}/good_metadata.tsv",
+        private_sequences=config.get("private_sequences", ""),
+        private_metadata=config.get("private_metadata", ""),
+    output:
+        sequences=build_dir + "/{build_name}/good_sequences_combined.fasta",
+        metadata=build_dir + "/{build_name}/good_metadata_combined.tsv",
+    shell:
+        """
+        python3 scripts/combine_data_sources.py \
+            --metadata nextstrain={input.metadata} private={input.private_metadata} \
+            --sequences {input.sequences} {input.private_sequences} \
+            --output-metadata {output.metadata} \
+            --output-sequences {output.sequences}
+        """
+
+
+rule subsample:
+    input:
+        metadata=(
+            build_dir + "/{build_name}/good_metadata_combined.tsv"
+            if config.get("private_metadata", False)
+            else build_dir + "/{build_name}/good_metadata.tsv"
+        ),
     output:
         strains=build_dir + "/{build_name}/{sample}_strains.txt",
         log=build_dir + "/{build_name}/{sample}_filter.log",
@@ -131,8 +165,16 @@ rule combine_samples:
             f"{build_dir}/{w.build_name}/{sample}_strains.txt"
             for sample in config["subsample"]
         ],
-        sequences=build_dir + "/{build_name}/good_sequences.fasta",
-        metadata=build_dir + "/{build_name}/good_metadata.tsv",
+        sequences=(
+            build_dir + "/{build_name}/good_sequences_combined.fasta"
+            if config.get("private_sequences", False)
+            else build_dir + "/{build_name}/good_sequences.fasta"
+        ),
+        metadata=(
+            build_dir + "/{build_name}/good_metadata_combined.tsv"
+            if config.get("private_metadata", False)
+            else build_dir + "/{build_name}/good_metadata.tsv"
+        ),
         include=config["include"],
     output:
         sequences=build_dir + "/{build_name}/filtered.fasta",
diff --git a/phylogenetic/scripts/combine_data_sources.py b/phylogenetic/scripts/combine_data_sources.py
new file mode 100644
index 0000000..4ec9d49
--- /dev/null
+++ b/phylogenetic/scripts/combine_data_sources.py
@@ -0,0 +1,91 @@
+# TODO: with the release of `augur merge` this script should be replaced
+# (this script was written before `augur merge` existed; `augur merge` should be a drop-in replacement)
+
+from typing import Any
+from Bio import SeqIO
+import argparse
+import csv
+
+Sequences = dict[str, SeqIO.SeqRecord]
+Metadata = list[dict[str, Any]]
+MetadataHeader = list[str]
+
+ACCESSION = 'accession'
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description="Merge sequences and metadata. Duplicate sequences: last one used. Duplicate metadata: values are backfilled; in the case of conflicts the last seen is used.",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    )
+    parser.add_argument('--metadata', type=str, required=True, nargs='+', metavar='TSV',
+                        help="Input metadata files. If entries are specified as name=FILE then one-hot columns named 'source_{name}' will be added.")
+    parser.add_argument('--sequences', type=str, required=True, nargs='+', metavar='FASTA', help="Input fasta sequences")
+    parser.add_argument('--metadata-id-column', type=str, default=ACCESSION, help="Metadata column to match with sequence name")
+    parser.add_argument('--output-metadata', type=str, required=True, help="output metadata")
+    parser.add_argument('--output-sequences', type=str, required=True, help="output sequences")
+    return parser.parse_args()
+
+def parse_tsv(fname: str) -> Metadata:
+    source_name = None
+    assert fname.count('=')<=1, f"Too many '=' characters in argument {fname!r}"
+    if '=' in fname:
+        source_name, fname = fname.split('=')
+    with open(fname, "r") as fh:
+        reader = csv.DictReader(fh, delimiter="\t")
+        metadata = [row for row in reader]
+    if source_name:
+        for row in metadata:
+            row[f"source_{source_name}"] = 'true'
+    return metadata
+
+def parse_sequences(fnames: list[str]) -> tuple[Sequences, set[str]]:
+    sequences = {}
+    for fname in fnames:
+        for seq_record in SeqIO.parse(fname, "fasta"):
+            name = seq_record.id
+            seq_record.id = name
+            seq_record.description = seq_record.id
+            if name in sequences:
+                print(f"WARNING: the sequence {name!r} (from {fname!r}) has already been seen! Overwriting...")
+            sequences[name] = seq_record
+    return sequences, set(sequences.keys())
+
+def merge_meta(data: list[Metadata], id_col:str) -> tuple[Metadata, MetadataHeader]:
+    header: MetadataHeader = list(data[0][0].keys())  # the first metadata file defines the initial column order
+ for metadata in data[1:]: + for col_name in list(metadata[0].keys()): + if col_name not in header: + header.append(col_name) + + row_by_id: dict[str, dict[str, Any]] = {} + for metadata in data: + for row in metadata: + assert id_col in row, f"ERROR: metadata file missing {id_col!r}" + if row[id_col] in row_by_id: + print(f"Multiple entries for {row[id_col]} - merging!") + master_row = row_by_id[row[id_col]] + for key,value in row.items(): + master_row[key] = value + else: + row_by_id[row[id_col]] = row + + return list(row_by_id.values()), header + +def write_sequences(fname: str, sequences: Sequences) -> None: + print(f"Writing sequences to {fname}") + SeqIO.write([x for x in sequences.values()], fname, "fasta") + +def write_metadata(fname: str, metadata: Metadata, header: MetadataHeader) -> None: + print(f"Writing metadata to {fname}") + with open(fname, "w") as fh: + print("\t".join(header), file=fh) + for row in metadata: + print("\t".join([row.get(field, '') for field in header]), file=fh) + +if __name__=="__main__": + args = parse_args() + metadatas = [parse_tsv(f) for f in args.metadata] + sequences, sequence_names = parse_sequences(args.sequences) + metadata, header = merge_meta(metadatas, args.metadata_id_column) + write_sequences(args.output_sequences, sequences) + write_metadata(args.output_metadata, metadata, header)
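
-- 
Usage sketch: this is not part of the patch, but shows how the pieces above
fit together. Per the config.yaml comments and the add_private_data rule, the
private data is curated first and the INRB config is then layered on top of
the clade-i config. Commands are assumed to run from the phylogenetic/
directory; the xlsx and FASTA paths are hypothetical, while the
--remap-columns example is taken from the script's own help text.

    # Writes data/sequences-private.fasta and data/metadata-private.tsv,
    # renaming xlsx columns onto the canonical 'date'/'division' names.
    python3 build-configs/inrb/curate_private_data.py \
        --xlsx private/metadata.xlsx \
        --sequences private/genomes.fasta \
        --remap-columns 'collection date:date' 'province:division'

    # The second --configfile entry switches the subsample/combine_samples
    # inputs onto the *_combined files produced by add_private_data.
    snakemake --configfile defaults/clade-i/config.yaml build-configs/inrb/config.yaml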
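The TODO at the top of scripts/combine_data_sources.py anticipates swapping
the script for `augur merge`. A possible equivalent invocation is sketched
below. This is a sketch only: it assumes an augur version whose `merge`
command accepts NAME=FILE metadata inputs and can merge sequences (check
`augur merge --help` for your version), and the results/ paths are
illustrative. One caveat: the auspice_config colors by a 'source_private'
column which this script creates, whereas `augur merge` names its source
columns differently, so that coloring key would need updating to match.

    # Hypothetical drop-in for the add_private_data shell block.
    augur merge \
        --metadata nextstrain=results/clade-i/good_metadata.tsv private=data/metadata-private.tsv \
        --metadata-id-columns accession \
        --sequences results/clade-i/good_sequences.fasta data/sequences-private.fasta \
        --output-metadata results/clade-i/good_metadata_combined.tsv \
        --output-sequences results/clade-i/good_sequences_combined.fasta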