diff --git a/pipeline/utils/convert_to_parquet.py b/pipeline/utils/convert_to_parquet.py
index 65661e16..0c3f077c 100644
--- a/pipeline/utils/convert_to_parquet.py
+++ b/pipeline/utils/convert_to_parquet.py
@@ -6,6 +6,7 @@
 import argparse
 import orjson
 import dask.dataframe as dd
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 def process_images(base64_str, resize_res=-1):
     import base64
@@ -44,21 +45,40 @@ def convert_json_to_parquet(input_path, output_path, max_partition_size):
         data = f.read()
     data_dict = orjson.loads(data)
 
-    # Estimate the size of the JSON dictionary in bytes
     total_size = len(data)
     print(f"Total size of the JSON data: {total_size} bytes")
 
-    # Calculate the number of partitions needed
-    nparitions = max(1, total_size // max_partition_size)
+    nparitions = int(max(1, total_size // max_partition_size))
     print(f"Number of partitions: {nparitions}")
-
+
     resized_data_dict = {}
     dropped_keys = []
-    for key, value in tqdm(data_dict.items(), desc=f"Processing {input_path}"):
+
+    # Initialize the progress bar
+    progress_bar = tqdm(total=len(data_dict), unit="item", desc="Processing items")
+
+    # Process a single item, ticking the progress bar on success
+    def process_item(key, value):
         if isinstance(value, list):
             value = value[0]
         resized_base64 = process_images(value)
-        resized_data_dict[key] = resized_base64
+        progress_bar.update(1)  # count successful items as they finish
+        return resized_base64  # return only the payload; the key is tracked via future_to_key
+
+    with ThreadPoolExecutor(max_workers=256) as executor:
+        future_to_key = {executor.submit(process_item, key, value): key for key, value in data_dict.items()}
+
+        for future in as_completed(future_to_key):
+            key = future_to_key[future]
+            try:
+                resized_data_dict[key] = future.result()
+            except Exception as e:
+                print(f"Warning: Failed to process key {key}. Error: {e}")
+                dropped_keys.append(key)
+                progress_bar.update(1)  # failed items never reach the update inside process_item
+
+    # Close the progress bar after all tasks are done
+    progress_bar.close()
 
     ddf = dd.from_pandas(pd.DataFrame.from_dict(resized_data_dict, orient="index", columns=["base64"]), npartitions=nparitions)
     ddf.to_parquet(output_path, engine="pyarrow")
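
Review note: below is a minimal, self-contained sketch of the submit/`as_completed` pattern this hunk introduces, using hypothetical toy data and a placeholder `work` function rather than the repo's code. One design choice worth considering: ticking the progress bar once per completed future in the consumer loop, instead of splitting updates between the worker threads and the error handler, keeps the progress accounting in one place.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def work(value):
    # Placeholder for per-item processing such as process_images();
    # the bad input below exercises the error path.
    if isinstance(value, list):
        value = value[0]
    return value.upper()

data = {"a": "foo", "b": ["bar"], "c": 42}  # "c" fails: int has no .upper()
results, dropped = {}, []

with ThreadPoolExecutor(max_workers=8) as executor:
    # Map each pending future back to its dict key.
    future_to_key = {executor.submit(work, v): k for k, v in data.items()}
    with tqdm(total=len(future_to_key), unit="item", desc="Processing items") as bar:
        for future in as_completed(future_to_key):
            key = future_to_key[future]
            try:
                results[key] = future.result()  # re-raises any worker exception
            except Exception as e:
                print(f"Warning: failed to process key {key}. Error: {e}")
                dropped.append(key)
            bar.update(1)  # one tick per completed future, success or failure

print(results, dropped)  # e.g. {'a': 'FOO', 'b': 'BAR'} ['c']
```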
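
For the unchanged tail of the function, a toy sketch of the Dask hand-off under the same assumptions (hypothetical two-entry dict and a local output path, not the repo's data):

```python
import pandas as pd
import dask.dataframe as dd

# Toy stand-in for resized_data_dict: key -> base64 payload.
resized = {"img1": "aGVsbG8=", "img2": "d29ybGQ="}

# orient="index" makes the dict keys the row index, with one "base64" column.
df = pd.DataFrame.from_dict(resized, orient="index", columns=["base64"])

# Split into partitions and write one parquet file per partition via pyarrow.
ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet("resized_parquet", engine="pyarrow")
```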