diff --git a/python_src/src/lamp_py/migrations/versions/performance_manager_prod/001_5d9a7ee21ae5_initial_prod_schema.py b/python_src/src/lamp_py/migrations/versions/performance_manager_prod/001_5d9a7ee21ae5_initial_prod_schema.py index b520fa28..0310e81b 100644 --- a/python_src/src/lamp_py/migrations/versions/performance_manager_prod/001_5d9a7ee21ae5_initial_prod_schema.py +++ b/python_src/src/lamp_py/migrations/versions/performance_manager_prod/001_5d9a7ee21ae5_initial_prod_schema.py @@ -293,6 +293,7 @@ def upgrade() -> None: sa.Column("pm_trip_id", sa.Integer(), nullable=False), sa.Column("stop_sequence", sa.SmallInteger(), nullable=True), sa.Column("canonical_stop_sequence", sa.SmallInteger(), nullable=True), + sa.Column("sync_stop_sequence", sa.SmallInteger(), nullable=True), sa.Column("stop_id", sa.String(length=60), nullable=False), sa.Column("parent_station", sa.String(length=60), nullable=False), sa.Column( diff --git a/python_src/src/lamp_py/migrations/versions/performance_manager_prod/sql_strings/strings_001.py b/python_src/src/lamp_py/migrations/versions/performance_manager_prod/sql_strings/strings_001.py index 78359f4c..9f9be969 100644 --- a/python_src/src/lamp_py/migrations/versions/performance_manager_prod/sql_strings/strings_001.py +++ b/python_src/src/lamp_py/migrations/versions/performance_manager_prod/sql_strings/strings_001.py @@ -387,6 +387,8 @@ , ve.stop_sequence , ve.canonical_stop_sequence , prev_ve.canonical_stop_sequence as previous_canonical_stop_sequence + , ve.sync_stop_sequence + , prev_ve.sync_stop_sequence as previous_sync_stop_sequence , ve.stop_id , prev_ve.stop_id as previous_stop_id , ve.parent_station diff --git a/python_src/src/lamp_py/migrations/versions/performance_manager_staging/001_5d9a7ee21ae5_initial_prod_schema.py b/python_src/src/lamp_py/migrations/versions/performance_manager_staging/001_5d9a7ee21ae5_initial_prod_schema.py index b520fa28..0310e81b 100644 --- 
a/python_src/src/lamp_py/migrations/versions/performance_manager_staging/001_5d9a7ee21ae5_initial_prod_schema.py +++ b/python_src/src/lamp_py/migrations/versions/performance_manager_staging/001_5d9a7ee21ae5_initial_prod_schema.py @@ -293,6 +293,7 @@ def upgrade() -> None: sa.Column("pm_trip_id", sa.Integer(), nullable=False), sa.Column("stop_sequence", sa.SmallInteger(), nullable=True), sa.Column("canonical_stop_sequence", sa.SmallInteger(), nullable=True), + sa.Column("sync_stop_sequence", sa.SmallInteger(), nullable=True), sa.Column("stop_id", sa.String(length=60), nullable=False), sa.Column("parent_station", sa.String(length=60), nullable=False), sa.Column( diff --git a/python_src/src/lamp_py/migrations/versions/performance_manager_staging/sql_strings/strings_001.py b/python_src/src/lamp_py/migrations/versions/performance_manager_staging/sql_strings/strings_001.py index 78359f4c..9f9be969 100644 --- a/python_src/src/lamp_py/migrations/versions/performance_manager_staging/sql_strings/strings_001.py +++ b/python_src/src/lamp_py/migrations/versions/performance_manager_staging/sql_strings/strings_001.py @@ -387,6 +387,8 @@ , ve.stop_sequence , ve.canonical_stop_sequence , prev_ve.canonical_stop_sequence as previous_canonical_stop_sequence + , ve.sync_stop_sequence + , prev_ve.sync_stop_sequence as previous_sync_stop_sequence , ve.stop_id , prev_ve.stop_id as previous_stop_id , ve.parent_station diff --git a/python_src/src/lamp_py/performance_manager/README.md b/python_src/src/lamp_py/performance_manager/README.md index 503e205d..8ed569e3 100644 --- a/python_src/src/lamp_py/performance_manager/README.md +++ b/python_src/src/lamp_py/performance_manager/README.md @@ -13,6 +13,7 @@ Performance Manager is an application to measure rail performance on the MBTA tr | stop_id | string | false | | | stop_sequence | small integer | false | | | canonical_stop_sequence | small integer | false | stop_sequence based on "typical" route trips as defined in the 
[static_route_patterns](#static_route_patterns) table| +| sync_stop_sequence | small integer | false | stop_sequence that is consistent across all branches of a trunk for a particular `parent_station`| | parent_station | string | false | | | previous_trip_stop_pm_event_id | integer | true | pm_event_id of previous stop of pm_trip_id grouping | | next_trip_stop_pm_event_id | integer | true| pm_event_id of next stop of pm_trip_id grouping | diff --git a/python_src/src/lamp_py/performance_manager/l1_rt_trips.py b/python_src/src/lamp_py/performance_manager/l1_rt_trips.py index c7f4a961..17c1d23c 100644 --- a/python_src/src/lamp_py/performance_manager/l1_rt_trips.py +++ b/python_src/src/lamp_py/performance_manager/l1_rt_trips.py @@ -506,107 +506,275 @@ def update_directions(db_manager: DatabaseManager) -> None: process_logger.log_complete() -def update_canonical_stop_sequence(db_manager: DatabaseManager) -> None: +def update_stop_sequence(db_manager: DatabaseManager) -> None: """ - Update canonical_stop_sequence from static_route_patterns + Update canonical_stop_sequence and sync_stop_sequence from static_route_patterns """ - select_update_params = sa.select( - TempEventCompare.service_date, - TempEventCompare.static_version_key, - ).distinct() - - for result in db_manager.select_as_list(select_update_params): - service_date = result["service_date"] - version_key = result["static_version_key"] + static_canon = ( + sa.select( + StaticRoutePatterns.direction_id, + StaticTrips.trunk_route_id, + sa.func.coalesce( + StaticTrips.branch_route_id, StaticTrips.trunk_route_id + ).label("route_id"), + StaticStops.parent_station, + StaticStopTimes.stop_sequence, + StaticRoutePatterns.static_version_key, + ) + .select_from(StaticRoutePatterns) + .join( + StaticTrips, + sa.and_( + StaticRoutePatterns.representative_trip_id + == StaticTrips.trip_id, + StaticRoutePatterns.static_version_key + == StaticTrips.static_version_key, + ), + ) + .join( + StaticStopTimes, + sa.and_( + 
StaticRoutePatterns.representative_trip_id + == StaticStopTimes.trip_id, + StaticRoutePatterns.static_version_key + == StaticStopTimes.static_version_key, + ), + ) + .join( + StaticStops, + sa.and_( + StaticStopTimes.stop_id == StaticStops.stop_id, + StaticStopTimes.static_version_key + == StaticStops.static_version_key, + ), + ) + .where( + StaticRoutePatterns.static_version_key + == sa.func.any( + sa.func.array( + sa.select(TempEventCompare.static_version_key) + .distinct() + .scalar_subquery() + ) + ), + StaticRoutePatterns.route_pattern_typicality == 1, + ) + .subquery("static_canon") + ) - static_sub = ( - sa.select( - StaticRoutePatterns.direction_id, + rt_canon = ( + sa.select( + VehicleEvents.pm_event_id, + static_canon.c.stop_sequence, + ) + .select_from(VehicleEvents) + .join( + VehicleTrips, + VehicleEvents.pm_trip_id == VehicleTrips.pm_trip_id, + ) + .join( + static_canon, + sa.and_( + VehicleTrips.direction_id == static_canon.c.direction_id, sa.func.coalesce( - StaticTrips.branch_route_id, StaticTrips.trunk_route_id - ).label("route_id"), - StaticStops.parent_station, - StaticStopTimes.stop_sequence, - ) - .select_from(StaticRoutePatterns) - .join( - StaticTrips, - sa.and_( - StaticRoutePatterns.representative_trip_id - == StaticTrips.trip_id, - StaticRoutePatterns.static_version_key - == StaticTrips.static_version_key, - ), - ) - .join( - StaticStopTimes, - sa.and_( - StaticRoutePatterns.representative_trip_id - == StaticStopTimes.trip_id, - StaticRoutePatterns.static_version_key - == StaticStopTimes.static_version_key, - ), - ) - .join( - StaticStops, - sa.and_( - StaticStopTimes.stop_id == StaticStops.stop_id, - StaticStopTimes.static_version_key - == StaticStops.static_version_key, - ), - ) - .where( - StaticRoutePatterns.static_version_key == version_key, - StaticRoutePatterns.route_pattern_typicality == 1, - StaticStopTimes.static_version_key == version_key, - StaticTrips.static_version_key == version_key, - StaticStops.static_version_key == 
version_key, - ) - .subquery("static_sub") + VehicleTrips.branch_route_id, + VehicleTrips.trunk_route_id, + ) + == static_canon.c.route_id, + VehicleTrips.static_version_key + == static_canon.c.static_version_key, + VehicleEvents.parent_station == static_canon.c.parent_station, + ), ) - - canonical_sub = ( - sa.select( - VehicleEvents.pm_event_id, - static_sub.c.stop_sequence, - ) - .select_from(VehicleEvents) - .join( - VehicleTrips, - VehicleEvents.pm_trip_id == VehicleTrips.pm_trip_id, - ) - .join( - static_sub, - sa.and_( - VehicleTrips.direction_id == static_sub.c.direction_id, - sa.func.coalesce( - VehicleTrips.branch_route_id, - VehicleTrips.trunk_route_id, - ) - == static_sub.c.route_id, - VehicleEvents.parent_station == static_sub.c.parent_station, - ), - ) - .where( - VehicleEvents.service_date == service_date, - VehicleTrips.static_version_key == version_key, - ) - .subquery("canonical_sub") + .where( + VehicleEvents.service_date + == sa.func.any( + sa.func.array( + sa.select(TempEventCompare.service_date) + .distinct() + .scalar_subquery() + ) + ), ) + .subquery("rt_canon") + ) - update_query = ( - sa.update(VehicleEvents.__table__) - .values( - canonical_stop_sequence=canonical_sub.c.stop_sequence, - ) - .where( - VehicleEvents.pm_event_id == canonical_sub.c.pm_event_id, - ) + update_rt_canon = ( + sa.update(VehicleEvents.__table__) + .values( + canonical_stop_sequence=rt_canon.c.stop_sequence, + ) + .where( + VehicleEvents.pm_event_id == rt_canon.c.pm_event_id, ) + ) process_logger = ProcessLogger("l1_events.update_canonical_stop_sequence") process_logger.log_start() - db_manager.execute(update_query) + db_manager.execute(update_rt_canon) + process_logger.log_complete() + + # select "zero_point" parent_stations + # this query will produce one parent_station for each trunk_route_id that + # is the most likely to have all branch_routes passing through them + zero_point_stop = ( + sa.select( + static_canon.c.trunk_route_id, 
static_canon.c.parent_station, + sa.literal(0).label("sync_start"), + ) + .distinct( + static_canon.c.trunk_route_id, + ) + .group_by( + static_canon.c.trunk_route_id, + static_canon.c.parent_station, + ) + .order_by( + static_canon.c.trunk_route_id, + sa.func.count( + static_canon.c.stop_sequence, + ).desc(), + ( + sa.func.max(static_canon.c.stop_sequence) + - sa.func.min(static_canon.c.stop_sequence) + ).desc(), + ) + .subquery("zero_points") + ) + + # select stop_sequence number for the zero_point parent_station of each route-branch, + # consider this value the stop_sequence "adjustment" value + zero_seq_vals = ( + sa.select( + static_canon.c.direction_id, + static_canon.c.route_id, + static_canon.c.stop_sequence.label("seq_adjust"), + ) + .select_from(static_canon) + .join( + zero_point_stop, + sa.and_( + zero_point_stop.c.trunk_route_id + == static_canon.c.trunk_route_id, + zero_point_stop.c.parent_station + == static_canon.c.parent_station, + ), + ) + .subquery("zero_seq_vals") + ) + + # select the minimum stop_sequence value and minimum difference + # between a stop_sequence and stop_sequence "adjustment" for each branch-route + # these values will be used to normalize canonical stop_sequence values across a trunk + sync_adjust_vals = ( + sa.select( + static_canon.c.direction_id, + static_canon.c.trunk_route_id, + sa.func.min(static_canon.c.stop_sequence).label("min_seq"), + sa.func.min( + static_canon.c.stop_sequence - zero_seq_vals.c.seq_adjust + ).label("min_sync"), + ) + .select_from(static_canon) + .join( + zero_seq_vals, + sa.and_( + zero_seq_vals.c.direction_id == static_canon.c.direction_id, + zero_seq_vals.c.route_id == static_canon.c.route_id, + ), + ) + .group_by( + static_canon.c.direction_id, + static_canon.c.trunk_route_id, + ) + .subquery("sync_adjust_vals") + ) + + # create sync_stop_sequence + # canonical_stop_sequence - zero_parent_stop_sequence - minimum_sync_sequence_adjustment(for trunk) + minimum_canonical_stop_sequence(for trunk) + 
sync_values = ( + sa.select( + static_canon.c.direction_id, + static_canon.c.route_id, + static_canon.c.parent_station, + static_canon.c.static_version_key, + ( + static_canon.c.stop_sequence + - zero_seq_vals.c.seq_adjust + - sync_adjust_vals.c.min_sync + + sync_adjust_vals.c.min_seq + ).label("sync_stop_sequence"), + ) + .select_from(static_canon) + .join( + zero_seq_vals, + sa.and_( + zero_seq_vals.c.direction_id == static_canon.c.direction_id, + zero_seq_vals.c.route_id == static_canon.c.route_id, + ), + ) + .join( + sync_adjust_vals, + sa.and_( + sync_adjust_vals.c.direction_id == static_canon.c.direction_id, + sync_adjust_vals.c.trunk_route_id + == static_canon.c.trunk_route_id, + ), + ) + .subquery(("sync_values")) + ) + + rt_sync = ( + sa.select( + VehicleEvents.pm_event_id, + sync_values.c.sync_stop_sequence, + ) + .select_from(VehicleEvents) + .join( + VehicleTrips, + VehicleEvents.pm_trip_id == VehicleTrips.pm_trip_id, + ) + .join( + sync_values, + sa.and_( + VehicleTrips.direction_id == sync_values.c.direction_id, + sa.func.coalesce( + VehicleTrips.branch_route_id, + VehicleTrips.trunk_route_id, + ) + == sync_values.c.route_id, + VehicleTrips.static_version_key + == sync_values.c.static_version_key, + VehicleEvents.parent_station == sync_values.c.parent_station, + ), + ) + .where( + VehicleEvents.service_date + == sa.func.any( + sa.func.array( + sa.select(TempEventCompare.service_date) + .distinct() + .scalar_subquery() + ) + ), + ) + .subquery("rt_sync") + ) + + update_rt_sync = ( + sa.update(VehicleEvents.__table__) + .values( + sync_stop_sequence=rt_sync.c.sync_stop_sequence, + ) + .where( + VehicleEvents.pm_event_id == rt_sync.c.pm_event_id, + ) + ) + + process_logger = ProcessLogger("l1_events.update_sync_stop_sequence") + process_logger.log_start() + db_manager.execute(update_rt_sync) process_logger.log_complete() @@ -776,5 +944,5 @@ def process_trips(db_manager: DatabaseManager) -> None: update_static_trip_id_guess_exact(db_manager) 
update_start_times(db_manager) update_directions(db_manager) - update_canonical_stop_sequence(db_manager) + update_stop_sequence(db_manager) update_backup_static_trip_id(db_manager) diff --git a/python_src/src/lamp_py/postgres/postgres_schema.py b/python_src/src/lamp_py/postgres/postgres_schema.py index a8c736b0..292ef60d 100644 --- a/python_src/src/lamp_py/postgres/postgres_schema.py +++ b/python_src/src/lamp_py/postgres/postgres_schema.py @@ -26,6 +26,7 @@ class VehicleEvents(SqlBase): # pylint: disable=too-few-public-methods # stop identifiers stop_sequence = sa.Column(sa.SmallInteger, nullable=True) canonical_stop_sequence = sa.Column(sa.SmallInteger, nullable=True) + sync_stop_sequence = sa.Column(sa.SmallInteger, nullable=True) stop_id = sa.Column(sa.String(60), nullable=False) parent_station = sa.Column(sa.String(60), nullable=False) diff --git a/python_src/tests/performance_manager/test_performance_manager.py b/python_src/tests/performance_manager/test_performance_manager.py index 923f5286..971ce870 100644 --- a/python_src/tests/performance_manager/test_performance_manager.py +++ b/python_src/tests/performance_manager/test_performance_manager.py @@ -417,6 +417,7 @@ def test_gtfs_rt_processing( "headway_trunk_seconds", "headway_branch_seconds", "canonical_stop_sequence", + "sync_stop_sequence", } expected_columns.add("trip_id") expected_columns.add("vehicle_label")