Update Glides Parquet Files (#397)
* Update Glides Parquet Files

In the initial deployment of glides ingestion, the location structs that
contain the gtfsId and todsId fields incorrectly used ID instead of Id.
Correct this in the schemas for each of our record types and create a
migration that modifies existing data to the new schema so that old and
new datasets can be joined correctly.

While investigating, I also noticed that there were duplicate records
in our files stemming from restarting the application and re-ingesting
data from the past 24 hours. In the migration and in the
GlidesConverter class, deduplicate the datasets based on the 'id'
column.
mzappitello authored Jul 15, 2024
1 parent 525b015 commit f5cc696
Showing 4 changed files with 259 additions and 12 deletions.
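
As a minimal sketch of the two fixes described in the commit message, the snippet below applies the column rename and the polars-based deduplication to a toy table; the column names and values are illustrative, not the real Glides schema:

import polars as pl
import pyarrow

# Toy stand-in for a Glides parquet file: old "ID" spelling in a column name
# and one duplicated record, as might happen after re-ingesting 24 hours of data.
old = pyarrow.table(
    {
        "id": ["evt-1", "evt-1", "evt-2"],
        "time": [1, 1, 2],
        "gtfsID": ["place-a", "place-a", "place-b"],
    }
)

# 1. Rename columns so gtfsID/todsID become gtfsId/todsId, matching the API.
renamed = old.rename_columns(
    [
        name.replace("gtfsID", "gtfsId").replace("todsID", "todsId")
        for name in old.column_names
    ]
)

# 2. Deduplicate via polars and sort by time, mirroring the migration below.
deduped = pl.DataFrame(renamed).unique(keep="first").sort(by=["time"]).to_arrow()

print(deduped.column_names)  # ['id', 'time', 'gtfsId']
print(deduped.num_rows)      # 2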
46 changes: 34 additions & 12 deletions src/lamp_py/ingestion/glides.py
@@ -5,6 +5,7 @@
from queue import Queue

from abc import ABC, abstractmethod
+import polars as pl
import pyarrow
import pyarrow.dataset as pd
import pyarrow.parquet as pq
@@ -31,8 +32,8 @@ class GlidesConverter(ABC):

glides_location = pyarrow.struct(
[
("gtfsID", pyarrow.string()),
("todsID", pyarrow.string()),
("gtfsId", pyarrow.string()),
("todsId", pyarrow.string()),
]
)

@@ -88,7 +89,7 @@ def append_records(self) -> None:

joined_ds = new_dataset
if os.path.exists(self.local_path):
-joined_ds = pd.dataset([pd.dataset(self.local_path), new_dataset])
+joined_ds = pd.dataset([new_dataset, pd.dataset(self.local_path)])

process_logger.add_metadata(
new_records=new_dataset.count_rows(),
@@ -106,18 +107,33 @@ def append_records(self) -> None:
while start < now:
end = start + relativedelta(months=1)
if end < now:
-table = joined_ds.filter(
-    (pc.field("time") >= start)
-    & (pc.field("time") < end)
-).to_table()
+unique_table = (
+    pl.DataFrame(
+        joined_ds.filter(
+            (pc.field("time") >= start)
+            & (pc.field("time") < end)
+        ).to_table()
+    )
+    .unique(keep="first")
+    .to_arrow()
+    .sort_by("time")
+)
else:
-table = joined_ds.filter(
-    (pc.field("time") >= start)
-).to_table()
+unique_table = (
+    pl.DataFrame(
+        joined_ds.filter(
+            (pc.field("time") >= start)
+        ).to_table()
+    )
+    .unique(keep="first")
+    .to_arrow()
+    .sort_by("time")
+)

-if table.num_rows > 0:
+if unique_table.num_rows > 0:
+
    row_group_count += 1
-    writer.write_table(table)
+    writer.write_table(unique_table)

start = end
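
Taken together, the rewritten loop slices the combined dataset one calendar month at a time, routes each slice through polars to drop duplicate rows, and writes it back as its own row group. A condensed sketch of that pattern follows; the function name, its parameters, and the final cast back to the writer schema are illustrative assumptions rather than the repository's code.

from datetime import datetime

import polars as pl
import pyarrow
import pyarrow.compute as pc
import pyarrow.dataset as pd  # aliased as in glides.py
import pyarrow.parquet as pq
from dateutil.relativedelta import relativedelta


def write_monthly_row_groups(
    dataset: pd.Dataset,
    schema: pyarrow.Schema,
    local_path: str,
    start: datetime,
) -> None:
    # Illustrative sketch: one deduplicated row group per month of data.
    now = datetime.utcnow()
    with pq.ParquetWriter(local_path, schema=schema) as writer:
        while start < now:
            end = start + relativedelta(months=1)
            month_filter = pc.field("time") >= start
            if end < now:
                month_filter = month_filter & (pc.field("time") < end)

            unique_table = (
                pl.DataFrame(dataset.filter(month_filter).to_table())
                .unique(keep="first")
                .to_arrow()
                .sort_by("time")
            )

            if unique_table.num_rows > 0:
                # Cast back to the writer schema in case the polars round trip
                # changed any arrow types (an assumption for this sketch).
                writer.write_table(unique_table.cast(schema))

            start = end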

@@ -152,6 +168,8 @@ def event_schema(self) -> pyarrow.schema:
]
)

+# NOTE: These tables will eventually be uniqued via polars, which will
+# not work if any of the types in the schema are objects.
return pyarrow.schema(
[
(
@@ -207,6 +225,8 @@ def __init__(self) -> None:

@property
def event_schema(self) -> pyarrow.schema:
+# NOTE: These tables will eventually be uniqued via polars, which will
+# not work if any of the types in the schema are objects.
return pyarrow.schema(
[
(
@@ -306,6 +326,8 @@ def event_schema(self) -> pyarrow.schema:
]
)

+# NOTE: These tables will eventually be uniqued via polars, which will
+# not work if any of the types in the schema are objects.
return pyarrow.schema(
[
(
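The NOTE comments added in the hunks above explain why every schema field is spelled out as a concrete arrow type: the tables are later deduplicated through polars, which needs real dtypes rather than Python objects. A small, hedged illustration using a string-typed location struct like the one defined earlier (toy data; deduplicated on the 'id' column for simplicity):

import polars as pl
import pyarrow

glides_location = pyarrow.struct(
    [
        ("gtfsId", pyarrow.string()),
        ("todsId", pyarrow.string()),
    ]
)

# Two identical toy records with a struct-typed location column.
table = pyarrow.table(
    {
        "id": pyarrow.array(["evt-1", "evt-1"], type=pyarrow.string()),
        "location": pyarrow.array(
            [{"gtfsId": "place-a", "todsId": "a"}] * 2, type=glides_location
        ),
    }
)

df = pl.DataFrame(table)
print(df.dtypes)                                      # concrete string and struct dtypes, no Object columns
print(df.unique(subset=["id"], keep="first").height)  # 1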
@@ -0,0 +1,75 @@
"""update_glides_location_column_names
Revision ID: 26db393ea854
Revises: 07903947aabe
Create Date: 2024-07-09 12:12:04.325358
Details
* upgrade -> for each glides parquet file:
* rename columns to match api. replace gtfsID with gtfsId and todsID with
todsId
* unique each dataset based on the 'id' uuid field.
* downgrade -> Nothing
"""

import os
import tempfile
import polars as pl
import pyarrow.parquet as pq

from lamp_py.aws.s3 import download_file, upload_file

# revision identifiers, used by Alembic.
revision = "26db393ea854"
down_revision = "07903947aabe"
branch_labels = None
depends_on = None


def upgrade() -> None:
def update_glides_archive(temp_dir: str, base_filename: str) -> None:
"""
* download the remote file to a local temp dir
* rename columns with "gtfsID" or "todsID" in them to use "Id"
* unique columns
* sort the dataset based on 'time' column
"""
remote_path = f"s3://{os.environ['SPRINGBOARD_BUCKET']}/lamp/GLIDES/{base_filename}"
old_local_path = os.path.join(temp_dir, f"old_{base_filename}")
new_local_path = os.path.join(temp_dir, f"new_{base_filename}")

file_exists = download_file(remote_path, old_local_path)
if not file_exists:
return

old = pq.read_table(old_local_path)
old_names = old.column_names

# rename columns containing gtfsID and todsID to use Id instead
new_names = [
n.replace("gtfsID", "gtfsId").replace("todsID", "todsId")
for n in old_names
]
renamed = old.rename_columns(new_names)

# unique the records
new = pl.DataFrame(renamed).unique().sort(by=["time"]).to_arrow()

pq.write_table(new, new_local_path)

upload_file(new_local_path, remote_path)

files_to_update = [
"editor_changes.parquet",
"operator_sign_ins.parquet",
"trip_updates.parquet",
]

with tempfile.TemporaryDirectory() as temp_dir:
for filename in files_to_update:
update_glides_archive(temp_dir, filename)


def downgrade() -> None:
pass
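
For anyone wanting to see what the migration would do before it runs, here is a hypothetical read-only helper (not part of the commit) that reports the column renames and the number of duplicate rows for a locally downloaded Glides parquet file:

import polars as pl
import pyarrow.parquet as pq


def preview_migration(local_path: str) -> None:
    # Hypothetical helper: report what the migration above would change for one
    # locally downloaded Glides parquet file, without writing anything back.
    table = pq.read_table(local_path)

    renames = {
        name: name.replace("gtfsID", "gtfsId").replace("todsID", "todsId")
        for name in table.column_names
        if "gtfsID" in name or "todsID" in name
    }
    duplicates = table.num_rows - pl.DataFrame(table).unique().height

    print(f"columns to rename: {renames}")
    print(f"duplicate rows to drop: {duplicates}")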
@@ -0,0 +1,75 @@
"""update_glides_location_column_names
Revision ID: 26db393ea854
Revises: cce8dfee767a
Create Date: 2024-07-09 12:12:04.325358
Details
* upgrade -> for each glides parquet file:
* rename columns to match api. replace gtfsID with gtfsId and todsID with
todsId
* unique each dataset based on the 'id' uuid field.
* downgrade -> Nothing
"""

import os
import tempfile
import polars as pl
import pyarrow.parquet as pq

from lamp_py.aws.s3 import download_file, upload_file

# revision identifiers, used by Alembic.
revision = "26db393ea854"
down_revision = "cce8dfee767a"
branch_labels = None
depends_on = None


def upgrade() -> None:
def update_glides_archive(temp_dir: str, base_filename: str) -> None:
"""
* download the remote file to a local temp dir
* rename columns with "gtfsID" or "todsID" in them to use "Id"
* unique columns
* sort the dataset based on 'time' column
"""
remote_path = f"s3://{os.environ['SPRINGBOARD_BUCKET']}/lamp/GLIDES/{base_filename}"
old_local_path = os.path.join(temp_dir, f"old_{base_filename}")
new_local_path = os.path.join(temp_dir, f"new_{base_filename}")

file_exists = download_file(remote_path, old_local_path)
if not file_exists:
return

old = pq.read_table(old_local_path)
old_names = old.column_names

# rename columns containing gtfsID and todsID to use Id instead
new_names = [
n.replace("gtfsID", "gtfsId").replace("todsID", "todsId")
for n in old_names
]
renamed = old.rename_columns(new_names)

# unique the records
new = pl.DataFrame(renamed).unique().sort(by=["time"]).to_arrow()

pq.write_table(new, new_local_path)

upload_file(new_local_path, remote_path)

files_to_update = [
"editor_changes.parquet",
"operator_sign_ins.parquet",
"trip_updates.parquet",
]

with tempfile.TemporaryDirectory() as temp_dir:
for filename in files_to_update:
update_glides_archive(temp_dir, filename)


def downgrade() -> None:
pass
@@ -0,0 +1,75 @@
"""update_glides_location_column_names
Revision ID: 26db393ea854
Revises: 07903947aabe
Create Date: 2024-07-09 12:12:04.325358
Details
* upgrade -> for each glides parquet file:
* rename columns to match api. replace gtfsID with gtfsId and todsID with
todsId
* unique each dataset based on the 'id' uuid field.
* downgrade -> Nothing
"""

import os
import tempfile
import polars as pl
import pyarrow.parquet as pq

from lamp_py.aws.s3 import download_file, upload_file

# revision identifiers, used by Alembic.
revision = "26db393ea854"
down_revision = "07903947aabe"
branch_labels = None
depends_on = None


def upgrade() -> None:
def update_glides_archive(temp_dir: str, base_filename: str) -> None:
"""
* download the remote file to a local temp dir
* rename columns with "gtfsID" or "todsID" in them to use "Id"
* unique columns
* sort the dataset based on 'time' column
"""
remote_path = f"s3://{os.environ['SPRINGBOARD_BUCKET']}/lamp/GLIDES/{base_filename}"
old_local_path = os.path.join(temp_dir, f"old_{base_filename}")
new_local_path = os.path.join(temp_dir, f"new_{base_filename}")

file_exists = download_file(remote_path, old_local_path)
if not file_exists:
return

old = pq.read_table(old_local_path)
old_names = old.column_names

# rename columns containing gtfsID and todsID to use Id instead
new_names = [
n.replace("gtfsID", "gtfsId").replace("todsID", "todsId")
for n in old_names
]
renamed = old.rename_columns(new_names)

# unique the records
new = pl.DataFrame(renamed).unique().sort(by=["time"]).to_arrow()

pq.write_table(new, new_local_path)

upload_file(new_local_path, remote_path)

files_to_update = [
"editor_changes.parquet",
"operator_sign_ins.parquet",
"trip_updates.parquet",
]

with tempfile.TemporaryDirectory() as temp_dir:
for filename in files_to_update:
update_glides_archive(temp_dir, filename)


def downgrade() -> None:
pass
