From df7dd76169cf50a0ffe3df4f9ead43d7c9b2fbdf Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 3 Dec 2024 13:49:19 -0500 Subject: [PATCH 01/15] Add little script for scraping a topic --- misc/re-run/read-topic.py | 99 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 misc/re-run/read-topic.py diff --git a/misc/re-run/read-topic.py b/misc/re-run/read-topic.py new file mode 100644 index 0000000..0eae0fa --- /dev/null +++ b/misc/re-run/read-topic.py @@ -0,0 +1,99 @@ +""" +Read a topic start to finish and write to a file, with messages delimited by newlines. +Example: +python misc/re-run/read-topic.py \ + ./kafka-prod.properties \ + clinvar-somatic-ftp-watcher \ + clinvar-somatic-ftp-watcher.txt +""" + +import argparse +import sys + +from confluent_kafka import Consumer, KafkaError + + +def load_properties(file_path): + """ + Load key-value pairs from a .properties file (no section headers). + """ + properties = {} + with open(file_path, "r") as file: + for line in file: + # Ignore comments and empty lines + line = line.strip() + if line and not line.startswith("#"): + key, value = line.split("=", 1) + properties[key.strip()] = value.strip() + return properties + + +def on_assign(consumer, partitions): + """ + Rebalance callback to handle partition assignment and seek to the beginning. + """ + print(f"Partitions assigned: {partitions}") + for partition in partitions: + partition.offset = 0 # Set the offset to the beginning + print("Seeking to the beginning of all newly assigned partitions.") + consumer.assign(partitions) + + +def consume_kafka_messages(kafka_config, topic, output_file): + """ + Consume messages from a Kafka topic and write them to a file. + """ + # Create a Kafka consumer + consumer = Consumer(kafka_config) + consumer.subscribe([topic], on_assign=on_assign) + print(f"Subscribed to topic: {topic}") + + try: + with open(output_file, "w") as file: + while True: + # Poll for a message + msg = consumer.poll(1.0) # 1-second timeout + print(f"Got message: {msg}") + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + # End of partition event + print(f"Reached end of partition for {msg.topic()}") + break + elif msg.error(): + raise KafkaError(msg.error()) + else: + print(f"{len(msg.value())}") + # Write the message to the file + file.write(msg.value().decode("utf-8") + "\n") + print(f"Message written: {msg.value().decode('utf-8')}") + + except KeyboardInterrupt: + print("Consumption interrupted by user.") + finally: + consumer.close() + + +def parse_args(argv): + """ + Parse command line arguments. + """ + parser = argparse.ArgumentParser(description="Consume messages from a Kafka topic.") + parser.add_argument( + "kafka_properties_file", help="Path to the Kafka properties file." + ) + parser.add_argument("topic", help="Kafka topic to consume messages from.") + parser.add_argument("output_file", help="Output file to write messages to.") + return parser.parse_args(argv) + + +if __name__ == "__main__": + # Parse command-line arguments + args = parse_args(sys.argv[1:]) + + # Load Kafka properties + kafka_properties = load_properties(args.kafka_properties_file) + + # Consume messages and write them to a file + consume_kafka_messages(kafka_properties, args.topic, args.output_file) From a4783874c9477d591a321cbfd46cb6dcb1dbc136 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 3 Dec 2024 18:37:24 -0500 Subject: [PATCH 02/15] Add initial revision of execute-re-runs.py --- misc/re-run/execute-re-runs.py | 97 ++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 misc/re-run/execute-re-runs.py diff --git a/misc/re-run/execute-re-runs.py b/misc/re-run/execute-re-runs.py new file mode 100644 index 0000000..043003c --- /dev/null +++ b/misc/re-run/execute-re-runs.py @@ -0,0 +1,97 @@ +import json +import os +import subprocess + +region = "us-east1" +# execute_job_script = "misc/bin/execute-job.sh" +# env_file = "./local-env.sh" +# ftp_watcher_file = "confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt" +ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-10-27.txt" + + +def run_command( + cmd_array, + # env_dict=None +): + """ + Run a command with optional environment variables, capture output, and handle errors. + + Args: + cmd_array (list): Command and arguments as a list + env_dict (dict, optional): Environment variables to set + + Raises: + subprocess.CalledProcessError: If the command returns non-zero exit status + """ + # Run process with pipe for output + with subprocess.Popen( # noqa: S603 + cmd_array, + # env=env_dict, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, # Use text mode instead of universal_newlines + bufsize=1, # Line buffered + ) as process: + # Print output in real-time + while True: + line = process.stdout.readline() + if not line and process.poll() is not None: + break + if line: + print(line.rstrip()) + + # Check return code and raise error if non-zero + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, cmd_array) + + return process.returncode + + +global_env = { + "instance_name": "clinvar-vcv-ingest", + "CLINVAR_INGEST_SLACK_TOKEN": "", + "CLINVAR_INGEST_SLACK_CHANNEL": "", + "CLINVAR_INGEST_BUCKET": "clinvar-ingest-dev", + "CLINVAR_INGEST_RELEASE_TAG": "v2_0_4_alpha", + "CLINVAR_INGEST_BQ_META_DATASET": "clinvar_ingest", + "BQ_DEST_PROJECT": "clingen-dev", +} + +with open(ftp_watcher_file) as f: + for line in f: + # Each FTP Watcher message is an array of records + watcher_records = json.loads(line) + if not isinstance(watcher_records, list): + raise ValueError("Expected a list of records: " + line) + for record in watcher_records: + print(f"Executing job for {record}") + # cmd_args = ["bash", execute_job_script] + env = {} + env.update(global_env) + env.update(record) + + env_str = ",".join([f"{k}='{v}'" for k, v in env.items()]) + + # Same command as in execute-job.sh + cmd_args = [ + "gcloud", + "run", + "jobs", + "execute", + global_env["instance_name"], + "--region", + region, + "--async", + "--update-env-vars", + env_str, + ] + + # Execute the job with subprocess.popen + try: + run_command(cmd_args) + except subprocess.CalledProcessError as e: + print(f"Job failed for {record['file_name']}") + print(e) + break + + print(f"Job executed successfully for {json.dumps(record)}") From dd892d3aacbe9e2fe9296f27c411632a9375c540 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 3 Dec 2024 21:17:50 -0500 Subject: [PATCH 03/15] Add archive scripts --- misc/re-run/archive-rcvs.sh | 61 ++++++++++++++++++++++++++++++++ misc/re-run/archive-vcvs.sh | 70 +++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 misc/re-run/archive-rcvs.sh create mode 100644 misc/re-run/archive-vcvs.sh diff --git a/misc/re-run/archive-rcvs.sh b/misc/re-run/archive-rcvs.sh new file mode 100644 index 0000000..2dba993 --- /dev/null +++ b/misc/re-run/archive-rcvs.sh @@ -0,0 +1,61 @@ +#!/bin/bash +set -xeo pipefail +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_07_24_v1_1_0_beta4/clinvar_xml/ClinVarRCVRelease_2024-0724.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0724.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_07_30_v1_1_0_beta4/clinvar_xml/ClinVarRCVRelease_2024-0730.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0730.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_08_05_v1_1_0_beta4/clinvar_xml/ClinVarRCVRelease_2024-0805.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0805.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_08_12_v1_1_0_beta5/clinvar_xml/ClinVarRCVRelease_2024-0812.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0812.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_08_18_v1_1_0_beta5/clinvar_xml/ClinVarRCVRelease_2024-0818.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0818.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_08_25_v1_1_0_beta5/clinvar_xml/ClinVarRCVRelease_2024-0825.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0825.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_09_02_v1_1_0_beta6/clinvar_xml/ClinVarRCVRelease_2024-0902.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0902.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_09_08_v1_1_0_beta6/clinvar_xml/ClinVarRCVRelease_2024-0908.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0908.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_09_17_v1_1_0_beta6/clinvar_xml/ClinVarRCVRelease_2024-0917.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0917.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_10_01_v1_0_0/clinvar_xml/ClinVarRCVRelease_2024-1001.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1001.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_10_09_v1_0_0/clinvar_xml/ClinVarRCVRelease_2024-1009.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1009.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_10_14_v1_0_0/clinvar_xml/ClinVarRCVRelease_2024-1014.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1014.xml.gz + +# new ones since that scrape +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_10_20_v1_0_0/clinvar_xml/ClinVarRCVRelease_2024-1020.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1020.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_10_27_v2_0_0_alpha/clinvar_xml/ClinVarRCVRelease_2024-1027.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1027.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_11_03_v2_0_0_alpha/clinvar_xml/ClinVarRCVRelease_2024-1103.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1103.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_11_11_v2_0_0_alpha/clinvar_xml/ClinVarRCVRelease_2024-1111.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1111.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_11_20_v2_0_1_alpha/clinvar_xml/ClinVarRCVRelease_2024-1120.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1120.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_11_26_v2_0_1_alpha/clinvar_xml/ClinVarRCVRelease_2024-1126.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1126.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_rcv_2024_12_01_v2_0_1_alpha/clinvar_xml/ClinVarRCVRelease_2024-1201.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1201.xml.gz diff --git a/misc/re-run/archive-vcvs.sh b/misc/re-run/archive-vcvs.sh new file mode 100644 index 0000000..aea95c8 --- /dev/null +++ b/misc/re-run/archive-vcvs.sh @@ -0,0 +1,70 @@ +#!/bin/bash +set -xeo pipefail +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_07_24_v1_1_0_beta4_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0724.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0724.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_07_30_v1_1_0_beta4_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0730.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0730.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_08_05_v1_1_0_beta4_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0805.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0805.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_08_12_v1_1_0_beta5_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0812.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0812.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_08_19_v1_1_0_beta5_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0819.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0819.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_08_25_v1_1_0_beta5_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0825.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0825.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_09_02_v1_1_0_beta6_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0902.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0902.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_09_08_v1_1_0_beta6_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0908.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0908.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_09_17_v1_1_0_beta6_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0917.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0917.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_09_30_v1_0_0_copy_only/clinvar_xml/ClinVarVCVRelease_2024-0930.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0930.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_10_09_v1_0_0_copy_only/clinvar_xml/ClinVarVCVRelease_2024-1009.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1009.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_10_14_v1_0_0_copy_only/clinvar_xml/ClinVarVCVRelease_2024-1014.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1014.xml.gz + +# new ones since that scrape +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_10_20_v1_0_0_copy_only/clinvar_xml/ClinVarVCVRelease_2024-1020.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1020.xml.gz +# Already archived +# gcloud storage cp \ +# gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_10_27_v2_0_0_alpha/clinvar_xml/ClinVarVCVRelease_2024-1027.xml.gz \ +# gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1027.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_11_03_v2_0_0_alpha/clinvar_xml/ClinVarVCVRelease_2024-1103.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1103.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_11_11_v2_0_0_alpha/clinvar_xml/ClinVarVCVRelease_2024-1111.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1111.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_11_20_v2_0_1_alpha/clinvar_xml/ClinVarVCVRelease_2024-1120.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1120.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_11_26_v2_0_1_alpha/clinvar_xml/ClinVarVCVRelease_2024-1126.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1126.xml.gz +gcloud storage cp \ + gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_12_01_v2_0_1_alpha/clinvar_xml/ClinVarVCVRelease_2024-1201.xml.gz \ + gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1201.xml.gz + + +# Delete parsed content from already-ingested ones +# gcloud storage rm 'gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_11_03_v2_0_0_alpha/clinvar_parsed/**' +# gcloud storage rm 'gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_11_11_v2_0_0_alpha/clinvar_parsed/**' +# gcloud storage rm 'gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_11_20_v2_0_1_alpha/clinvar_parsed/**' +# gcloud storage rm 'gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_11_26_v2_0_1_alpha/clinvar_parsed/**' +# gcloud storage rm 'gs://clinvar-ingest-dev/executions/clinvar_vcv_2024_12_01_v2_0_1_alpha/clinvar_parsed/**' From 5631ab2ce773fc7665b6002c92b8edf4a4829dc7 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 3 Dec 2024 22:57:21 -0500 Subject: [PATCH 04/15] Refactor re-run script to allow rcv or vcv. Fix env var string construction --- misc/re-run/execute-re-runs.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/misc/re-run/execute-re-runs.py b/misc/re-run/execute-re-runs.py index 043003c..e7055db 100644 --- a/misc/re-run/execute-re-runs.py +++ b/misc/re-run/execute-re-runs.py @@ -6,7 +6,14 @@ # execute_job_script = "misc/bin/execute-job.sh" # env_file = "./local-env.sh" # ftp_watcher_file = "confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt" -ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-10-27.txt" + +# ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-10-27.txt" +# instance_name = "clinvar-vcv-ingest" +# file_format = "vcv" + +ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-10-27.txt" +instance_name = "clinvar-rcv-ingest" +file_format = "rcv" def run_command( @@ -48,13 +55,17 @@ def run_command( global_env = { - "instance_name": "clinvar-vcv-ingest", - "CLINVAR_INGEST_SLACK_TOKEN": "", - "CLINVAR_INGEST_SLACK_CHANNEL": "", - "CLINVAR_INGEST_BUCKET": "clinvar-ingest-dev", - "CLINVAR_INGEST_RELEASE_TAG": "v2_0_4_alpha", - "CLINVAR_INGEST_BQ_META_DATASET": "clinvar_ingest", + "instance_name": instance_name, # Cloud run job name + # TODO can't override this because the one in the job isn't a string it's a secret manager ref + # this is okay because the channel also needs to be set in the job to send messages and we + # set that to empty string here + # "CLINVAR_INGEST_SLACK_TOKEN": "", # Override job variable to disable messaging + "CLINVAR_INGEST_SLACK_CHANNEL": "", # Override job variable to disable messaging + "CLINVAR_INGEST_BUCKET": "clinvar-ingest-dev", # Set already on job + # "CLINVAR_INGEST_RELEASE_TAG": "v2_0_4_alpha", # Optional? Set already in job + "CLINVAR_INGEST_BQ_META_DATASET": "clinvar_ingest", # Set already on job "BQ_DEST_PROJECT": "clingen-dev", + "file_format": file_format, # Set already on job but provide again for explicitness } with open(ftp_watcher_file) as f: @@ -65,12 +76,12 @@ def run_command( raise ValueError("Expected a list of records: " + line) for record in watcher_records: print(f"Executing job for {record}") - # cmd_args = ["bash", execute_job_script] env = {} env.update(global_env) env.update(record) - env_str = ",".join([f"{k}='{v}'" for k, v in env.items()]) + env_str = ",".join([f"{k}={v}" for k, v in env.items()]) + print(f"Environment variable update string: {env_str}") # Same command as in execute-job.sh cmd_args = [ @@ -90,7 +101,7 @@ def run_command( try: run_command(cmd_args) except subprocess.CalledProcessError as e: - print(f"Job failed for {record['file_name']}") + print(f"Job failed for {record}") print(e) break From ded886b896ca1a5615ce18843b919d46cb992eb1 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Wed, 4 Dec 2024 11:15:19 -0500 Subject: [PATCH 05/15] Add comment about VCV IncludedRecord Classifications --- clinvar_ingest/model/variation_archive.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/clinvar_ingest/model/variation_archive.py b/clinvar_ingest/model/variation_archive.py index 0a8dda9..2b444cd 100644 --- a/clinvar_ingest/model/variation_archive.py +++ b/clinvar_ingest/model/variation_archive.py @@ -1102,6 +1102,13 @@ def from_xml(inp: dict): if record_type == "ClassifiedRecord": raw_classifications = extract(interp_record, "Classifications") else: + # IncludedRecord classifications are added by ClinVar in the XML, however + # they are always "empty", saying the number of submissions is 0 and there + # is no classification for the variant. We will ignore these. + # e.g.: + # {"@VariationID": "267462", "@VariationType": "Deletion", "@RecordType": "included", "IncludedRecord": {"Classifications": { + # "GermlineClassification": {"@NumberOfSubmissions": "0", "@NumberOfSubmitters": "0", "ReviewStatus": {"$": "no classification for the single variant"}, "Description": {"$": "no classification for the single variant"}}, "SomaticClinicalImpact": {"@NumberOfSubmissions": "0", "@NumberOfSubmitters": "0", "ReviewStatus": {"$": "no classification for the single variant"}, "Description": {"$": "no classification for the single variant"}}, + # "OncogenicityClassification": {"@NumberOfSubmissions": "0", "@NumberOfSubmitters": "0", "ReviewStatus": {"$": "no classification for the single variant"}, "Description": {"$": "no classification for the single variant"}}}, "SubmittedClassificationList": {"SCV": {"@Accession": "SCV000328413", "@Version": "2"}}, "ClassifiedVariationList": {"ClassifiedVariation": {"@VariationID": "267444", "@Accession": "VCV000267444", "@Version": "4"}}}} raw_classifications = {} raw_classification_types = {r.value for r in StatementType}.intersection( set(raw_classifications.keys()) From 39aa426e9ad138d64823600848c1f63080ad946c Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Wed, 4 Dec 2024 11:15:33 -0500 Subject: [PATCH 06/15] Fix create tables docstring --- clinvar_ingest/cloud/bigquery/create_tables.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clinvar_ingest/cloud/bigquery/create_tables.py b/clinvar_ingest/cloud/bigquery/create_tables.py index db0545b..980c317 100644 --- a/clinvar_ingest/cloud/bigquery/create_tables.py +++ b/clinvar_ingest/cloud/bigquery/create_tables.py @@ -151,8 +151,8 @@ def create_internal_tables( ) -> CreateInternalTablesRequest: """ -- clingen-dev.2023_10_07_2024_02_19T172113657352.variation_archive - CREATE TABLE `clingen-dev.2023_10_07_2024_02_19T172113657352.variation_archive_internal` - AS SELECT * from `clingen-dev.2023_10_07_2024_02_19T172113657352.variation_archive` + CREATE TABLE `clingen-dev.2023_10_07_2024_02_19T172113657352.variation_archive` + AS SELECT * from `clingen-dev.2023_10_07_2024_02_19T172113657352.variation_archive_external` """ From e90c0ae9966b5087cda1b705605fa37eeed20e2c Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Wed, 4 Dec 2024 11:50:02 -0500 Subject: [PATCH 07/15] Add rcv example --- misc/re-run/read-topic.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/misc/re-run/read-topic.py b/misc/re-run/read-topic.py index 0eae0fa..8146d2e 100644 --- a/misc/re-run/read-topic.py +++ b/misc/re-run/read-topic.py @@ -5,6 +5,12 @@ ./kafka-prod.properties \ clinvar-somatic-ftp-watcher \ clinvar-somatic-ftp-watcher.txt + + +python misc/re-run/read-topic.py \ + ./kafka-prod.properties \ + clinvar-rcv-ftp-watcher \ + confluent-prod_clinvar-rcv-ftp-watcher_20241203.txt """ import argparse @@ -64,6 +70,7 @@ def consume_kafka_messages(kafka_config, topic, output_file): elif msg.error(): raise KafkaError(msg.error()) else: + print(msg.value()) print(f"{len(msg.value())}") # Write the message to the file file.write(msg.value().decode("utf-8") + "\n") From 67554cae52976b874afa7753a2290b26fc3e411d Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Thu, 5 Dec 2024 10:41:44 -0500 Subject: [PATCH 08/15] Add ftp watcher message files --- ...-prod_clinvar-rcv-ftp-watcher_20241203.txt | 20 ++++++++++++++++ ...d_clinvar-somatic-ftp-watcher_20241203.txt | 20 ++++++++++++++++ misc/re-run/execute-re-runs.py | 23 ++++++++++++++----- misc/re-run/ftp-watcher-rcv-2024-10-27.txt | 1 + misc/re-run/ftp-watcher-rcv-2024-11-03.txt | 1 + misc/re-run/ftp-watcher-vcv-2024-10-27.txt | 1 + misc/re-run/ftp-watcher-vcv-2024-11-03.txt | 1 + 7 files changed, 61 insertions(+), 6 deletions(-) create mode 100644 misc/re-run/confluent-prod_clinvar-rcv-ftp-watcher_20241203.txt create mode 100644 misc/re-run/confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt create mode 100644 misc/re-run/ftp-watcher-rcv-2024-10-27.txt create mode 100644 misc/re-run/ftp-watcher-rcv-2024-11-03.txt create mode 100644 misc/re-run/ftp-watcher-vcv-2024-10-27.txt create mode 100644 misc/re-run/ftp-watcher-vcv-2024-11-03.txt diff --git a/misc/re-run/confluent-prod_clinvar-rcv-ftp-watcher_20241203.txt b/misc/re-run/confluent-prod_clinvar-rcv-ftp-watcher_20241203.txt new file mode 100644 index 0000000..291988a --- /dev/null +++ b/misc/re-run/confluent-prod_clinvar-rcv-ftp-watcher_20241203.txt @@ -0,0 +1,20 @@ +[{"Name":"ClinVarRCVRelease_2024-0716.xml.gz","Size":4377392140,"Released":"2024-07-16 22:57:20","Last Modified":"2024-07-16 22:57:20","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-07-16"}] +[{"Name":"ClinVarRCVRelease_2024-0724.xml.gz","Size":4384393745,"Released":"2024-07-25 10:18:53","Last Modified":"2024-07-25 10:18:53","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-07-24"}] +[{"Name":"ClinVarRCVRelease_2024-0730.xml.gz","Size":4382333066,"Released":"2024-07-31 24:38:20","Last Modified":"2024-07-31 24:38:20","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-07-30"}] +[{"Name":"ClinVarRCVRelease_2024-0805.xml.gz","Size":4379074934,"Released":"2024-08-06 08:43:27","Last Modified":"2024-08-06 08:43:27","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-08-05"}] +[{"Name":"ClinVarRCVRelease_2024-0812.xml.gz","Size":4446029713,"Released":"2024-08-13 22:46:46","Last Modified":"2024-08-13 22:46:46","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-08-12"}] +[{"Name":"ClinVarRCVRelease_2024-0818.xml.gz","Size":4453442500,"Released":"2024-08-20 11:35:31","Last Modified":"2024-08-20 11:35:31","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-08-18"}] +[{"Name":"ClinVarRCVRelease_2024-0825.xml.gz","Size":4456473524,"Released":"2024-08-26 12:03:10","Last Modified":"2024-08-26 12:03:10","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-08-25"}] +[{"Name":"ClinVarRCVRelease_2024-0902.xml.gz","Size":4444355955,"Released":"2024-09-03 09:45:55","Last Modified":"2024-09-03 09:45:55","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-09-02"}] +[{"Name":"ClinVarRCVRelease_2024-0908.xml.gz","Size":4422960898,"Released":"2024-09-09 13:20:18","Last Modified":"2024-09-09 13:20:18","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-09-08"}] +[{"Name":"ClinVarRCVRelease_2024-0917.xml.gz","Size":4422362560,"Released":"2024-09-18 03:19:46","Last Modified":"2024-09-18 03:19:46","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-09-17"}] +[{"Name":"ClinVarRCVRelease_2024-1001.xml.gz","Size":4499255945,"Released":"2024-10-02 13:37:41","Last Modified":"2024-10-02 13:37:41","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-10-01"}] +[{"Name":"ClinVarRCVRelease_2024-1009.xml.gz","Size":4542289766,"Released":"2024-10-10 07:21:41","Last Modified":"2024-10-10 07:21:41","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-10-09"}] +[{"Name":"ClinVarRCVRelease_2024-1014.xml.gz","Size":4554485213,"Released":"2024-10-15 08:46:50","Last Modified":"2024-10-15 08:46:50","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-10-14"}] +[{"Name":"ClinVarRCVRelease_2024-1020.xml.gz","Size":4554019279,"Released":"2024-10-21 10:19:51","Last Modified":"2024-10-21 10:19:51","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-10-20"}] +[{"Name":"ClinVarRCVRelease_2024-1027.xml.gz","Size":4561188864,"Released":"2024-10-28 06:11:13","Last Modified":"2024-10-28 06:11:13","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-10-27"}] +[{"Name":"ClinVarRCVRelease_2024-1103.xml.gz","Size":4566683937,"Released":"2024-11-04 13:04:32","Last Modified":"2024-11-04 13:04:32","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-11-03"}] +[{"Name":"ClinVarRCVRelease_2024-1111.xml.gz","Size":4565532801,"Released":"2024-11-11 22:59:30","Last Modified":"2024-11-11 22:59:30","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-11-11"}] +[{"Name":"ClinVarRCVRelease_2024-1120.xml.gz","Size":4558640832,"Released":"2024-11-20 20:55:37","Last Modified":"2024-11-20 20:55:37","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-11-20"}] +[{"Name":"ClinVarRCVRelease_2024-1126.xml.gz","Size":4538577274,"Released":"2024-11-27 08:02:32","Last Modified":"2024-11-27 08:02:32","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-11-26"}] +[{"Name":"ClinVarRCVRelease_2024-1201.xml.gz","Size":4538533987,"Released":"2024-12-01 18:38:25","Last Modified":"2024-12-01 18:38:25","Directory":"\/pub\/clinvar\/xml\/RCV_release\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-12-01"}] diff --git a/misc/re-run/confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt b/misc/re-run/confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt new file mode 100644 index 0000000..316200c --- /dev/null +++ b/misc/re-run/confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt @@ -0,0 +1,20 @@ +[{"Name":"ClinVarVCVRelease_2024-0716.xml.gz","Size":3964266964,"Released":"2024-07-17 02:57:18","Last Modified":"2024-07-17 02:57:18","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-07-16"}] +[{"Name":"ClinVarVCVRelease_2024-0724.xml.gz","Size":3970109719,"Released":"2024-07-25 10:18:46","Last Modified":"2024-07-25 10:18:46","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-07-24"}] +[{"Name":"ClinVarVCVRelease_2024-0730.xml.gz","Size":3969688234,"Released":"2024-07-31 24:38:22","Last Modified":"2024-07-31 24:38:22","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-07-30"}] +[{"Name":"ClinVarVCVRelease_2024-0805.xml.gz","Size":3968139266,"Released":"2024-08-06 08:43:24","Last Modified":"2024-08-06 08:43:24","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-08-05"}] +[{"Name":"ClinVarVCVRelease_2024-0812.xml.gz","Size":4018091737,"Released":"2024-08-13 22:47:24","Last Modified":"2024-08-13 22:47:24","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-08-12"}] +[{"Name":"ClinVarVCVRelease_2024-0819.xml.gz","Size":4021274894,"Released":"2024-08-20 10:47:08","Last Modified":"2024-08-20 10:47:08","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-08-19"}] +[{"Name":"ClinVarVCVRelease_2024-0825.xml.gz","Size":4021735028,"Released":"2024-08-26 12:03:07","Last Modified":"2024-08-26 12:03:07","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-08-25"}] +[{"Name":"ClinVarVCVRelease_2024-0902.xml.gz","Size":4021890929,"Released":"2024-09-03 09:45:53","Last Modified":"2024-09-03 09:45:53","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-09-02"}] +[{"Name":"ClinVarVCVRelease_2024-0908.xml.gz","Size":4014001043,"Released":"2024-09-09 23:12:01","Last Modified":"2024-09-09 23:12:01","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-09-08"}] +[{"Name":"ClinVarVCVRelease_2024-0917.xml.gz","Size":4009042800,"Released":"2024-09-18 03:19:46","Last Modified":"2024-09-18 03:19:46","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-09-17"}] +[{"Name":"ClinVarVCVRelease_2024-0930.xml.gz","Size":4095695420,"Released":"2024-10-02 13:43:03","Last Modified":"2024-10-02 13:43:03","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-09-30"}] +[{"Name":"ClinVarVCVRelease_2024-1009.xml.gz","Size":4117261715,"Released":"2024-10-10 07:21:34","Last Modified":"2024-10-10 07:21:34","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-10-09"}] +[{"Name":"ClinVarVCVRelease_2024-1014.xml.gz","Size":4119149779,"Released":"2024-10-15 08:46:46","Last Modified":"2024-10-15 08:46:46","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-10-14"}] +[{"Name":"ClinVarVCVRelease_2024-1020.xml.gz","Size":4118331783,"Released":"2024-10-21 10:19:51","Last Modified":"2024-10-21 10:19:51","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-10-20"}] +[{"Name":"ClinVarVCVRelease_2024-1027.xml.gz","Size":4120656527,"Released":"2024-10-28 06:11:11","Last Modified":"2024-10-28 06:11:11","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-10-27"}] +[{"Name":"ClinVarVCVRelease_2024-1103.xml.gz","Size":4124180047,"Released":"2024-11-04 13:04:31","Last Modified":"2024-11-04 13:04:31","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-11-03"}] +[{"Name":"ClinVarVCVRelease_2024-1111.xml.gz","Size":4125980242,"Released":"2024-11-11 22:59:22","Last Modified":"2024-11-11 22:59:22","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-11-11"}] +[{"Name":"ClinVarVCVRelease_2024-1120.xml.gz","Size":4123917773,"Released":"2024-11-20 20:55:35","Last Modified":"2024-11-20 20:55:35","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-11-20"}] +[{"Name":"ClinVarVCVRelease_2024-1126.xml.gz","Size":4134323060,"Released":"2024-11-27 08:02:50","Last Modified":"2024-11-27 08:02:50","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-11-26"}] +[{"Name":"ClinVarVCVRelease_2024-1201.xml.gz","Size":4137557326,"Released":"2024-12-01 18:38:25","Last Modified":"2024-12-01 18:38:25","Directory":"\/pub\/clinvar\/xml\/weekly_release","Host":"https:\/\/ftp.ncbi.nlm.nih.gov","Release Date":"2024-12-01"}] diff --git a/misc/re-run/execute-re-runs.py b/misc/re-run/execute-re-runs.py index e7055db..21fc033 100644 --- a/misc/re-run/execute-re-runs.py +++ b/misc/re-run/execute-re-runs.py @@ -1,17 +1,28 @@ +""" +Execute a cloud run job across a series of ClinVar FTP Watcher messages. +(messages can also point to GCS files) + +NOTE: Must update global variables at the top of the script to match the job, file, and file format. + +Usage: + python execute-re-runs.py +""" + import json import os import subprocess region = "us-east1" -# execute_job_script = "misc/bin/execute-job.sh" -# env_file = "./local-env.sh" # ftp_watcher_file = "confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt" + # ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-10-27.txt" +# ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-11-03.txt" # instance_name = "clinvar-vcv-ingest" # file_format = "vcv" -ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-10-27.txt" +# ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-10-27.txt" +ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-11-03.txt" instance_name = "clinvar-rcv-ingest" file_format = "rcv" @@ -61,9 +72,9 @@ def run_command( # set that to empty string here # "CLINVAR_INGEST_SLACK_TOKEN": "", # Override job variable to disable messaging "CLINVAR_INGEST_SLACK_CHANNEL": "", # Override job variable to disable messaging - "CLINVAR_INGEST_BUCKET": "clinvar-ingest-dev", # Set already on job - # "CLINVAR_INGEST_RELEASE_TAG": "v2_0_4_alpha", # Optional? Set already in job - "CLINVAR_INGEST_BQ_META_DATASET": "clinvar_ingest", # Set already on job + "CLINVAR_INGEST_BUCKET": "clinvar-ingest-dev", # Already set on job, no need to override + # "CLINVAR_INGEST_RELEASE_TAG": "v2_0_4_alpha", # Already set on job, no need to override + "CLINVAR_INGEST_BQ_META_DATASET": "clinvar_ingest", # Already set on job, no need to override "BQ_DEST_PROJECT": "clingen-dev", "file_format": file_format, # Set already on job but provide again for explicitness } diff --git a/misc/re-run/ftp-watcher-rcv-2024-10-27.txt b/misc/re-run/ftp-watcher-rcv-2024-10-27.txt new file mode 100644 index 0000000..93b4680 --- /dev/null +++ b/misc/re-run/ftp-watcher-rcv-2024-10-27.txt @@ -0,0 +1 @@ +[{"Name":"ClinVarRCVRelease_2024-1027.xml.gz","Size":4561188864,"Released":"2024-10-28 06:11:13","Last Modified":"2024-10-28 06:11:13","Directory":"/xml_archives/rcv","Host":"gs://clinvar-ingest-dev","Release Date":"2024-10-27"}] diff --git a/misc/re-run/ftp-watcher-rcv-2024-11-03.txt b/misc/re-run/ftp-watcher-rcv-2024-11-03.txt new file mode 100644 index 0000000..af061fb --- /dev/null +++ b/misc/re-run/ftp-watcher-rcv-2024-11-03.txt @@ -0,0 +1 @@ +[{"Name":"ClinVarRCVRelease_2024-1103.xml.gz","Size":4566683937,"Released":"2024-11-04 13:04:32","Last Modified":"2024-11-04 13:04:32","Directory":"/xml_archives/rcv","Host":"gs://clinvar-ingest-dev","Release Date":"2024-11-03"}] diff --git a/misc/re-run/ftp-watcher-vcv-2024-10-27.txt b/misc/re-run/ftp-watcher-vcv-2024-10-27.txt new file mode 100644 index 0000000..e01cde1 --- /dev/null +++ b/misc/re-run/ftp-watcher-vcv-2024-10-27.txt @@ -0,0 +1 @@ +[{"Name":"ClinVarVCVRelease_2024-1027.xml.gz","Size":4120656527,"Released":"2024-10-28 06:11:11","Last Modified":"2024-10-28 06:11:11","Directory":"/xml_archives/vcv","Host":"gs://clinvar-ingest-dev","Release Date":"2024-10-27"}] diff --git a/misc/re-run/ftp-watcher-vcv-2024-11-03.txt b/misc/re-run/ftp-watcher-vcv-2024-11-03.txt new file mode 100644 index 0000000..efd1e77 --- /dev/null +++ b/misc/re-run/ftp-watcher-vcv-2024-11-03.txt @@ -0,0 +1 @@ +[{"Name":"ClinVarVCVRelease_2024-1103.xml.gz","Size":4124180047,"Released":"2024-11-04 13:04:31","Last Modified":"2024-11-04 13:04:31","Directory":"/xml_archives/vcv","Host":"gs://clinvar-ingest-dev","Release Date":"2024-11-03"}] From e4e4f781aacd16922fba2423a30492d8e3f43820 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Thu, 5 Dec 2024 14:45:14 -0500 Subject: [PATCH 09/15] Update comment on RcvAccessionClassification's parsing of the XML 'Classification.Description' field. --- clinvar_ingest/model/variation_archive.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/clinvar_ingest/model/variation_archive.py b/clinvar_ingest/model/variation_archive.py index 757238a..6d62cef 100644 --- a/clinvar_ingest/model/variation_archive.py +++ b/clinvar_ingest/model/variation_archive.py @@ -804,8 +804,14 @@ def from_xml_single(inp: dict, statement_type: StatementType, rcv_id: str): or OncogenicityClassification entry. The statement_type is the key from the original `Classifications` XML/dict, indicating the type. """ - # TODO is there a chance they add fields to Description? Maybe don't extract. - # raw_description = extract(inp, "Description") + # Don't extract Description because there's a chance more XML attributes could be added. + # TODO some SomaticClinicalImpact classifications have more than 1 Description element, causing this + # get() call to return an array rather than a single {} dict. The `"Description": [..]` key-value is then + # placed in the resulting `content` field of the classification, since it is non-empty, and the date_last_evaluated, + # num_submissions, interp_description, clinical_impact_assertion_type, and clinical_impact_clinical_significance + # fields are null. + # This is rare, and we are deferring handling this until it can be discussed with ClinVar. Possibly it will + # just have to be handled downstream. Or we can make a 'description' field that is an array of dicts/records. raw_description = get(inp, "Description") or {} return RcvAccessionClassification( rcv_id=rcv_id, From acf46f2e5b3a639fb764e3063f436b510eb918bc Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Tue, 10 Dec 2024 18:17:18 -0500 Subject: [PATCH 10/15] Add script to generate synthetic ftp-watcher-like messages for backfilled release files. Add bucket-backed-up xml file list, and vcv-backfill messages files. --- misc/re-run/generate-ftp-watcher-messages.py | 96 +++++++++++++++++++ .../re-run/prior_xml_archives-ftp-watcher.txt | 16 ++++ misc/re-run/rcv-archives.txt | 19 ++++ misc/re-run/replace-ftp-watcher-messages.py | 23 +++++ misc/re-run/vcv-archives.txt | 19 ++++ 5 files changed, 173 insertions(+) create mode 100644 misc/re-run/generate-ftp-watcher-messages.py create mode 100644 misc/re-run/prior_xml_archives-ftp-watcher.txt create mode 100644 misc/re-run/rcv-archives.txt create mode 100644 misc/re-run/replace-ftp-watcher-messages.py create mode 100644 misc/re-run/vcv-archives.txt diff --git a/misc/re-run/generate-ftp-watcher-messages.py b/misc/re-run/generate-ftp-watcher-messages.py new file mode 100644 index 0000000..193503d --- /dev/null +++ b/misc/re-run/generate-ftp-watcher-messages.py @@ -0,0 +1,96 @@ +import json +import re + +import google.cloud.storage as storage + +# e.g. +# gs://clinvar-ingest-dev/prior_xml_archives/vcv/ClinVarVCVRelease_2024-0126.xml.gz +bucket = "clinvar-ingest-dev" +prefix = "prior_xml_archives/vcv/" + +output_file = "prior_xml_archives-ftp-watcher.txt" + +gs_client = storage.Client() + + +def filename_to_release_date(filename: str) -> str | None: + """ + Extract the release date from a filename. + """ + # ClinVarVCVRelease_2024-0126.xml.gz + # date_str = filename.split("_")[-1].split(".")[0] + pattern = re.compile(r".*(\d{4})-(\d{2})(\d{2}).xml.gz") + if match := pattern.match(str(filename)): + year, month, day = match.groups() + return f"{year}-{month}-{day}" + return None + + +# List a bucket's objects with a prefix +def list_prefix(bucket, prefix) -> list[str]: + """ + List all objects in a bucket with a given prefix. + """ + blobs = gs_client.list_blobs(bucket, prefix=prefix) + return [blob.name for blob in blobs] + + +# e.g. +# print(list_prefix(bucket, prefix)) +# ['prior_xml_archives/vcv/', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0126.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0205.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0214.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0221.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0229.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0306.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0311.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0331.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0502.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0603.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0611.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0618.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0624.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0630.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0708.xml.gz', 'prior_xml_archives/vcv/ClinVarVCVRelease_2024-0716.xml.gz'] + + +# Generate a list of (release-date, blob) tuples sorted by release-date +def get_release_file_pairs(bucket, prefix) -> list[tuple[str, storage.Blob]]: + """ + List all objects in a bucket with a given prefix, sorted by release date. + """ + blobs = gs_client.list_blobs(bucket, prefix=prefix) + release_date_blob = [] + for blob in blobs: + release_date = filename_to_release_date(blob.name) + print(f"file: {blob.name} release_date: {release_date}") + if release_date: + release_date_blob.append((release_date, blob)) + return sorted(release_date_blob, key=lambda date_name: date_name[0]) + + +release_file_pairs = get_release_file_pairs(bucket, prefix) +print(release_file_pairs) +output_records = [] +for release_date, blob in release_file_pairs: + print(release_date, blob.name) + + example = [ + { + "Name": "ClinVarRCVRelease_2024-1027.xml.gz", + "Size": 4561188864, + "Released": "2024-10-28 06:11:13", + "Last Modified": "2024-10-28 06:11:13", + "Directory": "/xml_archives/rcv", + "Host": "gs://clinvar-ingest-dev", + "Release Date": "2024-10-27", + } + ] + if blob.size is None: + raise ValueError(f"Blob {blob.name} size not loaded") + blob_basename = blob.name.split("/")[-1] + blob_prefix = "/".join(blob.name.split("/")[:-1]) + rec = [ + { + "Name": blob.name.split("/")[-1], + "Size": blob.size, + # Doesn't really matter, just gonna set it to noon on the release date + "Released": f"{release_date} 12:00:00", + "Last Modified": f"{release_date} 12:00:00", + "Directory": "/" + blob_prefix, + "Host": f"gs://{blob.bucket.name}", + "Release Date": release_date, + } + ] + print(json.dumps(rec, indent=2)) + output_records.append(rec) + +with open(output_file, "w") as f: + for rec in output_records: + f.write(json.dumps(rec) + "\n") diff --git a/misc/re-run/prior_xml_archives-ftp-watcher.txt b/misc/re-run/prior_xml_archives-ftp-watcher.txt new file mode 100644 index 0000000..55f7c45 --- /dev/null +++ b/misc/re-run/prior_xml_archives-ftp-watcher.txt @@ -0,0 +1,16 @@ +[{"Name": "ClinVarVCVRelease_2024-0126.xml.gz", "Size": 3327813651, "Released": "2024-01-26 12:00:00", "Last Modified": "2024-01-26 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-01-26"}] +[{"Name": "ClinVarVCVRelease_2024-0205.xml.gz", "Size": 3333454774, "Released": "2024-02-05 12:00:00", "Last Modified": "2024-02-05 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-02-05"}] +[{"Name": "ClinVarVCVRelease_2024-0214.xml.gz", "Size": 3456907794, "Released": "2024-02-14 12:00:00", "Last Modified": "2024-02-14 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-02-14"}] +[{"Name": "ClinVarVCVRelease_2024-0221.xml.gz", "Size": 3634035852, "Released": "2024-02-21 12:00:00", "Last Modified": "2024-02-21 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-02-21"}] +[{"Name": "ClinVarVCVRelease_2024-0229.xml.gz", "Size": 3785764596, "Released": "2024-02-29 12:00:00", "Last Modified": "2024-02-29 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-02-29"}] +[{"Name": "ClinVarVCVRelease_2024-0306.xml.gz", "Size": 3789468424, "Released": "2024-03-06 12:00:00", "Last Modified": "2024-03-06 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-03-06"}] +[{"Name": "ClinVarVCVRelease_2024-0311.xml.gz", "Size": 3789744705, "Released": "2024-03-11 12:00:00", "Last Modified": "2024-03-11 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-03-11"}] +[{"Name": "ClinVarVCVRelease_2024-0331.xml.gz", "Size": 3840552832, "Released": "2024-03-31 12:00:00", "Last Modified": "2024-03-31 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-03-31"}] +[{"Name": "ClinVarVCVRelease_2024-0502.xml.gz", "Size": 3966100229, "Released": "2024-05-02 12:00:00", "Last Modified": "2024-05-02 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-05-02"}] +[{"Name": "ClinVarVCVRelease_2024-0603.xml.gz", "Size": 3947826182, "Released": "2024-06-03 12:00:00", "Last Modified": "2024-06-03 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-03"}] +[{"Name": "ClinVarVCVRelease_2024-0611.xml.gz", "Size": 3945553452, "Released": "2024-06-11 12:00:00", "Last Modified": "2024-06-11 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-11"}] +[{"Name": "ClinVarVCVRelease_2024-0618.xml.gz", "Size": 3953874534, "Released": "2024-06-18 12:00:00", "Last Modified": "2024-06-18 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-18"}] +[{"Name": "ClinVarVCVRelease_2024-0624.xml.gz", "Size": 3951642917, "Released": "2024-06-24 12:00:00", "Last Modified": "2024-06-24 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-24"}] +[{"Name": "ClinVarVCVRelease_2024-0630.xml.gz", "Size": 3954379937, "Released": "2024-06-30 12:00:00", "Last Modified": "2024-06-30 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-30"}] +[{"Name": "ClinVarVCVRelease_2024-0708.xml.gz", "Size": 3954337104, "Released": "2024-07-08 12:00:00", "Last Modified": "2024-07-08 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-07-08"}] +[{"Name": "ClinVarVCVRelease_2024-0716.xml.gz", "Size": 3964266964, "Released": "2024-07-16 12:00:00", "Last Modified": "2024-07-16 12:00:00", "Directory": "/prior_xml_archives/vcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-07-16"}] diff --git a/misc/re-run/rcv-archives.txt b/misc/re-run/rcv-archives.txt new file mode 100644 index 0000000..b534895 --- /dev/null +++ b/misc/re-run/rcv-archives.txt @@ -0,0 +1,19 @@ +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0724.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0730.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0805.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0812.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0818.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0825.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0902.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0908.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-0917.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1001.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1009.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1014.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1020.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1027.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1103.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1111.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1120.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1126.xml.gz +gs://clinvar-ingest-dev/xml_archives/rcv/ClinVarRCVRelease_2024-1201.xml.gz diff --git a/misc/re-run/replace-ftp-watcher-messages.py b/misc/re-run/replace-ftp-watcher-messages.py new file mode 100644 index 0000000..f67e85d --- /dev/null +++ b/misc/re-run/replace-ftp-watcher-messages.py @@ -0,0 +1,23 @@ +import json + +# actual_archives_file = "vcv-archives.txt" +ftp_watcher_file = "confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt" +output_ftp_watcher_file = "updated_clinvar-somatic-ftp-watcher_20241203.txt" + +with open(ftp_watcher_file) as f: + ftp_messages = f.readlines() + ftp_messages = [json.loads(msg) for msg in ftp_messages] + +# with open(actual_archives_file) as f: +# actual_archives = f.readlines() + +output_records = [] + +for ftp_message in ftp_messages: + for msg in ftp_message: + msg["Host"] = "gs://clinvar-ingest-dev" + msg["Directory"] = "/xml_archives/vcv" + +with open(output_ftp_watcher_file, "w") as f: + for msg in ftp_messages: + f.write(json.dumps(msg) + "\n") diff --git a/misc/re-run/vcv-archives.txt b/misc/re-run/vcv-archives.txt new file mode 100644 index 0000000..6133099 --- /dev/null +++ b/misc/re-run/vcv-archives.txt @@ -0,0 +1,19 @@ +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0724.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0730.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0805.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0812.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0819.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0825.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0902.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0908.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0917.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-0930.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1009.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1014.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1020.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1027.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1103.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1111.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1120.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1126.xml.gz +gs://clinvar-ingest-dev/xml_archives/vcv/ClinVarVCVRelease_2024-1201.xml.gz From 8ee4aa46a68abfb0824b13695558681c39448378 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Wed, 11 Dec 2024 01:48:40 -0500 Subject: [PATCH 11/15] Add job ids for 10-27 and 11-03 re-runs --- misc/re-run/execute-re-runs.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/misc/re-run/execute-re-runs.py b/misc/re-run/execute-re-runs.py index 21fc033..1157fa8 100644 --- a/misc/re-run/execute-re-runs.py +++ b/misc/re-run/execute-re-runs.py @@ -22,10 +22,24 @@ # file_format = "vcv" # ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-10-27.txt" +# ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-11-03.txt" +# instance_name = "clinvar-vcv-ingest" +# file_format = "rcv" + +# clinvar-vcv-ingest-kyle-dev-ncvkq +# ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-10-27.txt" +# clinvar-vcv-ingest-kyle-dev-qbh7x ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-11-03.txt" -instance_name = "clinvar-rcv-ingest" +instance_name = "clinvar-vcv-ingest-kyle-dev" file_format = "rcv" +# clinvar-vcv-ingest-kyle-dev-bl8w5 +# ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-10-27.txt" +# clinvar-vcv-ingest-kyle-dev-vn2md +# ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-11-03.txt" +# instance_name = "clinvar-vcv-ingest-kyle-dev" +# file_format = "vcv" + def run_command( cmd_array, From f13e2597a0ef74670dde220b7bc64c529fa28845 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Wed, 11 Dec 2024 23:18:59 -0500 Subject: [PATCH 12/15] parameterize bq_meta_dataset --- misc/re-run/execute-re-runs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/misc/re-run/execute-re-runs.py b/misc/re-run/execute-re-runs.py index 1157fa8..13d3955 100644 --- a/misc/re-run/execute-re-runs.py +++ b/misc/re-run/execute-re-runs.py @@ -13,6 +13,7 @@ import subprocess region = "us-east1" +bq_meta_dataset = "clinvar_kyle" # ftp_watcher_file = "confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt" @@ -88,7 +89,7 @@ def run_command( "CLINVAR_INGEST_SLACK_CHANNEL": "", # Override job variable to disable messaging "CLINVAR_INGEST_BUCKET": "clinvar-ingest-dev", # Already set on job, no need to override # "CLINVAR_INGEST_RELEASE_TAG": "v2_0_4_alpha", # Already set on job, no need to override - "CLINVAR_INGEST_BQ_META_DATASET": "clinvar_ingest", # Already set on job, no need to override + "CLINVAR_INGEST_BQ_META_DATASET": bq_meta_dataset, # Already set on job, no need to override "BQ_DEST_PROJECT": "clingen-dev", "file_format": file_format, # Set already on job but provide again for explicitness } From 6ecd42bb22765a3503b56d35c4fbea575fe38934 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Thu, 12 Dec 2024 02:14:50 -0500 Subject: [PATCH 13/15] Add prior backfilled rcv ftp watcher messages --- misc/re-run/generate-ftp-watcher-messages.py | 7 +++++-- .../prior_xml_archives-ftp-watcher-rcv.txt | 16 ++++++++++++++++ ...xt => prior_xml_archives-ftp-watcher-vcv.txt} | 0 3 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 misc/re-run/prior_xml_archives-ftp-watcher-rcv.txt rename misc/re-run/{prior_xml_archives-ftp-watcher.txt => prior_xml_archives-ftp-watcher-vcv.txt} (100%) diff --git a/misc/re-run/generate-ftp-watcher-messages.py b/misc/re-run/generate-ftp-watcher-messages.py index 193503d..8064797 100644 --- a/misc/re-run/generate-ftp-watcher-messages.py +++ b/misc/re-run/generate-ftp-watcher-messages.py @@ -6,9 +6,12 @@ # e.g. # gs://clinvar-ingest-dev/prior_xml_archives/vcv/ClinVarVCVRelease_2024-0126.xml.gz bucket = "clinvar-ingest-dev" -prefix = "prior_xml_archives/vcv/" -output_file = "prior_xml_archives-ftp-watcher.txt" +# prefix = "prior_xml_archives/vcv/" +# output_file = "prior_xml_archives-ftp-watcher-vcv.txt" +prefix = "prior_xml_archives/rcv/" +output_file = "prior_xml_archives-ftp-watcher-rcv.txt" + gs_client = storage.Client() diff --git a/misc/re-run/prior_xml_archives-ftp-watcher-rcv.txt b/misc/re-run/prior_xml_archives-ftp-watcher-rcv.txt new file mode 100644 index 0000000..6c24293 --- /dev/null +++ b/misc/re-run/prior_xml_archives-ftp-watcher-rcv.txt @@ -0,0 +1,16 @@ +[{"Name": "ClinVarRCVRelease_2024-0126.xml.gz", "Size": 3731320657, "Released": "2024-01-26 12:00:00", "Last Modified": "2024-01-26 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-01-26"}] +[{"Name": "ClinVarRCVRelease_2024-0205.xml.gz", "Size": 3725688044, "Released": "2024-02-05 12:00:00", "Last Modified": "2024-02-05 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-02-05"}] +[{"Name": "ClinVarRCVRelease_2024-0214.xml.gz", "Size": 3845774746, "Released": "2024-02-14 12:00:00", "Last Modified": "2024-02-14 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-02-14"}] +[{"Name": "ClinVarRCVRelease_2024-0221.xml.gz", "Size": 4005019695, "Released": "2024-02-21 12:00:00", "Last Modified": "2024-02-21 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-02-21"}] +[{"Name": "ClinVarRCVRelease_2024-0229.xml.gz", "Size": 4156102695, "Released": "2024-02-29 12:00:00", "Last Modified": "2024-02-29 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-02-29"}] +[{"Name": "ClinVarRCVRelease_2024-0306.xml.gz", "Size": 4182097863, "Released": "2024-03-06 12:00:00", "Last Modified": "2024-03-06 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-03-06"}] +[{"Name": "ClinVarRCVRelease_2024-0311.xml.gz", "Size": 4188977687, "Released": "2024-03-11 12:00:00", "Last Modified": "2024-03-11 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-03-11"}] +[{"Name": "ClinVarRCVRelease_2024-0331.xml.gz", "Size": 4128001922, "Released": "2024-03-31 12:00:00", "Last Modified": "2024-03-31 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-03-31"}] +[{"Name": "ClinVarRCVRelease_2024-0502.xml.gz", "Size": 4374362900, "Released": "2024-05-02 12:00:00", "Last Modified": "2024-05-02 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-05-02"}] +[{"Name": "ClinVarRCVRelease_2024-0603.xml.gz", "Size": 4300907047, "Released": "2024-06-03 12:00:00", "Last Modified": "2024-06-03 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-03"}] +[{"Name": "ClinVarRCVRelease_2024-0610.xml.gz", "Size": 4342574098, "Released": "2024-06-10 12:00:00", "Last Modified": "2024-06-10 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-10"}] +[{"Name": "ClinVarRCVRelease_2024-0618.xml.gz", "Size": 4369820001, "Released": "2024-06-18 12:00:00", "Last Modified": "2024-06-18 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-18"}] +[{"Name": "ClinVarRCVRelease_2024-0624.xml.gz", "Size": 4367290812, "Released": "2024-06-24 12:00:00", "Last Modified": "2024-06-24 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-24"}] +[{"Name": "ClinVarRCVRelease_2024-0630.xml.gz", "Size": 4367291618, "Released": "2024-06-30 12:00:00", "Last Modified": "2024-06-30 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-06-30"}] +[{"Name": "ClinVarRCVRelease_2024-0708.xml.gz", "Size": 4369075715, "Released": "2024-07-08 12:00:00", "Last Modified": "2024-07-08 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-07-08"}] +[{"Name": "ClinVarRCVRelease_2024-0716.xml.gz", "Size": 4377392140, "Released": "2024-07-16 12:00:00", "Last Modified": "2024-07-16 12:00:00", "Directory": "/prior_xml_archives/rcv", "Host": "gs://clinvar-ingest-dev", "Release Date": "2024-07-16"}] diff --git a/misc/re-run/prior_xml_archives-ftp-watcher.txt b/misc/re-run/prior_xml_archives-ftp-watcher-vcv.txt similarity index 100% rename from misc/re-run/prior_xml_archives-ftp-watcher.txt rename to misc/re-run/prior_xml_archives-ftp-watcher-vcv.txt From a99c74e041bc3bbfdd99ae43add4d65e90c9af19 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Thu, 12 Dec 2024 02:22:24 -0500 Subject: [PATCH 14/15] Add jobids for new 2024-01-26 execution --- misc/re-run/execute-re-runs.py | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/misc/re-run/execute-re-runs.py b/misc/re-run/execute-re-runs.py index 13d3955..4aed14f 100644 --- a/misc/re-run/execute-re-runs.py +++ b/misc/re-run/execute-re-runs.py @@ -14,32 +14,21 @@ region = "us-east1" bq_meta_dataset = "clinvar_kyle" -# ftp_watcher_file = "confluent-prod_clinvar-somatic-ftp-watcher_20241203.txt" - -# ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-10-27.txt" -# ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-11-03.txt" -# instance_name = "clinvar-vcv-ingest" -# file_format = "vcv" +instance_name = "clinvar-vcv-ingest-kyle-dev" # ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-10-27.txt" # ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-11-03.txt" -# instance_name = "clinvar-vcv-ingest" +# job-id: clinvar-vcv-ingest-kyle-dev-fnhpd +# ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-01-26.txt" # file_format = "rcv" -# clinvar-vcv-ingest-kyle-dev-ncvkq -# ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-10-27.txt" -# clinvar-vcv-ingest-kyle-dev-qbh7x -ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-11-03.txt" -instance_name = "clinvar-vcv-ingest-kyle-dev" -file_format = "rcv" -# clinvar-vcv-ingest-kyle-dev-bl8w5 # ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-10-27.txt" -# clinvar-vcv-ingest-kyle-dev-vn2md # ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-11-03.txt" -# instance_name = "clinvar-vcv-ingest-kyle-dev" -# file_format = "vcv" +# job-id: clinvar-vcv-ingest-kyle-dev-cbjdv +ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-01-26.txt" +file_format = "vcv" def run_command( From 947feecca80ce8c12001580c091a38e78d29d9c5 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter Date: Mon, 16 Dec 2024 16:44:39 -0500 Subject: [PATCH 15/15] tweaks to scripts for re-running --- misc/bin/deploy-job.sh | 12 ++++++------ misc/re-run/execute-re-runs.py | 20 +++++++++----------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/misc/bin/deploy-job.sh b/misc/bin/deploy-job.sh index 70ff9e6..0d33277 100644 --- a/misc/bin/deploy-job.sh +++ b/misc/bin/deploy-job.sh @@ -146,10 +146,10 @@ gcloud run jobs $command $instance_name \ if [[ $instance_name =~ ^.*bq-ingest.*$|^.*stored-procedures.*$ ]]; then # turn off file globbing set -f - gcloud scheduler jobs ${command} http ${instance_name} \ - --location ${region} \ - --uri=https://${region}-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/${project}/jobs/${instance_name}:run \ - --http-method POST \ - --oauth-service-account-email=$pipeline_service_account \ - --schedule='*/15 * * * *' + # gcloud scheduler jobs ${command} http ${instance_name} \ + # --location ${region} \ + # --uri=https://${region}-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/${project}/jobs/${instance_name}:run \ + # --http-method POST \ + # --oauth-service-account-email=$pipeline_service_account \ + # --schedule='*/15 * * * *' fi diff --git a/misc/re-run/execute-re-runs.py b/misc/re-run/execute-re-runs.py index 4aed14f..222824d 100644 --- a/misc/re-run/execute-re-runs.py +++ b/misc/re-run/execute-re-runs.py @@ -13,22 +13,20 @@ import subprocess region = "us-east1" -bq_meta_dataset = "clinvar_kyle" +bq_meta_dataset = "clinvar_ingest" -instance_name = "clinvar-vcv-ingest-kyle-dev" -# ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-10-27.txt" -# ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-11-03.txt" -# job-id: clinvar-vcv-ingest-kyle-dev-fnhpd +instance_name = "clinvar-rcv-ingest" # ftp_watcher_file = "misc/re-run/ftp-watcher-rcv-2024-01-26.txt" -# file_format = "rcv" +# ftp_watcher_file = "misc/re-run/prior_xml_archives-ftp-watcher-rcv-no1-26.txt" +ftp_watcher_file = "misc/re-run/prior_xml_archives-ftp-watcher-rcv.txt" +file_format = "rcv" -# ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-10-27.txt" -# ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-11-03.txt" -# job-id: clinvar-vcv-ingest-kyle-dev-cbjdv -ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-01-26.txt" -file_format = "vcv" +# instance_name = "clinvar-vcv-ingest" +# ftp_watcher_file = "misc/re-run/ftp-watcher-vcv-2024-01-26.txt" +# ftp_watcher_file = "misc/re-run/prior_xml_archives-ftp-watcher-vcv.txt" +# file_format = "vcv" def run_command(