FEAT: Publish BUS Performance files to Tableau #455
This change reads daily bus_event files from the public S3 bucket, concatenates them, and sends them to Tableau for use by OPMI. This process happens in two steps:

1. Join the daily bus event files together and save the joined parquet file to the tableau partition of the public bucket.
2. The Tableau publisher app picks up the tableau partition files from the public bucket, converts each one to a .hyper file, and uploads it to the ITD Tableau server (a rough sketch of this step follows the list).
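
The sketch below shows roughly what step 2 could look like, using the Tableau Hyper API to turn the joined parquet file into a .hyper extract and the Tableau Server Client to publish it. All paths, names, and credentials are placeholders, and the production logic lives in the existing HyperJob.run_hyper runner rather than in this diff.

```python
# Hedged sketch of the parquet -> .hyper -> Tableau server step (step 2 above).
# Every path, token, and project id here is a placeholder.
import tableauserverclient as TSC
from tableauhyperapi import Connection, CreateMode, HyperProcess, Telemetry

PARQUET_PATH = "LAMP_RECENT_Bus_Events.parquet"  # placeholder local parquet file
HYPER_PATH = "LAMP_RECENT_Bus_Events.hyper"

# 1. Convert the joined parquet file into a .hyper extract.
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    with Connection(
        endpoint=hyper.endpoint,
        database=HYPER_PATH,
        create_mode=CreateMode.CREATE_AND_REPLACE,
    ) as connection:
        # Hyper can read parquet directly through its external() table function.
        connection.execute_command(
            f"""CREATE TABLE "bus_events" AS (SELECT * FROM external('{PARQUET_PATH}'))"""
        )

# 2. Publish (overwrite) the extract as a datasource on the Tableau server.
auth = TSC.PersonalAccessTokenAuth("token-name", "token-secret", site_id="site")  # placeholders
server = TSC.Server("https://tableau.example.com", use_server_version=True)  # placeholder URL
with server.auth.sign_in(auth):
    datasource = TSC.DatasourceItem(project_id="project-uuid")  # placeholder project
    server.datasources.publish(datasource, HYPER_PATH, mode=TSC.Server.PublishMode.Overwrite)
```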

Two datasets are created by this process: an ALL dataset consisting of all available bus event files, and a RECENT dataset containing only the last 7 days of bus event files. The RECENT dataset is updated on Tableau continuously throughout the day; the ALL dataset is updated only once per day, around 6 AM.
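
Because the joined parquet files land in the public bucket, the published datasets can also be read back directly, without going through Tableau. The snippet below is a small illustrative read of the RECENT dataset with pyarrow; the bucket and key are placeholders, and the real locations are the tableau_bus_recent / tableau_bus_all entries added to remote_files.py in this change.

```python
# Illustrative read-back of the published RECENT dataset; the S3 path is a placeholder.
import pyarrow.dataset as pds
from pyarrow.fs import S3FileSystem

RECENT_PATH = "public-bucket/tableau/bus/LAMP_RECENT_Bus_Events.parquet"  # placeholder bucket/key

ds = pds.dataset(RECENT_PATH, format="parquet", filesystem=S3FileSystem())
table = ds.to_table(columns=["service_date", "route_id", "trip_id", "stop_id", "stop_arrival_dt"])
print(f"{table.num_rows} bus events in the RECENT dataset")
```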

Asana Task: https://app.asana.com/0/1189492770004753/1208021735441636
rymarczy authored Oct 25, 2024
1 parent e194fe8 commit 1b98406
Showing 5 changed files with 166 additions and 6 deletions.
4 changes: 3 additions & 1 deletion src/lamp_py/bus_performance_manager/pipeline.py
@@ -13,6 +13,7 @@
from lamp_py.runtime_utils.env_validation import validate_environment
from lamp_py.runtime_utils.process_logger import ProcessLogger
from lamp_py.bus_performance_manager.write_events import write_bus_metrics
from lamp_py.tableau.pipeline import start_bus_parquet_updates

logging.getLogger().setLevel("INFO")

@@ -24,7 +25,7 @@ def parse_args(args: List[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument(
"--interval",
default=60,
default=300,
dest="interval",
help="interval to run event loop on",
)
@@ -49,6 +50,7 @@ def iteration() -> None:
process_logger.log_start()
try:
write_bus_metrics()
start_bus_parquet_updates()
process_logger.log_complete()
except Exception as exception:
process_logger.log_failure(exception)
2 changes: 2 additions & 0 deletions src/lamp_py/runtime_utils/remote_files.py
@@ -118,6 +118,8 @@ def s3_uri(self) -> str:
bucket=S3_PUBLIC,
prefix=os.path.join(TABLEAU, "rail"),
)
tableau_bus_recent = S3Location(bucket=S3_PUBLIC, prefix=os.path.join(TABLEAU, "bus", "LAMP_RECENT_Bus_Events.parquet"))
tableau_bus_all = S3Location(bucket=S3_PUBLIC, prefix=os.path.join(TABLEAU, "bus", "LAMP_ALL_Bus_Events.parquet"))


class GTFSArchive(S3Location):
14 changes: 9 additions & 5 deletions src/lamp_py/tableau/hyper.py
@@ -3,6 +3,7 @@
from abc import abstractmethod
from itertools import chain
from typing import Dict
from typing import Optional

import pyarrow
from pyarrow import fs
@@ -69,13 +70,13 @@ def parquet_schema(self) -> pyarrow.schema:
"""

@abstractmethod
def create_parquet(self, db_manager: DatabaseManager) -> None:
def create_parquet(self, db_manager: Optional[DatabaseManager]) -> None:
"""
Business logic to create new Job parquet file
"""

@abstractmethod
def update_parquet(self, db_manager: DatabaseManager) -> bool:
def update_parquet(self, db_manager: Optional[DatabaseManager]) -> bool:
"""
Business logic to update existing Job parquet file
@@ -252,7 +253,7 @@ def run_hyper(self) -> None:
if retry_count == max_retries:
process_log.log_failure(exception=exception)

def run_parquet(self, db_manager: DatabaseManager) -> None:
def run_parquet(self, db_manager: Optional[DatabaseManager]) -> None:
"""
Remote parquet Create / Update runner
@@ -291,7 +292,9 @@ def run_parquet(self, db_manager: DatabaseManager) -> None:
run_action = "update"
upload_parquet = self.update_parquet(db_manager)

parquet_file_size_mb = os.path.getsize(self.local_parquet_path) / (1024 * 1024)
parquet_file_size_mb = 0.0
if os.path.exists(self.local_parquet_path):
parquet_file_size_mb = os.path.getsize(self.local_parquet_path) / (1024 * 1024)

process_log.add_metadata(
remote_schema_match=remote_schema_match,
@@ -307,7 +310,8 @@ def run_parquet(self, db_manager: DatabaseManager) -> None:
extra_args={"Metadata": {"lamp_version": self.lamp_version}},
)

os.remove(self.local_parquet_path)
if os.path.exists(self.local_parquet_path):
os.remove(self.local_parquet_path)

process_log.log_complete()

136 changes: 136 additions & 0 deletions src/lamp_py/tableau/jobs/bus_performance.py
@@ -0,0 +1,136 @@
from typing import Optional
from datetime import datetime
from datetime import timezone

import pyarrow
import pyarrow.parquet as pq
import pyarrow.dataset as pd
from pyarrow.fs import S3FileSystem

from lamp_py.tableau.hyper import HyperJob
from lamp_py.runtime_utils.remote_files import bus_events
from lamp_py.runtime_utils.remote_files import tableau_bus_all
from lamp_py.runtime_utils.remote_files import tableau_bus_recent
from lamp_py.aws.s3 import file_list_from_s3
from lamp_py.aws.s3 import file_list_from_s3_with_details
from lamp_py.aws.s3 import object_exists

bus_schema = pyarrow.schema(
[
("service_date", pyarrow.large_string()),
("route_id", pyarrow.large_string()),
("trip_id", pyarrow.large_string()),
("start_time", pyarrow.int64()),
("start_dt", pyarrow.timestamp("us")),
("stop_count", pyarrow.uint32()),
("direction_id", pyarrow.int8()),
("stop_id", pyarrow.large_string()),
("stop_sequence", pyarrow.int64()),
("vehicle_id", pyarrow.large_string()),
("vehicle_label", pyarrow.large_string()),
("gtfs_travel_to_dt", pyarrow.timestamp("us")),
("tm_stop_sequence", pyarrow.int64()),
("plan_trip_id", pyarrow.large_string()),
("exact_plan_trip_match", pyarrow.bool_()),
("block_id", pyarrow.large_string()),
("service_id", pyarrow.large_string()),
("route_pattern_id", pyarrow.large_string()),
("route_pattern_typicality", pyarrow.int64()),
("direction", pyarrow.large_string()),
("direction_destination", pyarrow.large_string()),
("plan_stop_count", pyarrow.uint32()),
("plan_start_time", pyarrow.int64()),
("plan_start_dt", pyarrow.timestamp("us")),
("stop_name", pyarrow.large_string()),
("plan_travel_time_seconds", pyarrow.int64()),
("plan_route_direction_headway_seconds", pyarrow.int64()),
("plan_direction_destination_headway_seconds", pyarrow.int64()),
("stop_arrival_dt", pyarrow.timestamp("us")),
("stop_departure_dt", pyarrow.timestamp("us")),
("gtfs_travel_to_seconds", pyarrow.int64()),
("stop_arrival_seconds", pyarrow.int64()),
("stop_departure_seconds", pyarrow.int64()),
("travel_time_seconds", pyarrow.int64()),
("dwell_time_seconds", pyarrow.int64()),
("route_direction_headway_seconds", pyarrow.int64()),
("direction_destination_headway_seconds", pyarrow.int64()),
]
)


def create_bus_parquet(job: HyperJob, num_files: Optional[int]) -> None:
"""
Join bus_events files into single parquet file for upload to Tableau
"""
s3_uris = file_list_from_s3(bucket_name=bus_events.bucket, file_prefix=bus_events.prefix)
ds_paths = [s.replace("s3://", "") for s in s3_uris]

if num_files is not None:
ds_paths = ds_paths[-num_files:]

ds = pd.dataset(
ds_paths,
format="parquet",
filesystem=S3FileSystem(),
)

with pq.ParquetWriter(job.local_parquet_path, schema=job.parquet_schema) as writer:
for batch in ds.to_batches(batch_size=500_000):
writer.write_batch(batch)


class HyperBusPerformanceAll(HyperJob):
"""HyperJob for ALL LAMP RT Bus Data"""

def __init__(self) -> None:
HyperJob.__init__(
self,
hyper_file_name=tableau_bus_all.prefix.rsplit("/")[-1].replace(".parquet", ".hyper"),
remote_parquet_path=tableau_bus_all.s3_uri,
lamp_version=tableau_bus_all.version,
)

@property
def parquet_schema(self) -> pyarrow.schema:
return bus_schema

def create_parquet(self, _: None) -> None:
self.update_parquet(None)

def update_parquet(self, _: None) -> bool:
# only run once per day after 11AM UTC
if object_exists(tableau_bus_all.s3_uri):
now_utc = datetime.now(tz=timezone.utc)
last_mod: datetime = file_list_from_s3_with_details(
bucket_name=tableau_bus_all.bucket,
file_prefix=tableau_bus_all.prefix,
)[0]["last_modified"]

if now_utc.day == last_mod.day or now_utc.hour < 11:
return False

create_bus_parquet(self, None)
return True


class HyperBusPerformanceRecent(HyperJob):
"""HyperJob for RECENT LAMP RT Bus Data"""

def __init__(self) -> None:
HyperJob.__init__(
self,
hyper_file_name=tableau_bus_recent.prefix.rsplit("/")[-1].replace(".parquet", ".hyper"),
remote_parquet_path=tableau_bus_recent.s3_uri,
lamp_version=tableau_bus_recent.version,
)

@property
def parquet_schema(self) -> pyarrow.schema:
return bus_schema

def create_parquet(self, _: None) -> None:
self.update_parquet(None)

def update_parquet(self, _: None) -> bool:
create_bus_parquet(self, 7)
return True
16 changes: 16 additions & 0 deletions src/lamp_py/tableau/pipeline.py
@@ -17,6 +17,8 @@
HyperStaticStopTimes,
HyperStaticTrips,
)
from lamp_py.tableau.jobs.bus_performance import HyperBusPerformanceAll
from lamp_py.tableau.jobs.bus_performance import HyperBusPerformanceRecent
from lamp_py.aws.ecs import check_for_parallel_tasks


@@ -54,6 +56,8 @@ def start_hyper_updates() -> None:
HyperStaticStopTimes(),
HyperStaticTrips(),
HyperRtAlerts(),
HyperBusPerformanceAll(),
HyperBusPerformanceRecent(),
]

for job in hyper_jobs:
@@ -77,3 +81,15 @@ def start_parquet_updates(db_manager: DatabaseManager) -> None:

for job in parquet_update_jobs:
job.run_parquet(db_manager)


def start_bus_parquet_updates() -> None:
"""Run all Bus Parquet Update jobs"""

parquet_update_jobs: List[HyperJob] = [
HyperBusPerformanceRecent(),
HyperBusPerformanceAll(),
]

for job in parquet_update_jobs:
job.run_parquet(None)
