diff --git a/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java b/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java index e3683fd36..1d9e40960 100644 --- a/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java +++ b/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java @@ -79,7 +79,7 @@ public NetworkPreloader(TransportNetworkCache transportNetworkCache) { this.transportNetworkCache = transportNetworkCache; } - public LoaderState preloadData (AnalysisWorkerTask task) { + public LoaderState preload (AnalysisWorkerTask task) { if (task.scenario != null) { transportNetworkCache.rememberScenario(task.scenario); } @@ -94,10 +94,11 @@ public LoaderState preloadData (AnalysisWorkerTask task) { * similar tasks will make interleaved calls to setProgress (with superficial map synchronization). Other than * causing a value to briefly revert from PRESENT to BUILDING this doesn't seem deeply problematic. * This is provided specifically for regional tasks, to ensure that they remain in preloading mode while all this - * data is prepared. + * data is prepared. + * Any exceptions that occur while building the network will escape this method, leaving the status as BUILDING. */ - public TransportNetwork synchronousPreload (AnalysisWorkerTask task) { - return buildValue(Key.forTask(task)); + public TransportNetwork preloadBlocking (AnalysisWorkerTask task) { + return getBlocking(Key.forTask(task)); } @Override @@ -140,7 +141,7 @@ protected TransportNetwork buildValue(Key key) { linkedPointSet.getEgressCostTable(progressListener); } } - // Finished building all needed inputs for analysis, return the completed network to the AsyncLoader code. + // Finished building all needed inputs for analysis, return the completed network return scenarioNetwork; } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java index d8e89dfd9..2e18b7b34 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/AnalysisWorker.java @@ -314,7 +314,7 @@ public static void sleepSeconds (int seconds) { protected byte[] handleAndSerializeOneSinglePointTask (TravelTimeSurfaceTask task) throws IOException { LOG.debug("Handling single-point task {}", task.toString()); // Get all the data needed to run one analysis task, or at least begin preparing it. - final AsyncLoader.LoaderState networkLoaderState = networkPreloader.preloadData(task); + final AsyncLoader.LoaderState networkLoaderState = networkPreloader.preload(task); // If loading is not complete, bail out of this function. // Ideally we'd stall briefly using something like Future.get(timeout) in case loading finishes quickly. @@ -467,7 +467,7 @@ protected void handleOneRegionalTask (RegionalTask task) throws Throwable { // LoadingCaches behind it. Specifically, the TransportNetworkCache and LinkageCache enforce turn-taking and // prevent redundant work. If those are ever removed, we would need either async regional task preparation, or // a synchronous mode with per-key blocking on AsyncLoader (kind of reinventing the wheel of LoadingCache). - TransportNetwork transportNetwork = networkPreloader.synchronousPreload(task); + TransportNetwork transportNetwork = networkPreloader.preloadBlocking(task); // If we are generating a static site, there must be a single metadata file for an entire batch of results. // Arbitrarily we create this metadata as part of the first task in the job. diff --git a/src/main/java/com/conveyal/r5/util/AsyncLoader.java b/src/main/java/com/conveyal/r5/util/AsyncLoader.java index 4606803bb..256ee5c2b 100644 --- a/src/main/java/com/conveyal/r5/util/AsyncLoader.java +++ b/src/main/java/com/conveyal/r5/util/AsyncLoader.java @@ -28,7 +28,7 @@ * "value is present in map". * * Potential problem: if we try to return immediately saying whether the needed data are available, - * there are some cases where preparing the reqeusted object might take only a few hundred milliseconds or less. + * there are some cases where preparing the requested object might take only a few hundred milliseconds or less. * In that case then we don't want the caller to have to re-poll. In this case a Future.get() with timeout is good. * * Created by abyrd on 2018-09-14 @@ -97,10 +97,23 @@ public String toString() { } } + /** + * This has been factored out of the executor runnables so subclasses can force a blocking (non-async) load. + * Any exceptions that occur while building the value will escape this method, leaving the status as BUILDING. + */ + protected V getBlocking (K key) { + V value = buildValue(key); + synchronized (map) { + map.put(key, new LoaderState(Status.PRESENT, "Loaded", 100, value)); + } + return value; + } + /** * Attempt to fetch the value for the supplied key. * If the value is not yet present, and not yet being computed / fetched, enqueue a task to do so. * Return a response that reports status, and may or may not contain the value. + * Any exception that occurs while building the value is caught and associated with the key with a status of ERROR. */ public LoaderState get (K key) { LoaderState state = null; @@ -109,7 +122,7 @@ public LoaderState get (K key) { state = map.get(key); if (state == null) { // Only enqueue a task to load the value for this key if another call hasn't already done it. - state = new LoaderState(Status.WAITING, "Enqueued task...", 0, null); + state = new LoaderState<>(Status.WAITING, "Enqueued task...", 0, null); map.put(key, state); enqueueLoadTask = true; } @@ -120,16 +133,16 @@ public LoaderState get (K key) { // Enqueue task outside the above block (synchronizing the fewest lines possible). if (enqueueLoadTask) { executor.execute(() -> { - setProgress(key, 0, "Starting..."); try { - V value = buildValue(key); - synchronized (map) { - map.put(key, new LoaderState(Status.PRESENT, null, 100, value)); - } + setProgress(key, 0, "Starting..."); + getBlocking(key); } catch (Throwable t) { // It's essential to trap Throwable rather than just Exception. Otherwise the executor - // threads can be killed by any Error that happens, stalling the executor. - setError(key, t); + // threads can be killed by any Error that happens, stalling the executor. The below permanently + // associates an error with the key. No further attempt will ever be made to create the value. + synchronized (map) { + map.put(key, new LoaderState(t)); + } LOG.error("Async load failed: " + ExceptionUtils.stackTraceString(t)); } }); @@ -139,12 +152,13 @@ public LoaderState get (K key) { /** * Override this method in concrete subclasses to specify the logic to build/calculate/fetch a value. - * Implementations may call setProgress to report progress on long operations. + * Implementations may call setProgress to report progress on long operations; if they do so, any callers of this + * method are responsible for also calling setComplete() to ensure loaded objects are marked as PRESENT. * Throw an exception to indicate an error has occurred and the building process cannot complete. * It's not entirely clear this should return a value - might be better to call setValue within the overridden * method, just as we call setProgress or setError. */ - protected abstract V buildValue(K key) throws Exception; + protected abstract V buildValue(K key); /** * Call this method inside the buildValue method to indicate progress. @@ -155,13 +169,4 @@ public void setProgress(K key, int percentComplete, String message) { } } - /** - * Call this method inside the buildValue method to indicate that an unrecoverable error has happened. - * FIXME this will permanently associate an error with the key. No further attempt will ever be made to create the value. - */ - protected void setError (K key, Throwable throwable) { - synchronized (map) { - map.put(key, new LoaderState(throwable)); - } - } }