Skip to content

Commit

Permalink
Merge pull request opentripplanner#5974 from entur/publish_timetable_…
Browse files Browse the repository at this point in the history
…snapshot_in_background

Publish timetable snapshot in background
  • Loading branch information
vpaturet authored Jul 25, 2024
2 parents 09ced42 + cd8e29e commit de56d00
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ public class SiriTimetableSnapshotSource implements TimetableSnapshotProvider {
*/
private final SiriTripPatternCache tripPatternCache;

private final TransitEditorService transitService;
/**
* Long-lived transit editor service that has access to the timetable snapshot buffer.
* This differs from the usual use case where the transit service refers to the latest published
* timetable snapshot.
*/
private final TransitEditorService transitEditorService;

private final TimetableSnapshotManager snapshotManager;

Expand All @@ -71,9 +76,10 @@ public SiriTimetableSnapshotSource(
parameters,
() -> LocalDate.now(transitModel.getTimeZone())
);
this.transitService = new DefaultTransitService(transitModel);
this.transitEditorService =
new DefaultTransitService(transitModel, getTimetableSnapshotBuffer());
this.tripPatternCache =
new SiriTripPatternCache(tripPatternIdGenerator, transitService::getPatternForTrip);
new SiriTripPatternCache(tripPatternIdGenerator, transitEditorService::getPatternForTrip);

transitModel.initTimetableSnapshotProvider(this);
}
Expand Down Expand Up @@ -102,26 +108,22 @@ public UpdateResult applyEstimatedTimetable(

List<Result<UpdateSuccess, UpdateError>> results = new ArrayList<>();

snapshotManager.withLock(() -> {
if (incrementality == FULL_DATASET) {
// Remove all updates from the buffer
snapshotManager.clearBuffer(feedId);
}
if (incrementality == FULL_DATASET) {
// Remove all updates from the buffer
snapshotManager.clearBuffer(feedId);
}

for (var etDelivery : updates) {
for (var estimatedJourneyVersion : etDelivery.getEstimatedJourneyVersionFrames()) {
var journeys = estimatedJourneyVersion.getEstimatedVehicleJourneies();
LOG.debug("Handling {} EstimatedVehicleJourneys.", journeys.size());
for (EstimatedVehicleJourney journey : journeys) {
results.add(apply(journey, transitService, fuzzyTripMatcher, entityResolver));
}
for (var etDelivery : updates) {
for (var estimatedJourneyVersion : etDelivery.getEstimatedJourneyVersionFrames()) {
var journeys = estimatedJourneyVersion.getEstimatedVehicleJourneies();
LOG.debug("Handling {} EstimatedVehicleJourneys.", journeys.size());
for (EstimatedVehicleJourney journey : journeys) {
results.add(apply(journey, transitEditorService, fuzzyTripMatcher, entityResolver));
}
}
}

LOG.debug("message contains {} trip updates", updates.size());

snapshotManager.purgeAndCommit();
});
LOG.debug("message contains {} trip updates", updates.size());

return UpdateResult.ofResults(results);
}
Expand All @@ -131,6 +133,10 @@ public TimetableSnapshot getTimetableSnapshot() {
return snapshotManager.getTimetableSnapshot();
}

private TimetableSnapshot getTimetableSnapshotBuffer() {
return snapshotManager.getTimetableSnapshotBuffer();
}

private Result<UpdateSuccess, UpdateError> apply(
EstimatedVehicleJourney journey,
TransitEditorService transitService,
Expand Down Expand Up @@ -195,11 +201,7 @@ private boolean shouldAddNewTrip(
* Snapshot timetable is used as source if initialised, trip patterns scheduled timetable if not.
*/
private Timetable getCurrentTimetable(TripPattern tripPattern, LocalDate serviceDate) {
TimetableSnapshot timetableSnapshot = getTimetableSnapshot();
if (timetableSnapshot != null) {
return timetableSnapshot.resolve(tripPattern, serviceDate);
}
return tripPattern.getScheduledTimetable();
return getTimetableSnapshotBuffer().resolve(tripPattern, serviceDate);
}

private Result<TripUpdate, UpdateError> handleModifiedTrip(
Expand Down Expand Up @@ -228,7 +230,7 @@ private Result<TripUpdate, UpdateError> handleModifiedTrip(

if (trip != null) {
// Found exact match
pattern = transitService.getPatternForTrip(trip);
pattern = transitEditorService.getPatternForTrip(trip);
} else if (fuzzyTripMatcher != null) {
// No exact match found - search for trips based on arrival-times/stop-patterns
TripAndPattern tripAndPattern = fuzzyTripMatcher.match(
Expand Down Expand Up @@ -263,7 +265,7 @@ private Result<TripUpdate, UpdateError> handleModifiedTrip(
pattern,
estimatedVehicleJourney,
serviceDate,
transitService.getTimeZone(),
transitEditorService.getTimeZone(),
entityResolver
)
.build();
Expand Down Expand Up @@ -310,7 +312,7 @@ private Result<UpdateSuccess, UpdateError> addTripToGraphAndBuffer(TripUpdate tr
private boolean markScheduledTripAsDeleted(Trip trip, final LocalDate serviceDate) {
boolean success = false;

final TripPattern pattern = transitService.getPatternForTrip(trip);
final TripPattern pattern = transitEditorService.getPatternForTrip(trip);

if (pattern != null) {
// Mark scheduled trip times for this trip in this pattern as deleted
Expand All @@ -329,4 +331,11 @@ private boolean markScheduledTripAsDeleted(Trip trip, final LocalDate serviceDat

return success;
}

/**
* Flush pending changes in the timetable snapshot buffer and publish a new snapshot.
*/
public void flushBuffer() {
snapshotManager.purgeAndCommit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
* up timetables on this class could conceivably be replaced with snapshotting entire views of the
* transit network. It would also be possible to make the realtime version of Timetables or
* TripTimes the primary view, and include references back to their scheduled versions.
* <p>
* Implementation note: when a snapshot is committed, the mutable state of this class is stored
* in final fields and completely initialized in the constructor. This provides an additional
* guarantee of safe-publication without synchronization.
* (see <a href="https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.5">final Field Semantics</a>)
*/
public class TimetableSnapshot {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public DefaultTransitService(TransitModel transitModel) {
this.transitModelIndex = transitModel.getTransitModelIndex();
}

public DefaultTransitService(
TransitModel transitModel,
TimetableSnapshot timetableSnapshotBuffer
) {
this(transitModel);
this.timetableSnapshot = timetableSnapshotBuffer;
}

@Override
public Collection<String> getFeedIds() {
return this.transitModel.getFeedIds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.ext.siri.updater.SiriETUpdater;
import org.opentripplanner.ext.siri.updater.SiriSXUpdater;
Expand All @@ -20,6 +21,7 @@
import org.opentripplanner.updater.UpdatersParameters;
import org.opentripplanner.updater.alert.GtfsRealtimeAlertsUpdater;
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.TimetableSnapshotFlush;
import org.opentripplanner.updater.trip.MqttGtfsRealtimeUpdater;
import org.opentripplanner.updater.trip.PollingTripUpdater;
import org.opentripplanner.updater.trip.TimetableSnapshotSource;
Expand Down Expand Up @@ -94,6 +96,9 @@ private void configure() {
);

GraphUpdaterManager updaterManager = new GraphUpdaterManager(graph, transitModel, updaters);

configureTimetableSnapshotFlush(updaterManager);

updaterManager.startUpdaters();

// Stop the updater manager if it contains nothing
Expand Down Expand Up @@ -223,4 +228,21 @@ private TimetableSnapshotSource provideGtfsTimetableSnapshot() {
}
return gtfsTimetableSnapshotSource;
}

/**
* If SIRI or GTFS real-time updaters are in use, configure a periodic flush of the timetable
* snapshot.
*/
private void configureTimetableSnapshotFlush(GraphUpdaterManager updaterManager) {
if (siriTimetableSnapshotSource != null || gtfsTimetableSnapshotSource != null) {
updaterManager
.getScheduler()
.scheduleWithFixedDelay(
new TimetableSnapshotFlush(siriTimetableSnapshotSource, gtfsTimetableSnapshotSource),
0,
updatersParameters.timetableSnapshotParameters().maxSnapshotFrequency().toSeconds(),
TimeUnit.SECONDS
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.opentripplanner.updater.spi;

import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.updater.trip.TimetableSnapshotSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Flush the timetable snapshot buffer by committing pending changes.
* Exceptions occurring during the flush are caught and ignored: the scheduler can then retry
* the task later.
*/
public class TimetableSnapshotFlush implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotFlush.class);

private final SiriTimetableSnapshotSource siriTimetableSnapshotSource;
private final TimetableSnapshotSource gtfsTimetableSnapshotSource;

public TimetableSnapshotFlush(
SiriTimetableSnapshotSource siriTimetableSnapshotSource,
TimetableSnapshotSource gtfsTimetableSnapshotSource
) {
this.siriTimetableSnapshotSource = siriTimetableSnapshotSource;
this.gtfsTimetableSnapshotSource = gtfsTimetableSnapshotSource;
}

@Override
public void run() {
try {
LOG.debug("Flushing timetable snapshot buffer");
if (siriTimetableSnapshotSource != null) {
siriTimetableSnapshotSource.flushBuffer();
}
if (gtfsTimetableSnapshotSource != null) {
gtfsTimetableSnapshotSource.flushBuffer();
}
LOG.debug("Flushed timetable snapshot buffer");
} catch (Throwable t) {
LOG.error("Error flushing timetable snapshot buffer", t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

import java.time.LocalDate;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.opentripplanner.framework.time.CountdownTimer;
import org.opentripplanner.model.Timetable;
import org.opentripplanner.model.TimetableSnapshot;
import org.opentripplanner.routing.algorithm.raptoradapter.transit.mappers.TransitLayerUpdater;
import org.opentripplanner.routing.util.ConcurrentPublished;
import org.opentripplanner.transit.model.framework.FeedScopedId;
import org.opentripplanner.transit.model.framework.Result;
import org.opentripplanner.transit.model.network.TripPattern;
Expand All @@ -30,36 +29,18 @@ public final class TimetableSnapshotManager {

private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotManager.class);
private final TransitLayerUpdater transitLayerUpdater;
/**
* Lock to indicate that buffer is in use
*/
private final ReentrantLock bufferLock = new ReentrantLock(true);

/**
* The working copy of the timetable snapshot. Should not be visible to routing threads. Should
* only be modified by a thread that holds a lock on {@link #bufferLock}. All public methods that
* might modify this buffer will correctly acquire the lock. By design, only one thread should
* ever be writing to this buffer.
* TODO RT_AB: research and document why this lock is needed since only one thread should ever be
* writing to this buffer. One possible reason may be a need to suspend writes while indexing
* and swapping out the buffer. But the original idea was to make a new copy of the buffer
* before re-indexing it. While refactoring or rewriting parts of this system, we could throw
* an exception if a writing section is entered by more than one thread.
* The working copy of the timetable snapshot. Should not be visible to routing threads.
* By design, only one thread should ever be writing to this buffer.
*/
private final TimetableSnapshot buffer = new TimetableSnapshot();

/**
* The last committed snapshot that was handed off to a routing thread. This snapshot may be given
* to more than one routing thread if the maximum snapshot frequency is exceeded.
*/
private volatile TimetableSnapshot snapshot = null;

/**
* If a timetable snapshot is requested less than this number of milliseconds after the previous
* snapshot, just return the same one. Throttles the potentially resource-consuming task of
* duplicating a TripPattern -> Timetable map and indexing the new Timetables.
* to more than one routing thread.
*/
private final CountdownTimer snapshotFrequencyThrottle;
private final ConcurrentPublished<TimetableSnapshot> snapshot = new ConcurrentPublished<>();

/**
* Should expired real-time data be purged from the graph.
Expand All @@ -85,7 +66,6 @@ public TimetableSnapshotManager(
Supplier<LocalDate> localDateNow
) {
this.transitLayerUpdater = transitLayerUpdater;
this.snapshotFrequencyThrottle = new CountdownTimer(parameters.maxSnapshotFrequency());
this.purgeExpiredData = parameters.purgeExpiredData();
this.localDateNow = Objects.requireNonNull(localDateNow);
// Force commit so that snapshot initializes
Expand All @@ -99,19 +79,17 @@ public TimetableSnapshotManager(
* to the snapshot to release resources.
*/
public TimetableSnapshot getTimetableSnapshot() {
// Try to get a lock on the buffer
if (bufferLock.tryLock()) {
// Make a new snapshot if necessary
try {
commitTimetableSnapshot(false);
return snapshot;
} finally {
bufferLock.unlock();
}
}
// No lock could be obtained because there is either a snapshot commit busy or updates
// are applied at this moment, just return the current snapshot
return snapshot;
return snapshot.get();
}

/**
* @return the current timetable snapshot buffer that contains pending changes (not yet published
* in a snapshot).
* This should be used in the context of an updater to build a TransitEditorService that sees all
* the changes applied so far by real-time updates.
*/
public TimetableSnapshot getTimetableSnapshotBuffer() {
return buffer;
}

/**
Expand All @@ -122,21 +100,12 @@ public TimetableSnapshot getTimetableSnapshot() {
*
* @param force Force the committing of a new snapshot even if the above conditions are not met.
*/
public void commitTimetableSnapshot(final boolean force) {
if (force || snapshotFrequencyThrottle.timeIsUp()) {
if (force || buffer.isDirty()) {
LOG.debug("Committing {}", buffer);
snapshot = buffer.commit(transitLayerUpdater, force);

// We only reset the timer when the snapshot is updated. This will cause the first
// update to be committed after a silent period. This should not have any effect in
// a busy updater. It is however useful when manually testing the updater.
snapshotFrequencyThrottle.restart();
} else {
LOG.debug("Buffer was unchanged, keeping old snapshot.");
}
void commitTimetableSnapshot(final boolean force) {
if (force || buffer.isDirty()) {
LOG.debug("Committing {}", buffer);
snapshot.publish(buffer.commit(transitLayerUpdater, force));
} else {
LOG.debug("Snapshot frequency exceeded. Reusing snapshot {}", snapshot);
LOG.debug("Buffer was unchanged, keeping old snapshot.");
}
}

Expand Down Expand Up @@ -205,22 +174,6 @@ private boolean purgeExpiredData() {
return buffer.purgeExpiredData(previously);
}

/**
* Execute a {@code Runnable} with a locked snapshot buffer and release the lock afterwards. While
* the action of locking and unlocking is not complicated to do for calling code, this method
* exists so that the lock instance is a private field.
*/
public void withLock(Runnable action) {
bufferLock.lock();

try {
action.run();
} finally {
// Always release lock
bufferLock.unlock();
}
}

/**
* Clear all data of snapshot for the provided feed id
*/
Expand Down
Loading

0 comments on commit de56d00

Please sign in to comment.