Skip to content

Commit

Permalink
FEAT: Join BUS GTFS-RT Vehicle Events with Transit Master Vehicle Eve…
Browse files Browse the repository at this point in the history
…nts (#440)

This change allows for the joining of Bus events from GTFS-RT and Transit Master (TM) data sources.

GTFS-RT events are joined to TM events with an "asof" join.

This type of join first performs a regular LEFT JOIN on the "route_id", "trip_id", "vehicle_label" and "stop_id" columns and then performs a nearest match on "stop_sequence". The resulting dataframe retains both a "stop_sequence" column (from gtfs-rt) and a "tm_stop_sequence" column so that the accuracy of the nearest stop_sequence match can be verified.

Asana Task: https://app.asana.com/0/1205827492903547/1207771349226047
  • Loading branch information
rymarczy authored Sep 28, 2024
1 parent 31a5082 commit d796f42
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

def get_new_event_files() -> List[Dict[str, date | List[str]]]:
"""
Generate a dataframe that contains a record for every service date to be
Generate a list of dictionaries that contains a record for every service date to be
processed.
* Collect all of the potential input filepaths, their last modified
timestamp, and potential service dates.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ def read_vehicle_positions(
& pl.col("vehicle.trip.trip_id").is_not_null()
& pl.col("vehicle.vehicle.id").is_not_null()
& pl.col("vehicle.timestamp").is_not_null()
& pl.col("vehicle.trip.start_time").is_not_null()
)
.select(
pl.col("vehicle.trip.route_id").cast(pl.String).alias("route_id"),
pl.col("vehicle.trip.trip_id").cast(pl.String).alias("trip_id"),
pl.col("vehicle.stop_id").cast(pl.String).alias("stop_id"),
pl.col("vehicle.current_stop_sequence")
.cast(pl.String)
.cast(pl.Int64)
.alias("stop_sequence"),
pl.col("vehicle.trip.direction_id")
.cast(pl.Int8)
Expand Down Expand Up @@ -103,18 +104,17 @@ def positions_to_events(vehicle_positions: pl.DataFrame) -> pl.DataFrame:
:param vehicle_positions: Dataframe of vehicle positions
:return dataframe:
service_date -> String
route_id -> String
trip_id -> String
stop_id -> String
stop_sequence -> String
direction_id -> Int8
start_time -> String
service_date -> String
direction_id -> Int8
stop_id -> String
stop_sequence -> Int64
vehicle_id -> String
vehicle_label -> String
current_status -> String
arrival_gtfs -> Datetime
travel_towards_gtfs -> Datetime
gtfs_travel_to_dt -> Datetime
gtfs_arrival_dt -> Datetime
"""
vehicle_events = vehicle_positions.pivot(
values="vehicle_timestamp",
Expand All @@ -141,22 +141,22 @@ def positions_to_events(vehicle_positions: pl.DataFrame) -> pl.DataFrame:

vehicle_events = vehicle_events.rename(
{
"STOPPED_AT": "arrival_gtfs",
"IN_TRANSIT_TO": "travel_towards_gtfs",
"STOPPED_AT": "gtfs_arrival_dt",
"IN_TRANSIT_TO": "gtfs_travel_to_dt",
}
).select(
[
"service_date",
"route_id",
"trip_id",
"start_time",
"direction_id",
"stop_id",
"stop_sequence",
"direction_id",
"start_time",
"service_date",
"vehicle_id",
"vehicle_label",
"arrival_gtfs",
"travel_towards_gtfs",
"gtfs_travel_to_dt",
"gtfs_arrival_dt",
]
)

Expand Down
File renamed without changes.
60 changes: 60 additions & 0 deletions src/lamp_py/bus_performance_manager/events_joined.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import polars as pl


def join_gtfs_tm_events(gtfs: pl.DataFrame, tm: pl.DataFrame) -> pl.DataFrame:
    """
    Combine gtfs-rt bus events with transit master (tm) events.

    Rows are matched exactly on "trip_id", "route_id", "vehicle_label" and
    "stop_id". Within each of those groups a gtfs row is paired with the tm
    row whose "tm_stop_sequence" is nearest to the gtfs "stop_sequence".
    This is a left join: gtfs rows without a tm match are kept.

    :param gtfs: gtfs-rt vehicle events (must carry "stop_sequence",
        "gtfs_travel_to_dt", "gtfs_arrival_dt" and the match columns)
    :param tm: transit master events (must carry "tm_stop_sequence" and
        the match columns)

    :return dataframe:
        service_date -> String
        route_id -> String
        trip_id -> String
        start_time -> String
        direction_id -> Int8
        stop_id -> String
        stop_sequence -> Int64
        vehicle_id -> String
        vehicle_label -> String
        gtfs_travel_to_dt -> Datetime
        gtfs_arrival_dt -> Datetime
        tm_stop_sequence -> Int64
        tm_is_layover -> Bool (NOTE(review): dtype is whatever the tm
            frame supplies - confirm against the tm event generator)
        tm_arrival_dt -> Datetime
        tm_departure_dt -> Datetime
        gtfs_sort_dt -> Datetime
        gtfs_depart_dt -> Datetime
    """
    # an asof join needs both frames ordered by their respective asof key
    gtfs_by_sequence = gtfs.sort("stop_sequence")
    tm_by_sequence = tm.sort("tm_stop_sequence")

    # exact match on the "by" columns, nearest match on the sequence columns
    joined_events = gtfs_by_sequence.join_asof(
        tm_by_sequence,
        left_on="stop_sequence",
        right_on="tm_stop_sequence",
        by=["trip_id", "route_id", "vehicle_label", "stop_id"],
        strategy="nearest",
        coalesce=True,
    )

    # first available gtfs timestamp of an event, used to order events
    sort_dt_expr = pl.coalesce(
        ["gtfs_travel_to_dt", "gtfs_arrival_dt"],
    ).alias("gtfs_sort_dt")

    # a vehicle departs a stop when it starts travelling to the next one, so
    # take "gtfs_travel_to_dt" from the following event of the same
    # vehicle and trip (null for the last event in each group)
    depart_dt_expr = (
        pl.col("gtfs_travel_to_dt")
        .shift(-1)
        .over(
            ["vehicle_label", "trip_id"],
            order_by="gtfs_sort_dt",
        )
        .alias("gtfs_depart_dt")
    )

    # two separate with_columns calls: the second expression reads the
    # "gtfs_sort_dt" column created by the first
    return joined_events.with_columns(sort_dt_expr).with_columns(depart_dt_expr)
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame:
"GEO_NODE_ID",
"GEO_NODE_ABBR",
)
.filter(pl.col("GEO_NODE_ABBR").is_not_null())
.unique()
)

