feat: 🔥 Postprocess parquet: images field (#6)
* feat: 🔥 Postprocess parquet: images field

The images field is nested JSON that DuckDB / Parquet does not recognize during the export. Using PyArrow, we restructure the field and its schema (an illustrative before/after sketch follows the commit notes).

* fix: 🐛 Fix

* style: 🎨 Add notes to code
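Illustrative sketch (not part of the commit): the values below are made up, but they show the shape change the postprocessing performs on the `images` field; key names and size keys here are assumptions.

# Raw `images` value in the JSONL, as exported by DuckDB (nested, loosely typed):
raw_images = {
    "1": {
        "sizes": {"100": {"h": 100, "w": 75}, "full": {"h": 1360, "w": 1020}},
        "uploaded_t": '"1517312996"',  # integer wrapped in quotes
        "uploader": "some-user",
    },
}

# After postprocessing: a list of dicts with a fixed schema (IMAGES_DATATYPE below),
# one dict per image; the map key becomes the `key` field and quoted ints are parsed.
postprocessed_images = [
    {
        "key": "1",
        "imgid": None,
        "sizes": {"100": {"h": 100, "w": 75}, "full": {"h": 1360, "w": 1020}},
        "uploaded_t": 1517312996,
        "uploader": "some-user",
    },
]
# Sizes absent from a product ("200" and "400" here) simply become nulls in the Parquet struct.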
jeremyarancio authored Nov 13, 2024
1 parent 9e1b871 commit 48e21b3
Showing 4 changed files with 675 additions and 58 deletions.
170 changes: 159 additions & 11 deletions openfoodfacts_exports/exports/parquet.py
@@ -4,9 +4,13 @@
from pathlib import Path

import duckdb
import pyarrow as pa
import pyarrow.parquet as pq
from pydantic import BaseModel, model_validator
from huggingface_hub import HfApi

from openfoodfacts_exports import settings
from openfoodfacts_exports.utils import timer

logger = logging.getLogger(__name__)

@@ -152,32 +156,110 @@
;
"""

_SIZE_SCHEMA = pa.struct(
[
pa.field("h", pa.int32(), nullable=True),
pa.field("w", pa.int32(), nullable=True),
]
)

IMAGES_DATATYPE = pa.list_(
pa.struct(
[
pa.field("key", pa.string(), nullable=True),
pa.field("imgid", pa.int32(), nullable=True),
pa.field(
"sizes",
pa.struct(
[
pa.field("100", _SIZE_SCHEMA, nullable=True),
pa.field("200", _SIZE_SCHEMA, nullable=True),
pa.field("400", _SIZE_SCHEMA, nullable=True),
pa.field("full", _SIZE_SCHEMA, nullable=True),
]
),
nullable=True,
),
pa.field("uploaded_t", pa.int64(), nullable=True),
pa.field("uploader", pa.string(), nullable=True),
]
)
)

ALLOWED_IMAGE_SIZE_KEYS = {"100", "200", "400", "full"}

SCHEMAS = {"images": IMAGES_DATATYPE}


class ImageSize(BaseModel):
h: int | None = None
w: int | None = None


class Image(BaseModel):
    """Schema of a single image entry, used to postprocess the `images` field."""

key: str
sizes: dict[str, ImageSize | None]
uploaded_t: int | None = None
imgid: int | None = None
uploader: str | None = None

@model_validator(mode="after")
    def ignore_extra_sizes(self):
        """Keep only the size keys listed in ALLOWED_IMAGE_SIZE_KEYS; any other
        key is dropped so the data matches the `sizes` struct of IMAGES_DATATYPE.
        """
self.sizes = {
k: v for k, v in self.sizes.items() if k in ALLOWED_IMAGE_SIZE_KEYS
}
return self

@model_validator(mode="before")
@classmethod
    def parse_int_from_string(cls, data: dict):
        """Some integers are encoded as strings, e.g. '"1517312996"', which
        leads to int parsing issues.
        """
imgid = data.get("imgid")
uploaded_t = data.get("uploaded_t")
if imgid and isinstance(imgid, str):
data.update({"imgid": imgid.strip('"')})
if uploaded_t and isinstance(uploaded_t, str):
data.update({"uploaded_t": uploaded_t.strip('"')})
return data
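
# Illustrative example (not part of the commit) of how the two validators above behave,
# assuming a raw entry with quoted integers and an unexpected size key:
#   Image(
#       key="1",
#       imgid='"2"',
#       uploaded_t='"1517312996"',
#       sizes={"100": {"h": 100, "w": 75}, "thumb": {"h": 10, "w": 10}},
#   ).model_dump()
# parse_int_from_string strips the surrounding quotes so imgid and uploaded_t parse as ints,
# and ignore_extra_sizes drops the "thumb" key, which IMAGES_DATATYPE does not define.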


def export_parquet(dataset_path: Path, output_path: Path) -> None:
"""Convert a JSONL dataset to Parquet format and push it to Hugging Face
    Hub.
    """
    logger.info("Start JSONL export to Parquet.")
with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_converted_parquet_path = Path(tmp_dir) / "converted_data.parquet"
        tmp_postprocessed_parquet_path = Path(tmp_dir) / "postprocessed_data.parquet"
        convert_jsonl_to_parquet(
            output_file_path=tmp_converted_parquet_path, dataset_path=dataset_path
        )
        postprocess_parquet(
            parquet_path=tmp_converted_parquet_path,
            output_path=tmp_postprocessed_parquet_path,
        )
        # Move dataset file to output_path
        shutil.move(tmp_postprocessed_parquet_path, output_path)

if settings.ENABLE_HF_PUSH:
push_parquet_file_to_hf(data_path=output_path)
else:
logger.info("Hugging Face push is disabled.")

    logger.info("JSONL to Parquet conversion and postprocessing completed.")


def convert_jsonl_to_parquet(
output_file_path: Path,
dataset_path: Path,
) -> None:
    logger.info("Start conversion from JSONL to Parquet.")
if not dataset_path.exists():
raise FileNotFoundError(f"{str(dataset_path)} was not found.")
query = SQL_QUERY.replace("{dataset_path}", str(dataset_path)).replace(
@@ -186,7 +268,10 @@ def convert_jsonl_to_parquet(
try:
duckdb.sql(query)
except duckdb.Error as e:
        logger.error(
            "Error executing query: %s\nError message: %s", query, e
        )
raise
logger.info("JSONL successfully converted into Parquet file.")

@@ -197,7 +282,7 @@ def push_parquet_file_to_hf(
revision: str = "main",
commit_message: str = "Database updated",
) -> None:
    logger.info("Start pushing data to Hugging Face at %s", repo_id)
if not data_path.exists():
raise FileNotFoundError(f"Data is missing: {data_path}")
if data_path.suffix != ".parquet":
@@ -212,4 +297,67 @@
path_in_repo="products.parquet",
commit_message=commit_message,
)
    logger.info("Data successfully pushed to Hugging Face at %s", repo_id)


@timer
def postprocess_parquet(
parquet_path: Path, output_path: Path, batch_size: int = 10000
) -> None:
logger.info("Start postprocessing parquet")
parquet_file = pq.ParquetFile(parquet_path)
updated_schema = update_schema(parquet_file.schema.to_arrow_schema())
with pq.ParquetWriter(output_path, schema=updated_schema) as writer:
for batch in parquet_file.iter_batches(batch_size=batch_size):
batch = postprocess_arrow_batch(batch)
writer.write_batch(batch)
logger.info("Parquet post processing done.")


def update_schema(schema: pa.Schema) -> pa.Schema:
for field_name, field_datatype in SCHEMAS.items():
schema = _update_schema_by_field(
schema=schema, field_name=field_name, field_datatype=field_datatype
)
return schema


def _update_schema_by_field(
schema: pa.Schema, field_name: str, field_datatype: pa.DataType
) -> pa.Schema:
field_index = schema.get_field_index(field_name)
schema = schema.remove(field_index)
schema = schema.insert(field_index, pa.field(field_name, field_datatype))
return schema


def postprocess_arrow_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
"""Add new processing features here."""
batch = postprocess_images(batch)
return batch


def postprocess_images(
batch: pa.RecordBatch, datatype: pa.DataType = IMAGES_DATATYPE
) -> pa.RecordBatch:
    """The `images` field is nested JSON with inconsistent data types.
    We extract and restructure the data as a list of dicts using Pydantic.
    Each dict corresponds to one image of the product.
    ### Notes:
    The process is quite slow (20-30 min).
    Possible optimizations: concurrency, pyarrow.compute, ...
    """
    # DuckDB converted the JSON field into a MapArray:
# https://arrow.apache.org/docs/python/generated/pyarrow.MapArray.html#pyarrow-maparray
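    # Each element of to_pylist() is therefore a list of (key, value) tuples, e.g.
    # [("1", {"sizes": {...}, "uploaded_t": ..., "uploader": ...}), ...] (illustrative).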
postprocessed_images = [
[Image(key=key, **value).model_dump() for key, value in image] if image else []
for image in batch["images"].to_pylist()
]
images_col_index = batch.schema.get_field_index("images")
batch = batch.set_column(
images_col_index,
pa.field("images", datatype),
pa.array(postprocessed_images, type=datatype),
)
return batch
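A quick sanity check (a sketch, not part of the commit; the file names are placeholders): after postprocessing, the `images` column of the output file should carry the explicit Arrow type instead of the raw map produced by DuckDB.

from pathlib import Path

import pyarrow.parquet as pq

from openfoodfacts_exports.exports.parquet import IMAGES_DATATYPE, postprocess_parquet

# "converted.parquet" stands for a file produced by convert_jsonl_to_parquet.
postprocess_parquet(
    parquet_path=Path("converted.parquet"), output_path=Path("postprocessed.parquet")
)
assert pq.read_schema("postprocessed.parquet").field("images").type == IMAGES_DATATYPE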
14 changes: 14 additions & 0 deletions openfoodfacts_exports/utils.py
@@ -1,4 +1,5 @@
import logging
import time

import sentry_sdk
import toml
@@ -10,6 +11,9 @@
from openfoodfacts_exports import settings


logger = logging.getLogger(__name__)


def init_sentry(integrations: list[Integration] | None = None):
if settings.SENTRY_DSN:
integrations = integrations or []
@@ -37,3 +41,13 @@ def get_package_version() -> str:
def get_minio_client() -> Minio:
"""Return a Minio client with AWS credentials from environment."""
return Minio("s3.amazonaws.com", credentials=EnvAWSProvider())


def timer(func):
def wrapper(*args, **kwargs):
        timestamp = time.time()
output = func(*args, **kwargs)
latency = time.time() - timestamp
logger.info(f"Latency of {func.__name__}: {latency:.2f} seconds")
return output
return wrapper
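# Usage (as on postprocess_parquet in exports/parquet.py above): decorating a function
# logs its wall-clock latency after each call, e.g.
#   @timer
#   def postprocess_parquet(...): ...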
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -6,10 +6,11 @@ readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "apscheduler>=3.10.4",
    "duckdb==1.1.2",
"huggingface-hub>=0.26.2",
"minio>=7.2.10",
"openfoodfacts>=1.1.5",
"pyarrow>=18.0.0",
"pytz>=2024.2",
"requests>=2.32.3",
"rq>=2.0.0",
@@ -31,6 +32,7 @@ max-doc-length = 88
[dependency-groups]
dev = [
"coverage[toml]>=7.6.4",
"ipykernel>=6.29.5",
"pre-commit>=4.0.1",
"pytest-cov>=6.0.0",
"pytest>=8.3.3",