publish bus files to tableau
rymarczy committed Oct 25, 2024
1 parent e194fe8 commit 352e11b
Showing 5 changed files with 166 additions and 6 deletions.
4 changes: 3 additions & 1 deletion src/lamp_py/bus_performance_manager/pipeline.py
@@ -13,6 +13,7 @@
 from lamp_py.runtime_utils.env_validation import validate_environment
 from lamp_py.runtime_utils.process_logger import ProcessLogger
 from lamp_py.bus_performance_manager.write_events import write_bus_metrics
+from lamp_py.tableau.pipeline import start_bus_parquet_updates
 
 logging.getLogger().setLevel("INFO")
 
@@ -24,7 +25,7 @@ def parse_args(args: List[str]) -> argparse.Namespace:
     parser = argparse.ArgumentParser(description=DESCRIPTION)
     parser.add_argument(
         "--interval",
-        default=60,
+        default=300,
         dest="interval",
         help="interval to run event loop on",
     )
@@ -49,6 +50,7 @@ def iteration() -> None:
     process_logger.log_start()
     try:
         write_bus_metrics()
+        start_bus_parquet_updates()
         process_logger.log_complete()
     except Exception as exception:
         process_logger.log_failure(exception)
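The net effect in this file: each event-loop pass now writes bus metrics and then kicks off the Tableau parquet refresh, and the loop runs every 300 seconds instead of 60. A rough, self-contained sketch of that shape (the two lamp_py functions are stubbed here, and the real loop's logging and scheduling details may differ):

import time


def write_bus_metrics() -> None:
    """Stub for lamp_py.bus_performance_manager.write_events.write_bus_metrics."""


def start_bus_parquet_updates() -> None:
    """Stub for lamp_py.tableau.pipeline.start_bus_parquet_updates."""


def main(interval: float = 300.0) -> None:
    # each iteration writes bus metrics, then publishes parquet files for Tableau
    while True:
        started = time.monotonic()
        write_bus_metrics()
        start_bus_parquet_updates()
        # sleep out whatever remains of the interval
        time.sleep(max(0.0, interval - (time.monotonic() - started)))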
2 changes: 2 additions & 0 deletions src/lamp_py/runtime_utils/remote_files.py
@@ -118,6 +118,8 @@ def s3_uri(self) -> str:
     bucket=S3_PUBLIC,
     prefix=os.path.join(TABLEAU, "rail"),
 )
+tableau_bus_recent = S3Location(bucket=S3_PUBLIC, prefix=os.path.join(TABLEAU, "bus", "LAMP_RECENT_Bus_Events.parquet"))
+tableau_bus_all = S3Location(bucket=S3_PUBLIC, prefix=os.path.join(TABLEAU, "bus", "LAMP_ALL_Bus_Events.parquet"))
 
 
 class GTFSArchive(S3Location):
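The two new locations follow the established pattern in this file: a public bucket plus a TABLEAU-rooted prefix, with the s3_uri property composing the full object path. A simplified, self-contained illustration of what these entries resolve to (the S3Location internals, TABLEAU constant, and bucket name below are stand-ins, not the real lamp_py definitions):

import os
from dataclasses import dataclass


@dataclass
class S3Location:
    """Minimal stand-in for lamp_py.runtime_utils.remote_files.S3Location."""

    bucket: str
    prefix: str

    @property
    def s3_uri(self) -> str:
        return f"s3://{os.path.join(self.bucket, self.prefix)}"


TABLEAU = "lamp/tableau"  # assumed value; the real constant lives in remote_files.py
S3_PUBLIC = "example-public-bucket"  # placeholder bucket name

tableau_bus_recent = S3Location(bucket=S3_PUBLIC, prefix=os.path.join(TABLEAU, "bus", "LAMP_RECENT_Bus_Events.parquet"))
print(tableau_bus_recent.s3_uri)
# s3://example-public-bucket/lamp/tableau/bus/LAMP_RECENT_Bus_Events.parquet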
14 changes: 9 additions & 5 deletions src/lamp_py/tableau/hyper.py
@@ -3,6 +3,7 @@
 from abc import abstractmethod
 from itertools import chain
 from typing import Dict
+from typing import Optional
 
 import pyarrow
 from pyarrow import fs
@@ -69,13 +70,13 @@ def parquet_schema(self) -> pyarrow.schema:
         """
 
     @abstractmethod
-    def create_parquet(self, db_manager: DatabaseManager) -> None:
+    def create_parquet(self, db_manager: Optional[DatabaseManager]) -> None:
         """
         Business logic to create new Job parquet file
         """
 
     @abstractmethod
-    def update_parquet(self, db_manager: DatabaseManager) -> bool:
+    def update_parquet(self, db_manager: Optional[DatabaseManager]) -> bool:
         """
         Business logic to update existing Job parquet file
@@ -252,7 +253,7 @@ def run_hyper(self) -> None:
                 if retry_count == max_retries:
                     process_log.log_failure(exception=exception)
 
-    def run_parquet(self, db_manager: DatabaseManager) -> None:
+    def run_parquet(self, db_manager: Optional[DatabaseManager]) -> None:
         """
         Remote parquet Create / Update runner
@@ -291,7 +292,9 @@ def run_parquet(self, db_manager: Optional[DatabaseManager]) -> None:
             run_action = "update"
             upload_parquet = self.update_parquet(db_manager)
 
-        parquet_file_size_mb = os.path.getsize(self.local_parquet_path) / (1024 * 1024)
+        parquet_file_size_mb = 0.0
+        if os.path.exists(self.local_parquet_path):
+            parquet_file_size_mb = os.path.getsize(self.local_parquet_path) / (1024 * 1024)
 
         process_log.add_metadata(
             remote_schema_match=remote_schema_match,
@@ -307,7 +310,8 @@ def run_parquet(self, db_manager: Optional[DatabaseManager]) -> None:
             extra_args={"Metadata": {"lamp_version": self.lamp_version}},
         )
 
-        os.remove(self.local_parquet_path)
+        if os.path.exists(self.local_parquet_path):
+            os.remove(self.local_parquet_path)
 
         process_log.log_complete()
 
136 changes: 136 additions & 0 deletions src/lamp_py/tableau/jobs/bus_performance.py
@@ -0,0 +1,136 @@
+from typing import Optional
+from datetime import datetime
+from datetime import timezone
+
+import pyarrow
+import pyarrow.parquet as pq
+import pyarrow.dataset as pd
+from pyarrow.fs import S3FileSystem
+
+from lamp_py.tableau.hyper import HyperJob
+from lamp_py.runtime_utils.remote_files import bus_events
+from lamp_py.runtime_utils.remote_files import tableau_bus_all
+from lamp_py.runtime_utils.remote_files import tableau_bus_recent
+from lamp_py.aws.s3 import file_list_from_s3
+from lamp_py.aws.s3 import file_list_from_s3_with_details
+from lamp_py.aws.s3 import object_exists
+
+bus_schema = pyarrow.schema(
+    [
+        ("service_date", pyarrow.large_string()),
+        ("route_id", pyarrow.large_string()),
+        ("trip_id", pyarrow.large_string()),
+        ("start_time", pyarrow.int64()),
+        ("start_dt", pyarrow.timestamp("us")),
+        ("stop_count", pyarrow.uint32()),
+        ("direction_id", pyarrow.int8()),
+        ("stop_id", pyarrow.large_string()),
+        ("stop_sequence", pyarrow.int64()),
+        ("vehicle_id", pyarrow.large_string()),
+        ("vehicle_label", pyarrow.large_string()),
+        ("gtfs_travel_to_dt", pyarrow.timestamp("us")),
+        ("tm_stop_sequence", pyarrow.int64()),
+        ("plan_trip_id", pyarrow.large_string()),
+        ("exact_plan_trip_match", pyarrow.bool_()),
+        ("block_id", pyarrow.large_string()),
+        ("service_id", pyarrow.large_string()),
+        ("route_pattern_id", pyarrow.large_string()),
+        ("route_pattern_typicality", pyarrow.int64()),
+        ("direction", pyarrow.large_string()),
+        ("direction_destination", pyarrow.large_string()),
+        ("plan_stop_count", pyarrow.uint32()),
+        ("plan_start_time", pyarrow.int64()),
+        ("plan_start_dt", pyarrow.timestamp("us")),
+        ("stop_name", pyarrow.large_string()),
+        ("plan_travel_time_seconds", pyarrow.int64()),
+        ("plan_route_direction_headway_seconds", pyarrow.int64()),
+        ("plan_direction_destination_headway_seconds", pyarrow.int64()),
+        ("stop_arrival_dt", pyarrow.timestamp("us")),
+        ("stop_departure_dt", pyarrow.timestamp("us")),
+        ("gtfs_travel_to_seconds", pyarrow.int64()),
+        ("stop_arrival_seconds", pyarrow.int64()),
+        ("stop_departure_seconds", pyarrow.int64()),
+        ("travel_time_seconds", pyarrow.int64()),
+        ("dwell_time_seconds", pyarrow.int64()),
+        ("route_direction_headway_seconds", pyarrow.int64()),
+        ("direction_destination_headway_seconds", pyarrow.int64()),
+    ]
+)
+
+
+def create_bus_parquet(job: HyperJob, num_files: Optional[int]) -> None:
+    """
+    Join bus_events files into single parquet file for upload to Tableau
+    """
+    s3_uris = file_list_from_s3(bucket_name=bus_events.bucket, file_prefix=bus_events.prefix)
+    ds_paths = [s.replace("s3://", "") for s in s3_uris]
+
+    if num_files is not None:
+        ds_paths = ds_paths[-num_files:]
+
+    ds = pd.dataset(
+        ds_paths,
+        format="parquet",
+        filesystem=S3FileSystem(),
+    )
+
+    with pq.ParquetWriter(job.local_parquet_path, schema=job.parquet_schema) as writer:
+        for batch in ds.to_batches(batch_size=500_000):
+            writer.write_batch(batch)
+
+
+class HyperBusPerformanceAll(HyperJob):
+    """HyperJob for ALL LAMP RT Bus Data"""
+
+    def __init__(self) -> None:
+        HyperJob.__init__(
+            self,
+            hyper_file_name=tableau_bus_all.prefix.rsplit("/")[-1].replace(".parquet", ".hyper"),
+            remote_parquet_path=tableau_bus_all.s3_uri,
+            lamp_version=tableau_bus_all.version,
+        )
+
+    @property
+    def parquet_schema(self) -> pyarrow.schema:
+        return bus_schema
+
+    def create_parquet(self, _: None) -> None:
+        self.update_parquet(None)
+
+    def update_parquet(self, _: None) -> bool:
+        # only run once per day after 11AM UTC
+        if object_exists(tableau_bus_all.s3_uri):
+            now_utc = datetime.now(tz=timezone.utc)
+            last_mod: datetime = file_list_from_s3_with_details(
+                bucket_name=tableau_bus_all.bucket,
+                file_prefix=tableau_bus_all.prefix,
+            )[0]["last_modified"]
+
+            if now_utc.day == last_mod.day or now_utc.hour < 11:
+                return False
+
+        create_bus_parquet(self, None)
+        return True
+
+
+class HyperBusPerformanceRecent(HyperJob):
"""HyperJob for ALL LAMP RT Bus Data"""

def __init__(self) -> None:
HyperJob.__init__(
self,
hyper_file_name=tableau_bus_recent.prefix.rsplit("/")[-1].replace(".parquet", ".hyper"),
remote_parquet_path=tableau_bus_recent.s3_uri,
lamp_version=tableau_bus_recent.version,
)

@property
def parquet_schema(self) -> pyarrow.schema:
return bus_schema

def create_parquet(self, _: None) -> None:
self.update_parquet(None)

def update_parquet(self, _: None) -> bool:
create_bus_parquet(self, 7)
return True
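create_bus_parquet is the heart of this file: it lists every bus_events parquet object on S3 (optionally only the most recent num_files), scans them as a single pyarrow dataset, and streams the rows into one local parquet file in 500,000-row batches, keeping memory bounded regardless of how much history exists. A self-contained sketch of the same scan-and-rewrite pattern against local files (the toy schema and file names are illustrative only, not the LAMP bus schema):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

toy_schema = pa.schema([("service_date", pa.large_string()), ("trip_id", pa.large_string())])

# write two small inputs standing in for the daily bus_events objects on S3
for i in range(2):
    table = pa.table({"service_date": [f"2024-10-2{i}"], "trip_id": [f"trip-{i}"]}, schema=toy_schema)
    pq.write_table(table, f"events_{i}.parquet")

# scan all inputs as one logical dataset...
dataset = ds.dataset([f"events_{i}.parquet" for i in range(2)], format="parquet")

# ...and stream record batches into a single combined file; batch_size bounds memory
with pq.ParquetWriter("combined.parquet", schema=toy_schema) as writer:
    for batch in dataset.to_batches(batch_size=500_000):
        writer.write_batch(batch)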
16 changes: 16 additions & 0 deletions src/lamp_py/tableau/pipeline.py
@@ -17,6 +17,8 @@
     HyperStaticStopTimes,
     HyperStaticTrips,
 )
+from lamp_py.tableau.jobs.bus_performance import HyperBusPerformanceAll
+from lamp_py.tableau.jobs.bus_performance import HyperBusPerformanceRecent
 from lamp_py.aws.ecs import check_for_parallel_tasks
 
 
@@ -54,6 +56,8 @@ def start_hyper_updates() -> None:
         HyperStaticStopTimes(),
         HyperStaticTrips(),
         HyperRtAlerts(),
+        HyperBusPerformanceAll(),
+        HyperBusPerformanceRecent(),
     ]
 
     for job in hyper_jobs:
@@ -77,3 +81,15 @@ def start_parquet_updates(db_manager: DatabaseManager) -> None:
 
     for job in parquet_update_jobs:
         job.run_parquet(db_manager)
+
+
+def start_bus_parquet_updates() -> None:
+    """Run all Bus Parquet Update jobs"""
+
+    parquet_update_jobs: List[HyperJob] = [
+        HyperBusPerformanceRecent(),
+        HyperBusPerformanceAll(),
+    ]
+
+    for job in parquet_update_jobs:
+        job.run_parquet(None)
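Unlike start_parquet_updates, the bus variant passes None to run_parquet: the bus jobs build their output directly from S3, so no DatabaseManager is needed, which is exactly what the Optional[DatabaseManager] loosening in hyper.py allows. A minimal sketch of that dispatch under assumed, simplified types (neither class below is lamp_py's real API):

from typing import Optional


class DatabaseManager:
    """Stand-in for lamp_py's database manager."""


class ParquetJob:
    """Stand-in for HyperJob's run_parquet dispatch."""

    def run_parquet(self, db_manager: Optional[DatabaseManager]) -> None:
        if db_manager is None:
            print("building parquet directly from S3 inputs")  # bus jobs
        else:
            print("building parquet from database queries")  # rail jobs


ParquetJob().run_parquet(DatabaseManager())  # rail-style invocation
ParquetJob().run_parquet(None)  # bus-style invocation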
