FEAT: Ingestion Light Rail Raw GPS files #422
Conversation
src/lamp_py/aws/s3.py
Outdated
def object_exists(obj: str) -> bool:
    """
    check if s3 object exists

    :param obj - expected as 's3://my_bucket/object' or 'my_bucket/object'

    :return: True if object exists, otherwise False
    """
Convenience function to check if object exists on S3
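For reference, a minimal sketch of how the full helper could be assembled from the pieces in this diff (the boto3 client construction here is an assumption, not necessarily how lamp_py wires it up):

import boto3
import botocore.exceptions

def object_exists(obj: str) -> bool:
    # strip the optional scheme and split into bucket / key
    bucket, _, key = obj.replace("s3://", "").partition("/")
    try:
        # HeadObject is a cheap existence probe; it raises on a miss
        boto3.client("s3").head_object(Bucket=bucket, Key=key)
        return True
    except botocore.exceptions.ClientError as exception:
        # per the HeadObject docs, a missing object surfaces as "404"
        if exception.response["Error"]["Code"] == "404":
            return False
        raise exception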
elif config_type == ConfigType.LIGHT_RAIL:
    raise IgnoreIngestion("Ignore LIGHT_RAIL files")
This seemed like the least intrusive way to handle skipping these files in GTFS-RT processing. A specific exception triggers the ignoring of this file type.
except IgnoreIngestion:
    continue
Skip ignored files, don't move them
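Putting the two hunks together, a minimal sketch of the pattern (the surrounding loop and any names other than IgnoreIngestion and ConfigType.LIGHT_RAIL are illustrative, not the PR's exact code):

class IgnoreIngestion(Exception):
    """signals a file that should be skipped outright, not archived or errored"""

for path in gtfs_rt_paths:  # illustrative loop over candidate files
    try:
        config_type = ConfigType.from_filename(path)  # hypothetical helper
        if config_type == ConfigType.LIGHT_RAIL:
            raise IgnoreIngestion("Ignore LIGHT_RAIL files")
        # ... normal GTFS-RT ingestion continues for other config types ...
    except IgnoreIngestion:
        # skip ignored files, don't move them
        continue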
def thread_gps_to_frame(path: str) -> Tuple[Optional[pl.DataFrame], str]:
    """
    gzip to dataframe converter function meant to be run in ThreadPool

    :param path: path to gzip that will be converted to polars dataframe
    """
    file_system = current_thread().__dict__["file_system"]
    path = path.replace("s3://", "")

    logger = ProcessLogger(process_name="light_rail_gps_to_frame", path=path)
    logger.log_start()
This is similar to our gzip process for RT files. In testing, about 16 threads produced the maximum throughput with minimal resource usage.
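The per-thread file_system lookup above suggests an initializer pattern along these lines (a sketch; the S3FileSystem construction and pool setup are assumptions, not necessarily the PR's exact wiring):

from multiprocessing.pool import ThreadPool
from threading import current_thread

import pyarrow.fs

def attach_file_system() -> None:
    # stash an S3 filesystem handle on each worker thread so that
    # thread_gps_to_frame can pull it back out of current_thread()
    current_thread().__dict__["file_system"] = pyarrow.fs.S3FileSystem()

# ~16 threads gave the best throughput-to-resource tradeoff in testing
with ThreadPool(processes=16, initializer=attach_file_system) as pool:
    results = pool.map(thread_gps_to_frame, gps_file_paths)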
else:
    error_files.append(path)

dataframe: pl.DataFrame = pl.concat(dfs)
Gather all records from all files into a single dataframe.
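Presumably the gather loop looks roughly like this, matching thread_gps_to_frame's Tuple[Optional[pl.DataFrame], str] return (illustrative; only the else branch appears in the hunk above, and results would be the pool.map output from the earlier sketch):

dfs = []
archive_files = []
error_files = []
for df, path in results:
    if df is not None:
        # successful conversion: keep the frame and archive the source
        dfs.append(df)
        archive_files.append(path)
    else:
        error_files.append(path)

dataframe: pl.DataFrame = pl.concat(dfs)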
for date in dataframe.get_column("date").unique(): | ||
logger = ProcessLogger( | ||
process_name="light_rail_write_parquet", date=date | ||
) | ||
logger.log_start() |
Write parquet files by date, pulled from the "updated_at" column.
This will merge with existing parquet files from S3, if they exist, and de-dupe the entire dataframe.
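Roughly, for each date partition (a sketch assuming the object_exists helper from this PR; the one-file-per-date S3 layout and output path are illustrative):

day_frame = dataframe.filter(pl.col("date") == date)
remote_path = f"my_bucket/light_rail/{date}.parquet"  # illustrative layout

if object_exists(remote_path):
    # merge with the existing day file before re-writing
    existing = pl.read_parquet(f"s3://{remote_path}")
    day_frame = pl.concat([existing, day_frame])

# de-dupe the entire merged frame, then write it back out
day_frame.unique().write_parquet(f"{date}.parquet")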
def test_light_rail_gps() -> None:
    """
    test light rail gps ingestion pipeline
    """
    dataframe, archive_files, error_files = dataframe_from_gz(mock_file_list)
Not totally sure if it's worth testing the rest of the process, which just includes a file_list_from_s3 call and the parquet writing.
One question on the new aws utility.
src/lamp_py/aws/s3.py
Outdated
except botocore.exceptions.ClientError as exception:
    if exception.response["Error"]["Code"] == "404":
        return False

    return False
Do we want to catch a more generic exception here as well? If something in our try raises, it'll raise up to whatever called this function.
Per this documentation: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Errors
the "404" error is the only thing that should be returned if the object does not exist in the bucket. Any other error would be undefined behavior; a permissions error, for example, should probably raise so that it's not masked by the function.
with file_system.open_input_stream(path, compression="gzip") as f: | ||
df = ( | ||
pl.read_json(f.read()) | ||
.transpose( | ||
include_header=True, | ||
header_name="serial_number", | ||
column_names=("data",), | ||
) | ||
.select( | ||
pl.col("serial_number").cast(pl.String), | ||
pl.col("data").struct.field("speed").cast(pl.Float64), | ||
( | ||
pl.col("data") | ||
.struct.field("updated_at") | ||
.str.slice(0, length=10) | ||
.str.to_date() | ||
.alias("date") | ||
), | ||
pl.col("data").struct.field("updated_at").cast(pl.String), | ||
pl.col("data").struct.field("bearing").cast(pl.Float64), | ||
pl.col("data").struct.field("latitude").cast(pl.String), | ||
pl.col("data").struct.field("longitude").cast(pl.String), | ||
pl.col("data").struct.field("car").cast(pl.String), | ||
) | ||
) |
This seems to be a nicer way to do this than using pyarrow.
Yeah, I also think most conversions between pyarrow and polars are zero-copy operations. The only tricky thing is making sure we're handling any type conversions correctly.
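For example, the round trip itself is cheap; the care goes into the dtype mapping (pl.String vs. pa.string(), and so on). A toy illustration:

import polars as pl
import pyarrow as pa

df = pl.DataFrame({"speed": [1.0, 2.0]})
table: pa.Table = df.to_arrow()   # polars -> pyarrow, zero-copy
df_again = pl.from_arrow(table)   # pyarrow -> polars, (mostly) zero-copy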
except botocore.exceptions.ClientError as exception:
    if exception.response["Error"]["Code"] == "404":
        return False
    raise exception
Raise all other exceptions. The "404" error code is the only thing that should be returned if the object does not exist; any other exception would be undefined behavior.
This change allows the Ingestion app to ingest Light Rail raw GPS files.
The process is currently configured to process a maximum batch size of ~5000 files on each Ingestion loop.
All processing is done in memory with polars; this dataset is relatively small, and 5000 files' worth of data uses less than 2 GB of memory. A full event loop of 5000 files takes ~2 minutes to process locally. This can be adjusted if the event loop takes too long on AWS.
The process will also merge exported files with existing files found in the public S3 bucket.
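A hypothetical sketch of how that batch cap could be applied per loop (the constant name and slicing are illustrative, not the PR's exact code; dataframe_from_gz is the entry point exercised in the test above):

MAX_LIGHT_RAIL_FILES = 5_000  # illustrative name for the ~5000-file cap

# process at most one batch per ingestion event loop; anything beyond
# the cap is picked up on the next pass
batch = gps_file_paths[:MAX_LIGHT_RAIL_FILES]
dataframe, archive_files, error_files = dataframe_from_gz(batch)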
Asana Task: https://app.asana.com/0/1205827492903547/1207059327607972