diff --git a/source-data/vidrl_flat_file_column_map.tsv b/source-data/vidrl_flat_file_column_map.tsv deleted file mode 100644 index 384ad4c..0000000 --- a/source-data/vidrl_flat_file_column_map.tsv +++ /dev/null @@ -1,6 +0,0 @@ -virus virus_strain -virus.passage virus_passage -antisera.passage serum_passage -ferret serum_id -value titer -antisera.name serum_strain diff --git a/tdb/elife_upload.py b/tdb/elife_upload.py index 9c98d89..77a76cb 100644 --- a/tdb/elife_upload.py +++ b/tdb/elife_upload.py @@ -49,18 +49,20 @@ def format_measurements(self, measurements, **kwargs): self.format_subtype(meas) self.format_assay_type(meas) self.format_date(meas) - tmp = kwargs['fstem'].split('-')[0] - if len(tmp) > 8: - tmp = tmp[:(8-len(tmp))] - elif len(tmp) < 8: - meas['assay_date'] = "XXXX-XX-XX" - else: - if tmp[0:2] == '20': - meas['assay_date'] = "{}-{}-{}".format(tmp[0:4],tmp[4:6],tmp[6:8]) + # Only overwrite the "assay_date" if it's not already available + if meas.get("assay_date", None) is None: + tmp = kwargs['fstem'].split('-')[0] + if len(tmp) > 8: + tmp = tmp[:(8-len(tmp))] + elif len(tmp) < 8: + meas['assay_date'] = "XXXX-XX-XX" else: + if tmp[0:2] == '20': + meas['assay_date'] = "{}-{}-{}".format(tmp[0:4],tmp[4:6],tmp[6:8]) + else: + meas['assay_date'] = "XXXX-XX-XX" + if 'assay_date' not in meas.keys() or meas['assay_date'] is None: meas['assay_date'] = "XXXX-XX-XX" - if 'assay_date' not in meas.keys() or meas['assay_date'] is None: - meas['assay_date'] = "XXXX-XX-XX" self.format_passage(meas, 'serum_passage', 'serum_passage_category') self.format_passage(meas, 'virus_passage', 'virus_passage_category') self.format_ref(meas) diff --git a/tdb/vidrl_upload.py b/tdb/vidrl_upload.py index aaa92b3..29d7d55 100644 --- a/tdb/vidrl_upload.py +++ b/tdb/vidrl_upload.py @@ -1,4 +1,4 @@ -import os, re, time, datetime, csv, sys, json, errno +import os, re, time, datetime, csv, sys, json, errno, filecmp import pandas as pd from upload import upload from rethinkdb import r @@ -8,6 +8,7 @@ from parse import parse from upload import parser import xlrd +from typing import Iterator, Optional, Tuple sys.path.append('') # need to import from base from base.rethink_io import rethink_io from vdb.flu_upload import flu_upload @@ -17,8 +18,21 @@ parser.add_argument('--human-ref-only', action="store_true", help="Only ingest human sera references, used for backfilling data that was skipped in previous ingests.") -ELIFE_COLUMNS = ["virus_strain", "serum_strain","serum_id", "titer", "source", "virus_passage", "virus_passage_category", "serum_passage", "serum_passage_category", "assay_type"] +ELIFE_COLUMNS = [ + "virus_strain", + "serum_strain", + "serum_id", + "titer", + "source", + "virus_passage", + "virus_passage_category", + "serum_passage", + "serum_passage_category", + "assay_type", + "date", # Using "date" instead of "assay_date" because elife_upload/upload/format_date overwrites "assay_date" with the parsed "date" +] EXPECTED_SUBTYPES = {"h1n1pdm", "h3n2", "vic", "yam"} +HUMAN_SERA_YEAR_REGEX = r"SH(vax|VAX|\s)?(\d{4})" # Vaccine mapping used for mapping human pooled sera to a specific reference virus # This is based on the proxy reference viruses used for human pooled sera @@ -54,14 +68,25 @@ } } -def parse_tsv_mapping_to_dict(tsv_file): - map_dict = {} - with open(tsv_file, 'r') as f: - for line in f: - (key, value) = line.split('\t') - key = key.lower() - map_dict[key] = value.rstrip('\n') - return map_dict +def parse_human_serum_id(original_id: str, year_regex: str) -> Tuple[Optional[str],Optional[str]]: + """ + Attempts to parse the year (YYYY) from the provided *original_id* to + construct a new standard serum id with the provided *year_regex*. + + Returns None if the year could not be parsed from the *original_id*. + If year is successfully parsed, then returns a tuple of (year, new_serum_id). + """ + year = new_serum_id = None + matches = re.match(year_regex, original_id) + if matches is None: + return (None, None) + + year = matches.group(2) + # Follow a standard pattern where serum_id is `Human pool ` + # Need "human" in serum_id because this is how we match for human sera in seasonal flu + # + new_serum_id = f"Human pool {year}" + return (year, new_serum_id) def parse_human_serum_references(human_serum_data, subtype): @@ -72,7 +97,6 @@ def parse_human_serum_references(human_serum_data, subtype): data with serum id, serum passage, and serum strain. """ human_serum_references = {} - year_regex = r"SH(vax|VAX|\s)?(\d{4})" egg_or_cell_regex = r"^(egg|cell)$" # Used with re.IGNORECASE potential_year_fields = ['serum_id', 'serum_passage', 'serum_abbrev'] @@ -83,14 +107,9 @@ def parse_human_serum_references(human_serum_data, subtype): # First try to parse the year from the human serum data year = new_serum_id = None for field in potential_year_fields: - matches = re.match(year_regex, human_serum[field]) + year, new_serum_id = parse_human_serum_id(human_serum[field], HUMAN_SERA_YEAR_REGEX) # Use the first match of the potential fields - if matches is not None: - year = matches.group(2) - # Follow a standard pattern where serum_id is `Human pool ` - # Need "human" in serum_id because this is how we match for human sera in seasonal flu - # - new_serum_id = f"Human pool {year}" + if new_serum_id is not None: break # year is required to know which vaccine reference strain to use @@ -98,7 +117,7 @@ def parse_human_serum_references(human_serum_data, subtype): if year is None: raise Exception(f"Unable to process human sera column {column} ", f"because none of {potential_year_fields} fields ", - f"matched the year regex {year_regex!r}") + f"matched the year regex {HUMAN_SERA_YEAR_REGEX!r}") # Then try to parse egg or cell from the human serum data egg_or_cell = None @@ -161,6 +180,7 @@ def convert_vidrl_xls_to_tsv(path, fstem, ind, assay_type, subtype, human_ref_on serum_abbrev_pattern = r"\w+\s{0,1}\w+/\d+.*" human_serum_pattern = r"(^SH\d+|SHVAX|SHvax|sera|vaxpool).*" crick = False + assay_date = "unknown" for worksheet_index, worksheet in enumerate(workbook.sheets(), start=1): print(f"Reading worksheet {worksheet_index} '{worksheet.name}' in file '{fstem}'") @@ -311,27 +331,276 @@ def convert_vidrl_xls_to_tsv(path, fstem, ind, assay_type, subtype, human_ref_on serum_strain = serum_mapping.get(serum_abbr, serum_abbr) titer = str(mat.cell_value(i,j)).strip() - line = "%s\n" % ("\t".join([ virus_strain, serum_strain, serum_id, titer, source, virus_passage, virus_passage_category, serum_passage, serum_passage_category, assay_type])) + line = "%s\n" % ("\t".join([ virus_strain, serum_strain, serum_id, titer, source, virus_passage, virus_passage_category, serum_passage, serum_passage_category, assay_type, assay_date])) outfile.write(line) +def read_csv_to_dict(csv_file: str) -> Iterator[dict]: + """ + Read provided *csv_file* and yields each row as a dict. + """ + with open(csv_file, newline="") as fh: + reader = csv.DictReader(fh, delimiter=",") + yield from reader + + +def curate_flat_records(records: Iterator[dict], fstem: str, assay_type: str) -> Iterator[dict]: + """ + Curate the measurement records expected from the _flat_file.csv files. + """ + # The new column names need to be one of the ELIFE_COLUMNS in order to be + # included in the temporary output file that's then passed to elife_upload.py + column_map = { + # Using original strain name _without_ VIDRL standardization so that + # the strain name can be standardized to match our sequence strain names + "original designation": "virus_strain", + "test virus passage": "virus_passage", + "reference antigen": "serum_strain", + "reference passage": "human_serum_passage", + "antisera passage": "serum_passage", + "ferret": "serum_id", + "titre": "titer", + "antisera": "serum_abbr", + "test date": "date", + } + + for record in records: + new_record = rename_record_fields(record, column_map) + new_record = add_hardcoded_fields(new_record, assay_type, fstem) + new_record = standardize_human_serum(new_record) + yield new_record + + +def curate_reference_panel_records( + records: Iterator[dict], + serum_abbr_map: dict, + test_date: str, + fstem: str, + assay_type: str) -> Iterator[dict]: + """ + Curate the measurement records expected from the _reference_panel.csv files. + """ + # The new column names need to be one of the ELIFE_COLUMNS in order to be + # included in the temporary output file that's then passed to elife_upload.py + column_map = { + "reference antigen": "virus_strain", + "reference passage": "virus_passage", + # _reference_panel.csv does not include the full serum_strain + # serum_strain will need to be mapped from the serum_abbr + "antisera": "serum_abbr", + "antisera passage": "serum_passage", + "ferret": "serum_id", + "titre": "titer", + # Used for cleaning up `virus_strain` that includes "pool" suffix + "homologous": "homologous" + } + + for record in records: + new_record = rename_record_fields(record, column_map) + new_record = add_hardcoded_fields(new_record, assay_type, fstem) + # test_date column is not included in the _reference_panel.csv, so use provided test_date. + new_record["date"] = test_date + + # Map serum_abbr to serum_strain and human_serum_passage + serum_abbr = new_record["serum_abbr"] + serum_map = serum_abbr_map.get(serum_abbr, {}) + serum_strain = serum_map.get("serum_strain") + if serum_strain is None: + print(f"WARNING: No serum strain available for {serum_abbr!r}, skipping record", file=sys.stderr) + continue + else: + new_record["serum_strain"] = serum_strain + new_record["human_serum_passage"] = serum_map.get("human_serum_passage") + + new_record = standardize_human_serum(new_record) + + # Clean up `virus_strain` that includes "pool" suffix + # Strip "pool" suffix and keep as proxy of homologous titer + # for the human serum pool reference. Skip measurements that are not + # marked as homologous since they are just duplicates of the proxy measurements + # -Jover, 04 November 2024 + if re.match(r".*pool$", new_record["virus_strain"]): + if new_record["homologous"] == "TRUE": + new_record["virus_strain"] = re.sub(r"pool$", "", new_record["virus_strain"]) + else: + continue + + yield new_record + + +def rename_record_fields(record: dict, field_map: dict) -> dict: + return {new_field: record[old_field] for old_field, new_field in field_map.items()} + + +def add_hardcoded_fields(record: dict, assay_type: str, fstem: str) -> dict: + new_record = record.copy() + new_record["assay_type"] = assay_type + new_record["virus_passage_category"] = "" + new_record["serum_passage_category"] = "" + new_record["source"] = "vidrl_{}.csv".format(fstem) + return new_record + + +def standardize_human_serum(record: dict) -> dict: + if record["serum_id"] != "NA": + return record + + new_record = record.copy() + # We are purposely _not_ verifying serum_strain/serum_passage against the VACCINE_MAPPING. + # VIDRL uses reference antigens that are proxies for the human serum + # vaccine strains so these do not always align with the exact egg/cell vaccine strains. + # -Jover, 04 November 2024 + new_record["serum_passage"] = new_record["human_serum_passage"] + human_serum_id = new_record["serum_abbr"] + new_record["serum_strain"] = re.sub(r"pool$", "", new_record["serum_strain"]) + _, new_record["serum_id"] = parse_human_serum_id(human_serum_id, HUMAN_SERA_YEAR_REGEX) + return new_record + + +def write_records_to_tsv(records: Iterator[dict], output_file: str, write_mode: str = "w"): + with open(output_file, write_mode, newline="") as fh: + tsv_writer = csv.DictWriter( + fh, + ELIFE_COLUMNS, + extrasaction="ignore", + delimiter="\t", + lineterminator="\n", + ) + if write_mode == "w": + tsv_writer.writeheader() + for record in records: + tsv_writer.writerow(record) + + +class IteratorReturnValue: + """ + An iterator that wraps another one and stores the return value. + Modified from https://discuss.python.org/t/getting-generator-return-values-with-natural-for-loop-syntax/59556/2 + """ + def __init__(self, iterable): + self.iterable = iterable + def __iter__(self): + self.return_value = yield from self.iterable + + +def validate_records(records: Iterator[dict]) -> Tuple[Iterator[dict], str]: + """ + Loop through *records* to validate the values for + - `serum_abbr` maps to a single `serum_strain` and `human_serum_passage` + - single `date` (test_date) + + Raises AssertionError if validation for a record fails or yields the + validated record. + + Once the records have all been validated, returns the serum abbr map and + the test date. The final return value is expected to be captured by + `IteratorReturnValue`. + """ + serum_abbr_map = {} + test_date = None + for record in records: + serum_abbr = record["serum_abbr"] + serum_strain = record["serum_strain"] + human_serum_passage = record["human_serum_passage"] + if serum_abbr in serum_abbr_map: + previous_serum_strain = serum_abbr_map[serum_abbr]["serum_strain"] + previous_serum_passage = serum_abbr_map[serum_abbr]["human_serum_passage"] + assert serum_strain == previous_serum_strain, \ + f"Serum abbreviation {serum_abbr} mapped to multiple serum strain names: {(serum_strain, previous_serum_strain)}" + + assert human_serum_passage == previous_serum_passage, \ + f"Serum abbreviation {serum_abbr} mapped to multiple human serum passage: {(serum_passage, previous_serum_passage)}" + else: + serum_abbr_map[serum_abbr] = { + "serum_strain": serum_strain, + "human_serum_passage": human_serum_passage, + } + + record_date = record["date"] + if test_date is None: + test_date = record_date + else: + assert test_date == record_date, \ + f"Record date {record_date!r} is different from previous record dates {test_date!r}" + + yield record + + return (serum_abbr_map, test_date) + + +def get_ref_panel_filepath(fstem, path) -> Optional[str]: + """ + Returns valid _reference_panel filepath if it should be ingested. + 1. Checks the expected _reference_panel.csv file exists + 2. Checks if the _reference_panel file is a duplicate of another file + + Note: This does depend on the user having all of the flat files locally + and expects the user to always ingest the first _reference_panel file. + """ + reference_filepath = path + fstem + ".csv" + if not os.path.isfile(reference_filepath): + print(f"WARNING: Coupled reference panel file {reference_filepath!r} does not exist.", file=sys.stderr) + return None + + # Check if the file is a potential duplicate where one Excel file got split into multiple flat files. + # Look for `b` or `_2` files that are potentially duplicates of `a` or `_1` files + # We are ignoring the capital A/B patterns because these indicate separate assays. + char_pattern = r"b" + num_pattern = r"_2" + dup_ref_pattern = rf"(\d*(?:_?[A-Z])?)({char_pattern}|{num_pattern})(\.xlsx.*)" + dup_match = re.match(dup_ref_pattern, fstem) + if dup_match: + # Construct the filepath for the first potential file that was ingested, e.g. + # 0612b.xlsx_H3_reference_panel.csv -> 0612a.xlsx_H3_reference_panel.csv + # 0710_B_2.xlsx_H3_reference_panel.csv -> 0710_B_1.xlsx_H3_reference_panel.csv + # 0717B_2.xlsx_H3_reference_panel.csv -> 0717B_1.xlsx_H3_reference_panel.csv + if re.match(char_pattern, dup_match.group(2)): + first_pattern = "a" + elif re.match(num_pattern, dup_match.group(2)): + first_pattern = "_1" + else: + # This should only occur if the `dup_ref_pattern` is out of sync with the `char_pattern` and `num_pattern` + raise Error(f"Unable to match reference {dup_match.group(2)!r} to {ab_pattern} or {num_pattern}") + + first_fstem = f"{dup_match.group(1)}{first_pattern}{dup_match.group(3)}" + first_filepath = path + first_fstem + ".csv" + + # If the first potential file exists and has the same content as the + # current file, then ignore the current file. + if os.path.isfile(first_filepath) and filecmp.cmp(first_filepath, reference_filepath, shallow=False): + print(f"WARNING: Ignoring reference panel file {fstem!r} because it is duplicate of {first_fstem!r}", file=sys.stderr) + return None + + return reference_filepath + + def read_flat_vidrl(path, fstem, assay_type): """ Read the flat CSV file with *fstem* in the provided *path* and convert to the expected TSV file at `data/tmp/.tsv` for tdb/elife_upload. """ - column_map = parse_tsv_mapping_to_dict("source-data/vidrl_flat_file_column_map.tsv") - filepath = path + fstem + ".csv" - - titer_measurements = pd.read_csv(filepath, usecols=column_map.keys()) \ - .rename(columns=column_map) - titer_measurements["assay_type"] = assay_type - titer_measurements["virus_passage_category"] = "" - titer_measurements["serum_passage_category"] = "" - titer_measurements["source"] = "vidrl_{}.csv".format(fstem) - - titer_measurements[ELIFE_COLUMNS].to_csv("data/tmp/{}.tsv".format(fstem), sep="\t", index=False) + filepath = path + fstem + ".csv" + output_filepath ="data/tmp/{}.tsv".format(fstem) + + records = read_csv_to_dict(filepath) + curated_records = curate_flat_records(records, fstem, assay_type) + validated_records = IteratorReturnValue(validate_records(curated_records)) + write_records_to_tsv(validated_records, output_filepath) + + reference_fstem = fstem.replace("_flat_file", "_reference_panel") + reference_filepath = get_ref_panel_filepath(reference_fstem, path) + serum_abbr_map, test_date = validated_records.return_value + if reference_filepath: + reference_records = read_csv_to_dict(reference_filepath) + curated_reference_records = curate_reference_panel_records( + reference_records, + serum_abbr_map, + test_date, + reference_fstem, + assay_type) + # Append to the same temp file as the flat_file.csv records + write_records_to_tsv(curated_reference_records, output_filepath, "a") if __name__=="__main__":