diff --git a/misc/catvar_ndjsonifier.py b/misc/catvar_ndjsonifier.py
new file mode 100644
index 0000000..eed0776
--- /dev/null
+++ b/misc/catvar_ndjsonifier.py
@@ -0,0 +1,94 @@
+import enum
+import os
+import pathlib
+import gzip
+import csv
+import json
+import sys
+import time
+
+import clinvar_gk_pilot
+from clinvar_gk_pilot.gcs import (
+    list_blobs,
+    already_downloaded,
+    download_to_local_file,
+    _local_file_path_for,
+)
+
+# increase csv field size limit
+csv.field_size_limit(sys.maxsize)
+
+
+bucket_name = "clinvar-gk-pilot"
+
+# folder_path = "2024-04-07/dev/catvar_output_v2/"
+# output_file_name = "combined-catvar_output.ndjson.gz"
+
+folder_path = "2024-04-07/dev/scv_output_v2/"
+output_file_name = "combined-scv_output.ndjson.gz"
+
+blob_uris = list_blobs(bucket_name, folder_path)
+blob_uris = [blob for blob in blob_uris if not blob.endswith("/")]
+for blob in blob_uris:
+    print(blob)
+local_paths = []
+# Download all files
+print("Downloading files...")
+expected_local_paths = [_local_file_path_for(blob) for blob in blob_uris]
+for expected_local_path, blob_uri in zip(expected_local_paths, blob_uris):
+    if not os.path.exists(expected_local_path):
+        local_paths.append(download_to_local_file(blob_uri))
+    else:
+        local_paths.append(expected_local_path)
+# for blob in blob_uris:
+#     if not already_downloaded(blob):
+#         print(f"Downloading {blob}...")
+#         local_paths.append(download_to_local_file(blob))
+#     else:
+#         print(f"Already downloaded {blob}")
+#         local_paths.append(_local_file_path_for(blob))
+
+# sys.exit(0)
+
+output_lines_count = 0
+last_logged_output_count_time = time.time()
+last_logged_output_count_value = 0
+
+
+with gzip.open(output_file_name, "wt", compresslevel=9) as f_out:
+    for file_idx, file_path in enumerate(local_paths):
+        print(f"Reading {file_path} ({file_idx + 1}/{len(local_paths)})...")
+        try:
+            with gzip.open(file_path, "rt") as f_in:
+                reader = csv.reader(f_in)
+                for i, row in enumerate(reader):
+                    assert (
+                        len(row) == 1
+                    ), f"row {i} of file {file_path} did not have exactly 1 column! ({len(row)} columns) {row}"
+                    obj = json.loads(row[0])
+                    assert (
+                        len(obj) == 1
+                    ), f"row {i} of file {file_path} did not have exactly 1 key! ({len(obj)} keys) {obj}"
+
+                    # Write key and value
+                    key, value = list(obj.items())[0]
+                    assert isinstance(
+                        key, str
+                    ), f"key {key} on line {i} of file {file_path} is not a string!"
+
+                    f_out.write(json.dumps(value))
+                    f_out.write("\n")
+                    output_lines_count += 1
+                    now = time.time()
+                    if now - last_logged_output_count_time > 5:
+                        new_lines = output_lines_count - last_logged_output_count_value
+                        print(
+                            f"Outputs written: {output_lines_count} ({new_lines/5:.2f} lines/s)"
+                        )
+                        last_logged_output_count_value = output_lines_count
+                        last_logged_output_count_time = now
+        except Exception as e:
+            print(f"Exception while reading {file_path}: {e}")
+            raise e
+
+print(f"Wrote {output_file_name} successfully!")
diff --git a/misc/combination/Dockerfile b/misc/combination/Dockerfile
new file mode 100644
index 0000000..09c443d
--- /dev/null
+++ b/misc/combination/Dockerfile
@@ -0,0 +1,25 @@
+# Use a smaller base image
+FROM python:3.9-slim AS build
+WORKDIR /app
+
+# Copy requirements file and install dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Final image
+FROM python:3.9-slim
+WORKDIR /app
+
+# Copy dependencies from the build stage
+COPY --from=build /usr/local/lib/python3.9/site-packages/ /usr/local/lib/python3.9/site-packages/
+
+# Copy the application code
+COPY combine-files.py .
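+# Runtime configuration comes from environment variables read by Env in
+# combine-files.py (bucket_name, folder_path, file_pattern, output paths).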
+
+# Set environment variables if needed
+ENV PYTHONUNBUFFERED=1
+
+# Command to run the combine-files script (the Flask wrapper is commented out)
+CMD ["python", "combine-files.py"]
diff --git a/misc/combination/combine-files.py b/misc/combination/combine-files.py
new file mode 100644
index 0000000..2e54d75
--- /dev/null
+++ b/misc/combination/combine-files.py
@@ -0,0 +1,191 @@
+import csv
+from dataclasses import dataclass
+import json
+import os
+import re
+import gzip
+import sys
+import time
+# from flask import Flask, request, jsonify
+from google.cloud import storage
+
+# increase csv field size limit
+csv.field_size_limit(sys.maxsize)
+
+# app = Flask(__name__)
+
+
+@dataclass()
+class Env:
+    bucket_name: str
+    folder_path: str
+    file_pattern: str
+    output_file_path: str
+    output_blob_path: str
+
+    def __init__(self):
+        self.bucket_name = os.getenv("bucket_name")
+        self.folder_path = os.getenv("folder_path")
+        self.file_pattern = os.getenv("file_pattern")
+        self.output_file_path = os.getenv("output_file_path")
+        self.output_blob_path = os.getenv("output_blob_path")
+
+
+def _open(file_path, mode):
+    if file_path.startswith("gs://"):
+        return storage.Blob.from_string(file_path, client=storage.Client()).open(mode)
+
+    if file_path.endswith(".gz"):
+        return gzip.open(file_path, mode)
+    return open(file_path, mode)
+
+
+class NDJson:
+
+    def __init__(self, file_path):
+        self.file_path = file_path
+
+    def __enter__(self):
+        self.file = open(self.file_path, "w")
+        return self
+
+    def write(self, obj):
+        self.file.write(json.dumps(obj) + "\n")
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.file.close()
+
+
+def combine_files(bucket_name, folder_path, file_pattern, output_file_path, output_blob_path=None):
+
+    # Initialize Google Cloud Storage client
+    client = storage.Client()
+
+    # Get the bucket (lookup_bucket returns None instead of raising NotFound)
+    bucket = client.lookup_bucket(bucket_name)
+
+    if bucket is None:
+        print(f"{bucket_name} bucket not found.")
+        return
+
+    # List all files in the folder matching the file pattern
+    blobs = bucket.list_blobs(prefix=folder_path)
+
+    if blobs is None:
+        print(f"No blobs found in {folder_path}.")
+        return
+
+    files_to_combine = [blob.name for blob in blobs if re.match(
+        file_pattern, os.path.basename(blob.name))]
+
+    if len(files_to_combine) == 0:
+        print(f"No files found matching pattern {file_pattern} to combine.")
+        return
+
+    # Logging stuff
+    output_keys_count = 0
+    last_logged_output_count_time = time.time()
+    last_logged_output_count_value = 0
+
+    with gzip.open(output_file_path, 'wt') as f_out:
+        f_out.write("{\n")
+
+        # Iterate over each file
+        for file_name in files_to_combine:
+            print(f"Processing file: {file_name}")
+            blob = bucket.get_blob(file_name)
+            with gzip.open(blob.open("rb"), 'rt') as f_in:
+                reader = csv.reader(f_in)
+                is_first_row = True
+                for i, row in enumerate(reader):
+                    assert (
+                        len(row) == 1
+                    ), f"row {i} of file {file_name} did not have exactly 1 column! ({len(row)} columns) {row}"
+                    obj = json.loads(row[0])
+                    assert (
+                        len(obj) == 1
+                    ), f"row {i} of file {file_name} did not have exactly 1 key! ({len(obj)} keys) {obj}"
+
+                    # Write key and value
+                    key, value = list(obj.items())[0]
+                    assert isinstance(
+                        key, str
+                    ), f"key {key} on line {i} of file {file_name} is not a string!"
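+
+                    # Stream this entry into the combined top-level object;
+                    # entries after the first are preceded by ",\n".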
+                    if not is_first_row:
+                        f_out.write(",\n")
+                    f_out.write(" ")
+                    f_out.write(json.dumps(key) + ": ")
+                    f_out.write(json.dumps(value))
+                    is_first_row = False
+
+                    # Progress logging
+                    output_keys_count += 1
+                    now = time.time()
+                    if now - last_logged_output_count_time > 5:
+                        new_lines = output_keys_count - last_logged_output_count_value
+                        print(
+                            f"Output keys written: {output_keys_count} ({new_lines/5:.2f} lines/s)"
+                        )
+                        last_logged_output_count_value = output_keys_count
+                        last_logged_output_count_time = now
+
+        f_out.write("\n}\n")
+
+    print(f"Combined file {output_file_path} created successfully.")
+
+    if output_blob_path:
+        # Upload the combined file to the output_blob_path
+        blob = bucket.blob(output_blob_path)
+        blob.upload_from_filename(output_file_path)
+
+        print(
+            f"Combined file {output_file_path} uploaded to {output_blob_path}."
+        )
+
+
+# @app.route('/')
+# def combine_files_http():
+#     # Get query parameters
+#     bucket_name = request.args.get('bucket_name')
+#     folder_path = request.args.get('folder_path')
+#     file_pattern = request.args.get('file_pattern')
+#     output_file_path = request.args.get('output_file_path')
+#     output_blob_path = request.args.get('output_blob_path', default=None)
+
+#     print(f"bucket_name: {bucket_name}, "
+#           f"folder_path: {folder_path}, "
+#           f"file_pattern: {file_pattern}, "
+#           f"output_file_path: {output_file_path}, "
+#           f"output_blob_path: {output_blob_path}")
+
+#     # Call the function to combine files
+#     combine_files(bucket_name, folder_path, file_pattern,
+#                   output_file_path, output_blob_path)
+
+#     ret = {'message': 'Combined file created successfully.'}
+#     if output_blob_path:
+#         ret['output_blob_path'] = f"gs://{bucket_name}/{output_blob_path}"
+
+#     print(json.dumps(ret))
+#     return jsonify(ret)
+
+
+if __name__ == '__main__':
+    # app.run(debug=True, host="0.0.0.0")
+    env = Env()
+    print(f"bucket_name: {env.bucket_name}, "
+          f"folder_path: {env.folder_path}, "
+          f"file_pattern: {env.file_pattern}, "
+          f"output_file_path: {env.output_file_path}, "
+          f"output_blob_path: {env.output_blob_path}")
+
+    combine_files(
+        bucket_name=env.bucket_name,
+        folder_path=env.folder_path,
+        file_pattern=env.file_pattern,
+        output_file_path=env.output_file_path,
+        output_blob_path=env.output_blob_path
+    )
diff --git a/misc/combination/requirements.txt b/misc/combination/requirements.txt
new file mode 100644
index 0000000..3744fab
--- /dev/null
+++ b/misc/combination/requirements.txt
@@ -0,0 +1,2 @@
+Flask
+google-cloud-storage
\ No newline at end of file
diff --git a/misc/splitlines.py b/misc/splitlines.py
new file mode 100644
index 0000000..8be706d
--- /dev/null
+++ b/misc/splitlines.py
@@ -0,0 +1,41 @@
+import argparse
+import contextlib
+import gzip
+import os
+import shutil
+import sys
+
+
+def split(input_filename, output_directory, partitions):
+    with gzip.open(input_filename, "rt", encoding="utf-8") as f:
+        filenames = [f"part-{i}.ndjson.gz" for i in range(partitions)]
+        file_paths = [
+            os.path.join(output_directory, filename) for filename in filenames
+        ]
+        with contextlib.ExitStack() as stack:
+            files = [
+                stack.enter_context(gzip.open(file_path, "wt"))
+                for file_path in file_paths
+            ]
+            for i, line in enumerate(f):
+                file_idx = i % partitions
+                files[file_idx].write(line)
+
+
+def main(args=sys.argv[1:]):
+    parser = argparse.ArgumentParser()
+    parser.add_argument("input_filename")
+    parser.add_argument("output_directory")
+    parser.add_argument("partitions", type=int)
+    args = parser.parse_args(args)
+
+    if os.path.exists(args.output_directory):
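+        # Remove any previous output so stale partitions are not mixed in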
+        shutil.rmtree(args.output_directory)
+    os.makedirs(args.output_directory, exist_ok=True)
+
+    return split(args.input_filename, args.output_directory, args.partitions)
+
+
+if __name__ == "__main__":
+    main()