Expand All @@ -52,6 +53,7 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame:
"ROUTE_ID",
"ROUTE_ABBR",
)
.filter(pl.col("ROUTE_ABBR").is_not_null())
.unique()
)

Expand All @@ -63,6 +65,7 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame:
"TRIP_ID",
"TRIP_SERIAL_NUMBER",
)
.filter(pl.col("TRIP_SERIAL_NUMBER").is_not_null())
.unique()
)

Expand All @@ -74,6 +77,7 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame:
"VEHICLE_ID",
"PROPERTY_TAG",
)
.filter(pl.col("PROPERTY_TAG").is_not_null())
.unique()
)

Expand All @@ -87,8 +91,15 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame:
tm_stop_crossings = (
pl.scan_parquet(tm_files)
.filter(
(pl.col("ACT_ARRIVAL_TIME").is_not_null())
| (pl.col("ACT_DEPARTURE_TIME").is_not_null())
(pl.col("IsRevenue") == "R")
& pl.col("ROUTE_ID").is_not_null()
& pl.col("GEO_NODE_ID").is_not_null()
& pl.col("TRIP_ID").is_not_null()
& pl.col("VEHICLE_ID").is_not_null()
& (
(pl.col("ACT_ARRIVAL_TIME").is_not_null())
| (pl.col("ACT_DEPARTURE_TIME").is_not_null())
)
)
.join(
tm_geo_nodes,
Expand Down Expand Up @@ -124,17 +135,19 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame:
),
)
.select(
pl.col("service_date").cast(pl.Date),
pl.col("PROPERTY_TAG").cast(pl.String).alias("tm_vehicle_label"),
(
pl.col("ROUTE_ABBR")
.cast(pl.String)
.str.strip_chars_start("0")
.alias("tm_route_id")
.alias("route_id")
),
pl.col("GEO_NODE_ID").cast(pl.String).alias("tm_geo_node_id"),
pl.col("GEO_NODE_ABBR").cast(pl.String).alias("tm_stop_id"),
pl.col("TRIP_SERIAL_NUMBER").cast(pl.String).alias("tm_trip_id"),
pl.col("TRIP_SERIAL_NUMBER").cast(pl.String).alias("trip_id"),
pl.col("GEO_NODE_ABBR").cast(pl.String).alias("stop_id"),
pl.col("PATTERN_GEO_NODE_SEQ")
.cast(pl.Int64)
.alias("tm_stop_sequence"),
pl.col("IS_LAYOVER").cast(pl.String).alias("tm_is_layover"),
pl.col("PROPERTY_TAG").cast(pl.String).alias("vehicle_label"),
(
(
pl.col("service_date")
Expand All @@ -159,6 +172,19 @@ def generate_tm_events(tm_files: List[str]) -> pl.DataFrame:
.collect()
)

if tm_stop_crossings.shape[0] == 0:
schema = {
"route_id": pl.String,
"trip_id": pl.String,
"stop_id": pl.String,
"tm_stop_sequence": pl.Int64,
"tm_is_layover": pl.Boolean,
"vehicle_label": pl.String,
"tm_arrival_dt": pl.Datetime,
"tm_departure_dt": pl.Datetime,
}
tm_stop_crossings = pl.DataFrame(schema=schema)

return tm_stop_crossings


Expand Down
11 changes: 8 additions & 3 deletions tests/bus_performance_manager/test_gtfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import polars as pl
import polars.testing as pl_test

from lamp_py.bus_performance_manager.gtfs import gtfs_events_for_date
from lamp_py.bus_performance_manager.events_gtfs_schedule import (
gtfs_events_for_date,
)

current_dir = os.path.join(os.path.dirname(__file__))

Expand All @@ -32,9 +34,12 @@ def mock_file_download(object_path: str, file_name: str) -> bool:
return True


@mock.patch("lamp_py.bus_performance_manager.gtfs.file_list_from_s3")
@mock.patch(
"lamp_py.bus_performance_manager.gtfs.download_file", new=mock_file_download
"lamp_py.bus_performance_manager.events_gtfs_schedule.file_list_from_s3"
)
@mock.patch(
"lamp_py.bus_performance_manager.events_gtfs_schedule.download_file",
new=mock_file_download,
)
def test_gtfs_events_for_date(s3_patch: mock.MagicMock) -> None:
"""
Expand Down
44 changes: 22 additions & 22 deletions tests/bus_performance_manager/test_gtfs_rt_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import polars as pl

from lamp_py.aws.s3 import get_datetime_from_partition_path
from lamp_py.bus_performance_manager.gtfs_rt_ingestion import (
from lamp_py.bus_performance_manager.events_gtfs_rt import (
read_vehicle_positions,
positions_to_events,
generate_gtfs_rt_events,
Expand Down Expand Up @@ -54,7 +54,7 @@ def get_service_date_and_files() -> Tuple[date, List[str]]:
"direction_id": pl.Int8,
"trip_id": pl.String,
"stop_id": pl.String,
"stop_sequence": pl.String,
"stop_sequence": pl.Int64,
"start_time": pl.String,
"service_date": pl.String,
"vehicle_id": pl.String,
Expand All @@ -65,18 +65,18 @@ def get_service_date_and_files() -> Tuple[date, List[str]]:

# schema for vehicle events dataframes
VE_SCHEMA = {
"service_date": pl.String,
"route_id": pl.String,
"direction_id": pl.Int8,
"trip_id": pl.String,
"stop_id": pl.String,
"stop_sequence": pl.String,
"start_time": pl.String,
"service_date": pl.String,
"direction_id": pl.Int8,
"stop_id": pl.String,
"stop_sequence": pl.Int64,
"vehicle_id": pl.String,
"vehicle_label": pl.String,
"current_status": pl.String,
"arrival_gtfs": pl.Datetime,
"travel_towards_gtfs": pl.Datetime,
"gtfs_travel_to_dt": pl.Datetime,
"gtfs_arrival_dt": pl.Datetime,
}


Expand Down Expand Up @@ -113,26 +113,26 @@ def test_gtfs_rt_to_bus_events() -> None:
assert event["direction_id"] == 0

if event["stop_id"] == "173":
assert event["travel_towards_gtfs"] == datetime(
assert event["gtfs_travel_to_dt"] == datetime(
year=2024, month=6, day=1, hour=13, minute=1, second=19
)
assert event["arrival_gtfs"] == datetime(
assert event["gtfs_arrival_dt"] == datetime(
year=2024, month=6, day=1, hour=13, minute=2, second=34
)

if event["stop_id"] == "655":
assert event["travel_towards_gtfs"] == datetime(
assert event["gtfs_travel_to_dt"] == datetime(
year=2024, month=6, day=1, hour=12, minute=50, second=31
)
assert event["arrival_gtfs"] == datetime(
assert event["gtfs_arrival_dt"] == datetime(
year=2024, month=6, day=1, hour=12, minute=53, second=29
)

if event["stop_id"] == "903":
assert event["travel_towards_gtfs"] == datetime(
assert event["gtfs_travel_to_dt"] == datetime(
year=2024, month=6, day=1, hour=13, minute=3, second=39
)
assert event["arrival_gtfs"] == datetime(
assert event["gtfs_arrival_dt"] == datetime(
year=2024, month=6, day=1, hour=13, minute=11, second=3
)

Expand All @@ -153,16 +153,16 @@ def test_gtfs_rt_to_bus_events() -> None:

# no arrival time at this stop
if event["stop_id"] == "12005":
assert event["travel_towards_gtfs"] == datetime(
assert event["gtfs_travel_to_dt"] == datetime(
year=2024, month=6, day=1, hour=12, minute=47, second=23
)
assert event["arrival_gtfs"] is None
assert event["gtfs_arrival_dt"] is None

if event["stop_id"] == "17091":
assert event["travel_towards_gtfs"] == datetime(
assert event["gtfs_travel_to_dt"] == datetime(
year=2024, month=6, day=1, hour=12, minute=52, second=41
)
assert event["arrival_gtfs"] is None
assert event["gtfs_arrival_dt"] is None

# get an empty dataframe by reading the same files but for events the day prior.
previous_service_date = service_date - timedelta(days=1)
Expand Down Expand Up @@ -202,7 +202,7 @@ def route_one() -> pl.DataFrame:
"direction_id": [0, 0, 0, 0],
"trip_id": ["101", "101", "101", "101"],
"stop_id": ["1", "1", "2", "2"],
"stop_sequence": ["1", "1", "2", "2"],
"stop_sequence": [1, 1, 2, 2],
"start_time": ["08:45:00", "08:45:00", "08:45:00", "08:45:00"],
"service_date": ["20240601", "20240601", "20240601", "20240601"],
"vehicle_id": ["y1001", "y1001", "y1001", "y1001"],
Expand Down Expand Up @@ -233,7 +233,7 @@ def route_two() -> pl.DataFrame:
"direction_id": [0, 0, 0, 0, 0, 0],
"trip_id": ["202", "202", "202", "202", "202", "202"],
"stop_id": ["1", "1", "1", "2", "2", "2"],
"stop_sequence": ["1", "1", "1", "2", "2", "2"],
"stop_sequence": [1, 1, 1, 2, 2, 2],
"start_time": [
"09:45:00",
"09:45:00",
Expand Down Expand Up @@ -282,7 +282,7 @@ def route_three() -> pl.DataFrame:
"direction_id": [0, 0, 0, 0, 0, 0],
"trip_id": ["303", "303", "303", "303", "303", "303"],
"stop_id": ["1", "1", "1", "2", "2", "2"],
"stop_sequence": ["1", "1", "1", "2", "2", "2"],
"stop_sequence": [1, 1, 1, 2, 2, 2],
"start_time": [
"10:45:00",
"10:45:00",
Expand Down Expand Up @@ -331,7 +331,7 @@ def route_four() -> pl.DataFrame:
"direction_id": [0, 0, 0, 0, 0, 0],
"trip_id": ["404", "404", "404", "404", "404", "404"],
"stop_id": ["1", "1", "1", "2", "2", "2"],
"stop_sequence": ["1", "1", "1", "2", "2", "2"],
"stop_sequence": [1, 1, 1, 2, 2, 2],
"start_time": [
"11:45:00",
"11:45:00",
Expand Down
Loading

0 comments on commit d796f42

Please sign in to comment.