Add catvar_ndjsonifier and dockerfile for cloudrun that combines individual json files into one doc keyed by id #11

Merged · 2 commits · Sep 12, 2024
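For context, a rough sketch of the data shapes involved; the record ids and fields below are illustrative placeholders, not values from the actual ClinVar GK pilot data. Each input shard line is a single-key JSON object mapping an id to its document; catvar_ndjsonifier.py emits only the values as NDJSON, while combine-files.py builds one JSON document keyed by id:

import json

# Illustrative only: ids and fields are made up, not taken from the real dataset.
shard_lines = [
    '{"CV000123": {"id": "CV000123", "label": "example variation"}}',
    '{"CV000456": {"id": "CV000456", "label": "another example"}}',
]

# Shape produced by catvar_ndjsonifier.py: one NDJSON line per record (values only).
ndjson_lines = [
    json.dumps(next(iter(json.loads(line).values()))) for line in shard_lines
]

# Shape produced by combine-files.py: a single JSON document keyed by record id.
combined = {}
for line in shard_lines:
    key, value = next(iter(json.loads(line).items()))
    combined[key] = value

print("\n".join(ndjson_lines))
print(json.dumps(combined, indent=2))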
94 changes: 94 additions & 0 deletions misc/catvar_ndjsonifier.py
@@ -0,0 +1,94 @@
import enum
import os
import pathlib
import gzip
import csv
import json
import sys
import time

import clinvar_gk_pilot
from clinvar_gk_pilot.gcs import (
    list_blobs,
    already_downloaded,
    download_to_local_file,
    _local_file_path_for,
)

# increase csv field size limit
csv.field_size_limit(sys.maxsize)


bucket_name = "clinvar-gk-pilot"

# folder_path = "2024-04-07/dev/catvar_output_v2/"
# output_file_name = "combined-catvar_output.ndjson.gz"

folder_path = "2024-04-07/dev/scv_output_v2/"
output_file_name = "combined-scv_output.ndjson.gz"

blob_uris = list_blobs(bucket_name, folder_path)
blob_uris = [blob for blob in blob_uris if not blob.endswith("/")]
for blob in blob_uris:
    print(blob)
local_paths = []
# Download all files
print("Downloading files...")
expected_local_paths = [_local_file_path_for(blob) for blob in blob_uris]
for expected_local_path, blob_uri in zip(expected_local_paths, blob_uris):
    if not os.path.exists(expected_local_path):
        local_paths.append(download_to_local_file(blob_uri))
    else:
        local_paths.append(expected_local_path)
# for blob in blob_uris:
#     if not already_downloaded(blob):
#         print(f"Downloading {blob}...")
#         local_paths.append(download_to_local_file(blob))
#     else:
#         print(f"Already downloaded {blob}")
#         local_paths.append(_local_file_path_for(blob))

# sys.exit(0)

output_lines_count = 0
last_logged_output_count_time = time.time()
last_logged_output_count_value = 0


with gzip.open(output_file_name, "wt", compresslevel=9) as f_out:
    for file_idx, file_path in enumerate(local_paths):
        print(f"Reading {file_path} ({file_idx + 1}/{len(local_paths)})...")
        try:
            with gzip.open(file_path, "rt") as f_in:
                reader = csv.reader(f_in)
                for i, row in enumerate(reader):
                    assert (
                        len(row) == 1
                    ), f"row {i} of file {file_path} had more than 1 column! ({len(row)} columns) {row}"
                    obj = json.loads(row[0])
                    assert (
                        len(obj) == 1
                    ), f"row {i} of file {file_path} had more than 1 key! ({len(obj)} keys) {obj}"

                    # Write key and value
                    key, value = list(obj.items())[0]
                    assert isinstance(
                        key, str
                    ), f"key {key} on line {i} of file {file_path} is not a string!"

                    f_out.write(json.dumps(value))
                    f_out.write("\n")
                    output_lines_count += 1
                    now = time.time()
                    if now - last_logged_output_count_time > 5:
                        new_lines = output_lines_count - last_logged_output_count_value
                        print(
                            f"Outputs written: {output_lines_count} ({new_lines/5:.2f} lines/s)"
                        )
                        last_logged_output_count_value = output_lines_count
                        last_logged_output_count_time = now
        except Exception as e:
            print(f"Exception while reading {file_path}: {e}")
            raise e

print(f"Wrote {output_file_name} successfully!")
23 changes: 23 additions & 0 deletions misc/combination/Dockerfile
@@ -0,0 +1,23 @@
# Use a smaller base image
FROM python:3.9-slim AS build
WORKDIR /app

# Copy requirements file and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Final image
FROM python:3.9-slim
WORKDIR /app

# Copy dependencies from the build stage
COPY --from=build /usr/local/lib/python3.9/site-packages/ /usr/local/lib/python3.9/site-packages/

# Copy the application code
COPY combine-files.py .

# Set environment variables if needed
ENV PYTHONUNBUFFERED=1

# Command to run the file-combination script
CMD ["python","combine-files.py"]
191 changes: 191 additions & 0 deletions misc/combination/combine-files.py
@@ -0,0 +1,191 @@
import csv
from dataclasses import dataclass
import json
import os
import re
import gzip
import sys
import time

# from flask import Flask, request, jsonify
from google.cloud import storage

# increase csv field size limit
csv.field_size_limit(sys.maxsize)

# app = Flask(__name__)


@dataclass()
class Env:
    # Configuration read from environment variables.
    bucket_name: str
    folder_path: str
    file_pattern: str
    output_file_path: str
    output_blob_path: str

    def __init__(self):
        self.bucket_name = os.getenv("bucket_name")
        self.folder_path = os.getenv("folder_path")
        self.file_pattern = os.getenv("file_pattern")
        self.output_file_path = os.getenv("output_file_path")
        self.output_blob_path = os.getenv("output_blob_path")


def _open(file_path, mode):
    # Helper for opening GCS, gzip, or plain local paths (not currently used below).
    if file_path.startswith("gs://"):
        return storage.open(file_path, mode)

    if file_path.endswith(".gz"):
        return gzip.open(file_path, mode)
    return open(file_path, mode)


class NDJson:
    # Context manager for writing one JSON document per line (not currently used below).

    def __init__(self, file_path):
        self.file_path = file_path

    def __enter__(self):
        self.file = open(self.file_path, "w")
        self.file.write("[\n")
        return self

    def write(self, obj):
        self.file.write(json.dumps(obj) + "\n")

    def __exit__(self, exc_type, exc_value, traceback):
        self.file.write("]\n")
        self.file.close()


def combine_files(bucket_name, folder_path, file_pattern, output_file_path, output_blob_path=None):

    # Initialize Google Cloud Storage client
    client = storage.Client()

    # Get the bucket
    bucket = client.get_bucket(bucket_name)

    if bucket is None:
        print(f"{bucket_name} bucket not found.")
        return

    # List all files in the folder matching the file pattern
    blobs = bucket.list_blobs(prefix=folder_path)

    if blobs is None:
        print(f"No blobs found in {folder_path}.")
        return

    files_to_combine = [blob.name for blob in blobs if re.match(
        file_pattern, os.path.basename(blob.name))]

    if len(files_to_combine) == 0:
        print(f"No files found matching pattern {file_pattern} to combine.")
        return

    # Logging stuff
    output_keys_count = 0
    last_logged_output_count_time = time.time()
    last_logged_output_count_value = 0

    with gzip.open(output_file_path, 'wt') as f_out:
        f_out.write("{\n")

        # Track whether any entry has been written yet across all input files,
        # so that entries from consecutive files remain comma-separated.
        is_first_row = True

        # Iterate over each file
        for file_name in files_to_combine:
            print(f"Processing file: {file_name}")
            blob = bucket.get_blob(file_name)
            with gzip.open(blob.open("rb"), 'rt') as f_in:
                reader = csv.reader(f_in)
                for i, row in enumerate(reader):
                    assert (
                        len(row) == 1
                    ), f"row {i} of file {file_name} had more than 1 column! ({len(row)} columns) {row}"
                    obj = json.loads(row[0])
                    assert (
                        len(obj) == 1
                    ), f"row {i} of file {file_name} had more than 1 key! ({len(obj)} keys) {obj}"

                    # Write key and value
                    key, value = list(obj.items())[0]
                    assert isinstance(
                        key, str
                    ), f"key {key} on line {i} of file {file_name} is not a string!"

                    if not is_first_row:
                        f_out.write(",\n")
                    f_out.write(" ")
                    f_out.write(f'"{key}": ')
                    f_out.write(json.dumps(value))
                    is_first_row = False

                    # Progress logging
                    output_keys_count += 1
                    now = time.time()
                    if now - last_logged_output_count_time > 5:
                        new_lines = output_keys_count - last_logged_output_count_value
                        print(
                            f"Output keys written: {output_keys_count} ({new_lines/5:.2f} lines/s)"
                        )
                        last_logged_output_count_value = output_keys_count
                        last_logged_output_count_time = now

        f_out.write("\n}\n")

    print(f"Combined file {output_file_path} created successfully.")

    if output_blob_path:
        # Upload the combined file to the output_blob_uri
        blob = bucket.blob(output_blob_path)
        blob.upload_from_filename(output_file_path)

        print(
            f"Combined file {output_file_path} uploaded to {output_blob_path}."
        )


# @app.route('/')
# def combine_files_http():
#     # Get query parameters
#     bucket_name = request.args.get('bucket_name')
#     folder_path = request.args.get('folder_path')
#     file_pattern = request.args.get('file_pattern')
#     output_file_path = request.args.get('output_file_path')
#     output_blob_path = request.args.get('output_blob_path', default=None)

#     print(f"bucket_name: {bucket_name}, "
#           f"folder_path: {folder_path}, "
#           f"file_pattern: {file_pattern}, "
#           f"output_file_path: {output_file_path}, "
#           f"output_blob_path: {output_blob_path}")

#     # Call the function to combine files
#     combine_files(bucket_name, folder_path, file_pattern,
#                   output_file_path, output_blob_path)

#     ret = {'message': 'Combined file created successfully.'}
#     if output_blob_path:
#         ret['output_blob_path'] = f"gs://{bucket_name}/{output_blob_path}"

#     print(json.dumps(ret))
#     return jsonify(ret)


if __name__ == '__main__':
    # app.run(debug=True, host="0.0.0.0")
    env = Env()
    print(f"bucket_name: {env.bucket_name}, "
          f"folder_path: {env.folder_path}, "
          f"file_pattern: {env.file_pattern}, "
          f"output_file_path: {env.output_file_path}, "
          f"output_blob_path: {env.output_blob_path}")

    combine_files(
        bucket_name=env.bucket_name,
        folder_path=env.folder_path,
        file_pattern=env.file_pattern,
        output_file_path=env.output_file_path,
        output_blob_path=env.output_blob_path
    )
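For reference, a minimal local invocation might look like the sketch below; the bucket name and folder path are taken from catvar_ndjsonifier.py above, but the file pattern and output paths are illustrative assumptions, not values from this PR. On Cloud Run, the same five variables would instead be supplied as container environment variables.

import os
import subprocess

# Hypothetical local run of combine-files.py; file_pattern and output paths are made up.
env = dict(
    os.environ,
    bucket_name="clinvar-gk-pilot",
    folder_path="2024-04-07/dev/catvar_output_v2/",
    file_pattern=r".*\.json\.gz",
    output_file_path="combined-catvar.json.gz",
    output_blob_path="2024-04-07/dev/combined-catvar.json.gz",
)
subprocess.run(["python", "combine-files.py"], env=env, check=True)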
2 changes: 2 additions & 0 deletions misc/combination/requirements.txt
Review comment (Contributor): @theferrit32 I'm more than a little confused on how one would go about using this. Maybe you could give a review?

@@ -0,0 +1,2 @@
Flask
google-cloud-storage
40 changes: 40 additions & 0 deletions misc/splitlines.py
@@ -0,0 +1,40 @@
import argparse
import contextlib
import gzip
import os
import shutil
import sys


def split(input_filename, output_directory, partitions):
    with gzip.open(input_filename, "rt", encoding="utf-8") as f:
        filenames = [f"part-{i}.ndjson.gz" for i in range(partitions)]
        file_paths = [
            os.path.join(output_directory, filename) for filename in filenames
        ]
        with contextlib.ExitStack() as stack:
            files = [
                stack.enter_context(gzip.open(file_path, "wt"))
                for file_path in file_paths
            ]
            for i, line in enumerate(f):
                file_idx = i % partitions
                files[file_idx].write(line)


def main(args=sys.argv[1:]):
    parser = argparse.ArgumentParser()
    parser.add_argument("input_filename")
    parser.add_argument("output_directory")
    parser.add_argument("partitions", type=int)
    args = parser.parse_args(args)

    if os.path.exists(args.output_directory):
        shutil.rmtree(args.output_directory)
    os.makedirs(args.output_directory, exist_ok=True)

    return split(args.input_filename, args.output_directory, args.partitions)


if __name__ == "__main__":
    main()
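And a possible way to drive splitlines.py from Python, assuming it is importable from the working directory; the input file name matches catvar_ndjsonifier.py's default output, while the output directory and partition count are made up for illustration:

from splitlines import main

# Hypothetical usage; output directory and partition count are illustrative.
main(["combined-scv_output.ndjson.gz", "scv_parts", "16"])
# Writes scv_parts/part-0.ndjson.gz ... scv_parts/part-15.ndjson.gz, distributing
# input lines round-robin across the partitions.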