Skip to content

Commit

Permalink
Try a different schema (dictionary-encoded string fields) and remove the explicit gc.collect() call
Browse files: browse the repository at this point in the history
  • Loading branch information
mzappitello committed Oct 17, 2023
1 parent f982e3e commit 26cc48c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
5 changes: 2 additions & 3 deletions python_src/src/lamp_py/ingestion/convert_gtfs_rt.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import logging
import os
import gc
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime, timezone
Expand Down Expand Up @@ -326,7 +325,8 @@ def gz_to_pyarrow(

except FileNotFoundError as _:
return (None, filename, None)
except Exception as _:
except Exception as e:
print(e)
self.thread_init()
return (None, filename, None)

Expand Down Expand Up @@ -355,7 +355,6 @@ def write_table(self, table: pyarrow.table) -> None:
pl.log_complete()

if self.detail.table_sort_order is not None:
gc.collect()
pl = ProcessLogger(
"sort_table",
config_type=self.config_type,
Expand Down
34 changes: 23 additions & 11 deletions python_src/src/lamp_py/ingestion/gtfs_rt_structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,33 @@

trip_descriptor = pyarrow.struct(
[
("trip_id", pyarrow.string()),
("route_id", pyarrow.string()),
("trip_id", pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8())),
("route_id", pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8())),
("direction_id", pyarrow.uint8()),
("start_time", pyarrow.string()),
("start_date", pyarrow.string()),
("schedule_relationship", pyarrow.string()),
("route_pattern_id", pyarrow.string()), # MBTA Enhanced Field
("tm_trip_id", pyarrow.string()), # Only used by Busloc
("start_time", pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8())),
("start_date", pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8())),
(
"schedule_relationship",
pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8()),
),
(
"route_pattern_id",
pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8()),
), # MBTA Enhanced Field
(
"tm_trip_id",
pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8()),
), # Only used by Busloc
("overload_id", pyarrow.int64()), # Only used by Busloc
("overload_offset", pyarrow.int64()), # Only used by Busloc
]
)

vehicle_descriptor = pyarrow.struct(
[
("id", pyarrow.string()),
("label", pyarrow.string()),
("license_plate", pyarrow.string()),
("id", pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8())),
("label", pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8())),
("license_plate", pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8())),
(
"consist",
pyarrow.list_(
Expand All @@ -40,7 +49,10 @@
),
),
), # MBTA Enhanced Field
("assignment_status", pyarrow.string()), # Only used by Busloc
(
"assignment_status",
pyarrow.dictionary(pyarrow.int32(), pyarrow.utf8()),
), # Only used by Busloc
]
)

Expand Down

0 comments on commit 26cc48c

Please sign in to comment.