From 85e6eba872acec43b769df9f754b738e21e61b45 Mon Sep 17 00:00:00 2001 From: Thomas Sibley Date: Tue, 16 Jul 2024 12:24:46 -0700 Subject: [PATCH] augur merge Support generalized merging of two or more metadata tables. A long desired command. Behaviour is based on much discussion with the team and bespoke implementations like ncov's combine_metadata.py. Implementation requirements include handling inputs of arbitrary size (i.e. without needing to read any dataset fully into memory) and handling more than two inputs. SQLite is used in the implementation but could be replaced by another implementation in the future. One thing that's notable with this implementation is that it's stupidly slow for tiny datasets, e.g. a couple seconds. That's due to Augur's own slow startup time and having to wait for that 2+n times, where n is the number of metadata tables being joined, plus once for the initial startup of `augur merge` and once more for writing the output. On large datasets, this fixed startup time shouldn't matter, but on small datasets it feels really dumb. Cutting out the additional startup times by cutting out the use of `augur read-file` and `augur write-file` makes it quite quick, as it should be. I think we can live with this slowness for now, but if it turns out we can't, we can improve startup times or take a different approach to handling inputs. --- CHANGES.md | 2 + augur/__init__.py | 1 + augur/merge.py | 304 ++++++++++++++++++++++++++++ docs/api/developer/augur.merge.rst | 7 + docs/api/developer/augur.rst | 1 + docs/usage/cli/cli.rst | 1 + docs/usage/cli/merge.rst | 9 + tests/functional/merge/cram/merge.t | 229 +++++++++++++++++++++ 8 files changed, 554 insertions(+) create mode 100644 augur/merge.py create mode 100644 docs/api/developer/augur.merge.rst create mode 100644 docs/usage/cli/merge.rst create mode 100644 tests/functional/merge/cram/merge.t diff --git a/CHANGES.md b/CHANGES.md index c16ff138d..c4a23b2d5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ ### Features +* A new command, `augur merge`, now allows for generalized merging of two or more metadata tables. [#1563][] (@tsibley) * Two new commands, `augur read-file` and `augur write-file`, now allow external programs to do i/o like Augur by piping from/to these new commands. They provide handling of compression formats and newlines consistent with the rest of Augur. [#1562][] (@tsibley) ### Bug Fixes @@ -12,6 +13,7 @@ [#1561]: https://github.com/nextstrain/augur/pull/1561 [#1562]: https://github.com/nextstrain/augur/pull/1562 +[#1563]: https://github.com/nextstrain/augur/pull/1563 diff --git a/augur/__init__.py b/augur/__init__.py index 7b4f2066b..a5c9535c2 100644 --- a/augur/__init__.py +++ b/augur/__init__.py @@ -21,6 +21,7 @@ command_strings = [ "parse", "curate", + "merge", "index", "filter", "mask", diff --git a/augur/merge.py b/augur/merge.py new file mode 100644 index 000000000..2162dd91b --- /dev/null +++ b/augur/merge.py @@ -0,0 +1,304 @@ +""" +Merge two or more metadata tables into one. + +Tables must be given unique names to identify them in the output and are +merged in the order given. + +Rows are joined by id (e.g. "strain" or "name" or other +--metadata-id-columns), and ids must be unique within an input table (i.e. +tables cannot contain duplicate ids). All rows are output, even if they +appear in only a single table (i.e. a full outer join in SQL terms). + +Columns are combined by name, either extending the combined table with a new +column or overwriting values in an existing column. For columns appearing in +more than one table, non-empty values on the right hand side overwrite values +on the left hand side. The first table's id column name is used as the output +id column name. + +One generated column per input table is appended to the end of the output +table to identify the source of each row's data. Column names are generated +as "__source_metadata_{NAME}" where "{NAME}" is the table name given to +--metadata. Values in each column are 1 or 0 for present or absent in that +input table. + +Metadata tables of arbitrary size can be handled, limited only by available +disk space. Tables are not required to be entirely loadable into memory. The +transient disk space required is approximately the sum of the uncompressed size +of the inputs. + +SQLite is used behind the scenes to implement the merge, but, at least for now, +this should be considered an implementation detail that may change in the +future. The SQLite 3 CLI, sqlite3, must be available. If it's not on PATH (or +you want to use a version different from what's on PATH), set the SQLITE3 +environment variable to path of the desired sqlite3 executable. +""" +import os +import subprocess +import sys +from functools import reduce +from itertools import starmap +from shlex import quote as shquote +from shutil import which +from tempfile import mkstemp +from textwrap import dedent +from typing import Iterable, Tuple, TypeVar + +from augur.argparse_ import ExtendOverwriteDefault +from augur.errors import AugurError +from augur.io.metadata import DEFAULT_DELIMITERS, DEFAULT_ID_COLUMNS, Metadata +from augur.io.print import print_err +from augur.utils import first_line + + +T = TypeVar('T') + + +class NamedMetadata(Metadata): + name: str + """User-provided descriptive name for this metadata file.""" + + table_name: str + """Generated SQLite table name for this metadata file, based on *name*.""" + + def __init__(self, name: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.name = name + self.table_name = f"metadata_{self.name}" + + def __repr__(self): + return f"" + + +def register_parser(parent_subparsers): + parser = parent_subparsers.add_parser("merge", help=first_line(__doc__)) + + input_group = parser.add_argument_group("inputs", "options related to input") + input_group.add_argument("--metadata", nargs="+", action="extend", required=True, metavar="NAME=FILE", help="metadata files with assigned names") + + input_group.add_argument("--metadata-id-columns", default=DEFAULT_ID_COLUMNS, nargs="+", action=ExtendOverwriteDefault, metavar="COLUMN", help="names of possible metadata columns containing identifier information, ordered by priority. Only one ID column will be inferred.") + input_group.add_argument("--metadata-delimiters", default=DEFAULT_DELIMITERS, nargs="+", action=ExtendOverwriteDefault, metavar="CHARACTER", help="delimiters to accept when reading a metadata file. Only one delimiter will be inferred.") + + output_group = parser.add_argument_group("outputs", "options related to output") + output_group.add_argument('--output-metadata', required=True, metavar="FILE", help="merged metadata as TSV") + output_group.add_argument('--quiet', action="store_true", default=False, help="suppress informational messages on stderr") + + return parser + + +def run(args): + print_info = print_err if not args.quiet else lambda *_: None + + # Parse --metadata arguments + if not len(args.metadata) >= 2: + raise AugurError(f"At least two metadata inputs are required for merging.") + + if unnamed := [repr(x) for x in args.metadata if "=" not in x or x.startswith("=")]: + raise AugurError(dedent(f"""\ + All metadata inputs must be assigned a name, e.g. with NAME=FILE. + + The following inputs were missing a name: + + {indented_list(unnamed, ' ' + ' ')} + """)) + + metadata = [name_path.split("=", 1) for name_path in args.metadata] + + if duplicate_names := [repr(name) for name, count + in count_unique(name for name, _ in metadata) + if count > 1]: + raise AugurError(dedent(f"""\ + Metadata input names must be unique. + + The following names were used more than once: + + {indented_list(duplicate_names, ' ' + ' ')} + """)) + + + # Infer delimiters and id columns + metadata = [ + NamedMetadata(name, path, args.metadata_delimiters, args.metadata_id_columns) + for name, path in metadata] + + + # Locate how to re-invoke ourselves (_this_ specific Augur). + if sys.executable: + augur = f"{shquote(sys.executable)} -m augur" + else: + # A bit unusual we don't know our own Python executable, but assume we + # can access ourselves as the ``augur`` command. + augur = f"augur" + + + # Work with a temporary, on-disk SQLite database under a name we control so + # we can access it from multiple (serial) processes. + db_fd, db_path = mkstemp(prefix="augur-merge-", suffix=".sqlite") + os.close(db_fd) + + # Clean up database file by default + delete_db = True + + # Track columns as we see them, in order. The first metadata's id column + # is always the first output column of the merge, so insert it now. + output_id_column = metadata[0].id_column + output_columns = { output_id_column: [] } + + try: + # Read all metadata files into a SQLite db + for m in metadata: + # All other metadata reading in Augur (i.e. via the csv module) + # uses Python's "universal newlines"¹ definition and accepts \n, + # \r\n, and \r as newlines interchangably (even mixed within the + # same file!). We accomplish the same behaviour here with SQLite's + # less flexible newline handling by relying on the universal + # newline translation of `augur read-file`. + # -trs, 24 July 2024 + # + # ¹ + newline = os.linesep + + print_info(f"Reading {m.name!r} metadata from {m.path!r}…") + sqlite3(db_path, + f'.mode csv', + f'.separator {sqlite_quote_dot(m.delimiter)} {sqlite_quote_dot(newline)}', + f'.import {sqlite_quote_dot(f"|{augur} read-file {shquote(m.path)}")} {sqlite_quote_dot(m.table_name)}', + + f'create unique index {sqlite_quote_id(f"{m.table_name}_id")} on {sqlite_quote_id(m.table_name)}({sqlite_quote_id(m.id_column)});', + + # + f'pragma optimize;') + + # We're going to use Metadata.columns to generate the select + # statement, so ensure it matches what SQLite's .import created. + assert m.columns == (table_columns := sqlite3_table_columns(db_path, m.table_name)), \ + f"{m.columns!r} == {table_columns!r}" + + # Track which columns appear in which metadata inputs, preserving + # the order of both. + for column in m.columns: + # Match different id column names in different metadata files + # since they're logically equivalent. + output_column = output_id_column if column == m.id_column else column + + output_columns.setdefault(output_column, []) + output_columns[output_column] += [(m.table_name, column)] + + + # Construct query to produce merged metadata. + select_list = [ + # Output metadata columns coalesced across input metadata columns + *(f"""coalesce({', '.join(f"nullif({x}, '')" for x in starmap(sqlite_quote_id, reversed(input_columns)))}, null) as {sqlite_quote_id(output_column)}""" + for output_column, input_columns in output_columns.items()), + + # Source columns + *(f"""{sqlite_quote_id(m.table_name, m.id_column)} is not null as {sqlite_quote_id(f'__source_metadata_{m.name}')}""" + for m in metadata)] + + from_list = [ + sqlite_quote_id(metadata[0].table_name), + *(f"full outer join {sqlite_quote_id(m.table_name)} on {sqlite_quote_id(m.table_name, m.id_column)} in ({', '.join(sqlite_quote_id(m.table_name, m.id_column) for m in reversed(preceding))})" + for m, preceding in [(m, metadata[:i]) for i, m in enumerate(metadata[1:], 1)])] + + # Take some small pains to make the query readable since it makes + # debugging and development easier. Note that backslashes aren't + # allowed inside f-string expressions, hence the *newline* variable. + newline = '\n' + query = dedent(f"""\ + select + {(',' + newline + ' ').join(select_list)} + from + {(newline + ' ').join(from_list)} + ; + """) + + + # Write merged metadata as export from SQLite db. + # + # Assume TSV like nearly all other extant --output-metadata options. + print_info(f"Merging metadata and writing to {args.output_metadata!r}…") + sqlite3(db_path, + f'.mode csv', + f'.separator "\\t" "\\n"', + f'.headers on', + f'.once {sqlite_quote_dot(f"|{augur} write-file {shquote(args.output_metadata)}")}', + query) + + except SQLiteError as err: + delete_db = False + raise AugurError(str(err)) from err + + finally: + if delete_db: + os.unlink(db_path) + else: + print_info(f"WARNING: Skipped deletion of {db_path} due to error, but you may want to clean it up yourself (e.g. if it's large).") + + +def sqlite3(*args, **kwargs): + """ + Internal helper for invoking ``sqlite3``, the SQLite CLI program. + """ + sqlite3 = os.environ.get("SQLITE3", which("sqlite3")) + + if not sqlite3: + raise AugurError(dedent(f"""\ + Unable to find the program `sqlite3`. Is it installed? + + In order to use `augur merge`, the SQLite 3 CLI (version ≥3.39) + must be installed separately. It is typically provided by a + Nextstrain runtime. + """)) + + proc = subprocess.run([sqlite3, "-batch", *args], encoding="utf-8", text=True, **kwargs) + + try: + proc.check_returncode() + except subprocess.CalledProcessError as err: + raise SQLiteError(f"sqlite3 invocation failed") from err + + return proc + + +class SQLiteError(Exception): + pass + + +def sqlite3_table_columns(db_path, table: str) -> Iterable[str]: + return sqlite3(db_path, f"select name from pragma_table_info({sqlite_quote_id(table)})", capture_output=True).stdout.splitlines(); + + +def sqlite_quote_id(*xs): + """ + Quote a SQLite identifier. + + + + >>> sqlite_quote_id('foo bar') + '"foo bar"' + >>> sqlite_quote_id('table name', 'column name') + '"table name"."column name"' + >>> sqlite_quote_id('weird"name') + '"weird""name"' + """ + return '.'.join('"' + x.replace('"', '""') + '"' for x in xs) + + +def sqlite_quote_dot(x): + """ + Quote a SQLite CLI dot-command argument. + + + """ + return '"' + x.replace('\\', '\\\\').replace('"', '\\"') + '"' + + +def count_unique(xs: Iterable[T]) -> Iterable[Tuple[T, int]]: + # Using reduce() with a dict because it preserves input order, unlike + # itertools.groupby(), which requires a sort. Preserving order is a nice + # property for the user since we generate an error message with this. + # -trs, 24 July 2024 + yield from reduce(lambda counts, x: {**counts, x: counts.get(x, 0) + 1}, xs, counts := {}).items() # type: ignore + + +def indented_list(xs, prefix): + return f"\n{prefix}".join(xs) diff --git a/docs/api/developer/augur.merge.rst b/docs/api/developer/augur.merge.rst new file mode 100644 index 000000000..e3f404969 --- /dev/null +++ b/docs/api/developer/augur.merge.rst @@ -0,0 +1,7 @@ +augur.merge module +================== + +.. automodule:: augur.merge + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/api/developer/augur.rst b/docs/api/developer/augur.rst index 8d611731b..b778fff5b 100644 --- a/docs/api/developer/augur.rst +++ b/docs/api/developer/augur.rst @@ -41,6 +41,7 @@ Submodules augur.index augur.lbi augur.mask + augur.merge augur.parse augur.read_file augur.reconstruct_sequences diff --git a/docs/usage/cli/cli.rst b/docs/usage/cli/cli.rst index 06dfccf21..ca559654e 100644 --- a/docs/usage/cli/cli.rst +++ b/docs/usage/cli/cli.rst @@ -11,6 +11,7 @@ We're in the process of adding examples and more extensive documentation for eac parse curate/index + merge index filter mask diff --git a/docs/usage/cli/merge.rst b/docs/usage/cli/merge.rst new file mode 100644 index 000000000..0e59650ff --- /dev/null +++ b/docs/usage/cli/merge.rst @@ -0,0 +1,9 @@ +=========== +augur merge +=========== + +.. argparse:: + :module: augur + :func: make_parser + :prog: augur + :path: merge diff --git a/tests/functional/merge/cram/merge.t b/tests/functional/merge/cram/merge.t new file mode 100644 index 000000000..f25b50c9d --- /dev/null +++ b/tests/functional/merge/cram/merge.t @@ -0,0 +1,229 @@ +SETUP + + $ export AUGUR="${AUGUR:-$TESTDIR/../../../../bin/augur}" + + +BASIC USAGE + +Full outer join like behaviour, column coalescing/overwriting, and recording of +each row's source file(s) in extra columns. + + $ cat >x.tsv <<~~ + > strain a b c + > one X1a X1b X1c + > two X2a X2b X2c + > ~~ + + $ cat >y.tsv <<~~ + > strain b c f e d + > two Y2c Y2f Y2e Y2d + > three Y3f Y3e Y3d + > ~~ + + $ ${AUGUR} merge \ + > --metadata X=x.tsv Y=y.tsv \ + > --output-metadata - --quiet | tsv-pretty + strain a b c f e d __source_metadata_X __source_metadata_Y + one X1a X1b X1c 1 0 + two X2a X2b Y2c Y2f Y2e Y2d 1 1 + three Y3f Y3e Y3d 0 1 + +More than two inputs. + + $ cat >z.tsv <<~~ + > strain g c + > one Z1g + > two Z2g Z2c + > three Z3g + > ~~ + + $ ${AUGUR} merge \ + > --metadata X=x.tsv Y=y.tsv Z=z.tsv \ + > --output-metadata - --quiet | tsv-pretty + strain a b c f e d g __source_metadata_X __source_metadata_Y __source_metadata_Z + one X1a X1b X1c Z1g 1 0 1 + two X2a X2b Z2c Y2f Y2e Y2d Z2g 1 1 1 + three Y3f Y3e Y3d Z3g 0 1 1 + +Supports Augur's standard id column detection. Note that the first file's id +column name (e.g. "name" here) is used as the output id column name, per +Augur's convention of preserving the input id column name. + + $ sed '1s/^strain/name/g' < x.tsv > x-name-column.tsv + $ ${AUGUR} merge \ + > --metadata X=x-name-column.tsv Y=y.tsv \ + > --output-metadata - --quiet | tsv-pretty + name a b c f e d __source_metadata_X __source_metadata_Y + one X1a X1b X1c 1 0 + two X2a X2b Y2c Y2f Y2e Y2d 1 1 + three Y3f Y3e Y3d 0 1 + + $ sed '1s/^strain/name/g' < y.tsv > y-name-column.tsv + $ ${AUGUR} merge \ + > --metadata X=x.tsv Y=y-name-column.tsv \ + > --output-metadata - --quiet | tsv-pretty + strain a b c f e d __source_metadata_X __source_metadata_Y + one X1a X1b X1c 1 0 + two X2a X2b Y2c Y2f Y2e Y2d 1 1 + three Y3f Y3e Y3d 0 1 + +Supports --metadata-id-columns. + + $ sed '1s/^strain/id/g' < x.tsv > x-id-column.tsv + $ ${AUGUR} merge \ + > --metadata X=x-id-column.tsv Y=y.tsv \ + > --metadata-id-columns id strain \ + > --output-metadata - --quiet | tsv-pretty + id a b c f e d __source_metadata_X __source_metadata_Y + one X1a X1b X1c 1 0 + two X2a X2b Y2c Y2f Y2e Y2d 1 1 + three Y3f Y3e Y3d 0 1 + +Supports Augur's standard delimiter detection. + + $ sed 's/\t/,/g' < x.tsv > x.csv + $ ${AUGUR} merge \ + > --metadata X=x.csv Y=y.tsv \ + > --output-metadata - --quiet | tsv-pretty + strain a b c f e d __source_metadata_X __source_metadata_Y + one X1a X1b X1c 1 0 + two X2a X2b Y2c Y2f Y2e Y2d 1 1 + three Y3f Y3e Y3d 0 1 + +Supports --metadata-delimiters. + + $ sed 's/\t/|/g' < x.tsv > x.txt + $ ${AUGUR} merge \ + > --metadata X=x.txt Y=y.tsv \ + > --metadata-delimiters '|' $'\t' \ + > --output-metadata - --quiet | tsv-pretty + strain a b c f e d __source_metadata_X __source_metadata_Y + one X1a X1b X1c 1 0 + two X2a X2b Y2c Y2f Y2e Y2d 1 1 + three Y3f Y3e Y3d 0 1 + +Supports Augur's standard accepted compression formats. + + $ xz < x.tsv > x.tsv.xz + $ zstd < y.tsv > y.tsv.zst + $ ${AUGUR} merge \ + > --metadata X=x.tsv.xz Y=y.tsv.zst \ + > --output-metadata - --quiet | tsv-pretty + strain a b c f e d __source_metadata_X __source_metadata_Y + one X1a X1b X1c 1 0 + two X2a X2b Y2c Y2f Y2e Y2d 1 1 + three Y3f Y3e Y3d 0 1 + + +OFF THE BEATEN PATH + +Metadata names are only the part before the first '='. + + $ cp x.tsv x=first.tsv + $ ${AUGUR} merge \ + > --metadata X=x=first.tsv Y=y.tsv \ + > --output-metadata /dev/null + Reading 'X' metadata from 'x=first.tsv'… + Reading 'Y' metadata from 'y.tsv'… + Merging metadata and writing to '/dev/null'… + +Metadata field values with metachars (field or record delimiters) are handled properly. + + $ cat >metachars.csv <<~~ + > strain,comma,tab,newline + > one,"x,x","x x","x + > x" + > two,"","","" + > ~~ + $ ${AUGUR} merge \ + > --metadata X=x.tsv metachars=metachars.csv \ + > --output-metadata - --quiet + strain a b c comma tab newline __source_metadata_X __source_metadata_metachars + one X1a X1b X1c x,x "x x" "x + x" 1 1 + two X2a X2b X2c 1 1 + + +ERROR HANDLING + +At least two metadata inputs are required. + + $ ${AUGUR} merge \ + > --metadata X=x.tsv \ + > --output-metadata - + ERROR: At least two metadata inputs are required for merging. + [2] + +Metadata names are required. + + $ ${AUGUR} merge \ + > --metadata x.tsv =y.tsv \ + > --output-metadata - + ERROR: All metadata inputs must be assigned a name, e.g. with NAME=FILE. + + The following inputs were missing a name: + + 'x.tsv' + '=y.tsv' + + [2] + +Metadata names must be unique. + + $ ${AUGUR} merge \ + > --metadata data=x.tsv data=y.tsv \ + > --output-metadata - + ERROR: Metadata input names must be unique. + + The following names were used more than once: + + 'data' + + [2] + +Duplicates. + + $ cat >dups.tsv <<~~ + > strain a b c + > one 1a 1b 1c + > one 2a 2b 2c + > ~~ + $ ${AUGUR} merge \ + > --metadata dups=dups.tsv Y=y.tsv \ + > --output-metadata /dev/null + Reading 'dups' metadata from 'dups.tsv'… + Error: stepping, UNIQUE constraint failed: metadata_dups.strain (19) + WARNING: Skipped deletion of */augur-merge-*.sqlite due to error, but you may want to clean it up yourself (e.g. if it's large). (glob) + ERROR: sqlite3 invocation failed + [2] + +SQLITE3 env var can be used to override `sqlite3` location (and failure is +handled). + + $ cat >sqlite3 <<~~ + > #!/bin/bash + > echo "This isn't the program you're looking for." >&2 + > exit 1 + > ~~ + $ chmod +x sqlite3 + $ SQLITE3="$PWD/sqlite3" \ + > augur merge \ + > --metadata X=x.tsv Y=y.tsv \ + > --output-metadata /dev/null --quiet + This isn't the program you're looking for. + ERROR: sqlite3 invocation failed + [2] + +No `sqlite3`. + + $ SQLITE3= \ + > augur merge \ + > --metadata X=x.tsv Y=y.tsv \ + > --output-metadata /dev/null --quiet + ERROR: Unable to find the program `sqlite3`. Is it installed? + + In order to use `augur merge`, the SQLite 3 CLI (version ≥3.39) + must be installed separately. It is typically provided by a + Nextstrain runtime. + + [2]