Skip to content

Commit

Permalink
BFD-2712: Simplify pipeline app (#1803)
Browse files Browse the repository at this point in the history
* Adds `PipelineJobRunner` to serve as a wrapper around jobs which takes care of lifecycle events like scheduling, starts, stops, and error handling.
* Modifies `PipelineManager` to manage a simple pool of `PipelineJobRunner`s.  One per job.  Tracking lifecycle events received from them and gracefully managing application shutdown when all jobs are complete or any throw an exception.  Triggers graceful shutdown by all jobs if any of them fail with an exception.
* Removes unused generic `PipelineJobArguments` on `PipelineJob` since it introduced complexity for no gain.
* Eliminates many classes made obsolete by the simpler design.
  • Loading branch information
brianburton authored Jun 30, 2023
1 parent 31a903f commit 372d8f3
Show file tree
Hide file tree
Showing 32 changed files with 1,202 additions and 2,387 deletions.
7 changes: 7 additions & 0 deletions apps/bfd-pipeline/bfd-pipeline-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
<scope>test</scope>
</dependency>

<dependency>
<!-- Mockito utilities for JUnit / Jupiter. -->
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import gov.cms.bfd.pipeline.ccw.rif.load.RifLoader;
import gov.cms.bfd.pipeline.ccw.rif.load.RifRecordLoadResult;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -62,6 +63,7 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) {
PipelineApplication.class.getSimpleName(), "dataSet", "processed"))
.time();

final var failed = new AtomicBoolean(false);
Consumer<Throwable> errorHandler =
error -> {
/*
Expand All @@ -74,6 +76,7 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) {
* we stop that way for _any_ failure, but we probably want
* to be more discriminating than that.
*/
failed.set(true);
errorOccurred(error);
};

Expand All @@ -90,6 +93,10 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) {
* and processed by the next stage.
*/
for (RifFileEvent rifFileEvent : rifFilesEvent.getFileEvents()) {
if (failed.get()) {
LOGGER.info("Stopping due to error.");
break;
}
Slf4jReporter dataSetFileMetricsReporter =
Slf4jReporter.forRegistry(rifFileEvent.getEventMetrics()).outputTo(LOGGER).build();
dataSetFileMetricsReporter.start(2, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import gov.cms.bfd.pipeline.rda.grpc.RdaServerJob;
import gov.cms.bfd.pipeline.sharedutils.PipelineApplicationState;
import gov.cms.bfd.pipeline.sharedutils.PipelineJob;
import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecordStore;
import gov.cms.bfd.sharedutils.config.AppConfigurationException;
import gov.cms.bfd.sharedutils.config.ConfigException;
import gov.cms.bfd.sharedutils.config.ConfigLoader;
Expand All @@ -41,7 +40,6 @@
import io.micrometer.core.instrument.util.HierarchicalNameMapper;
import io.micrometer.jmx.JmxConfig;
import io.micrometer.jmx.JmxMeterRegistry;
import java.lang.Thread.UncaughtExceptionHandler;
import java.sql.DatabaseMetaData;
import java.time.Clock;
import java.util.ArrayList;
Expand Down Expand Up @@ -96,7 +94,6 @@ public final class PipelineApplication {
*/
public static void main(String[] args) throws Exception {
LOGGER.info("Application starting up!");
configureUnexpectedExceptionHandlers();

AppConfiguration appConfig = null;
ConfigLoader configLoader = null;
Expand Down Expand Up @@ -222,23 +219,13 @@ public static void main(String[] args) throws Exception {
System.exit(EXIT_CODE_SMOKE_TEST_FAILURE);
}

/*
* Create the PipelineManager and register all jobs.
*/
PipelineJobRecordStore jobRecordStore = new PipelineJobRecordStore(appMetrics);
PipelineManager pipelineManager =
new PipelineManager(appMetrics, jobRecordStore, s3TaskManager);
final var pipelineManager = new PipelineManager(Thread::sleep, Clock.systemUTC(), jobs);
registerShutdownHook(appMetrics, pipelineManager);
jobs.forEach(pipelineManager::registerJob);

pipelineManager.start();
LOGGER.info("Job processing started.");

/*
* At this point, we're done here with the main thread. From now on, the PipelineManager's
* executor service should be the only non-daemon thread running (and whatever it kicks off).
* Once/if that thread stops, the application will run all registered shutdown hooks and Wait
* for the PipelineManager to stop running jobs, and then check to see if we should exit
* normally with 0 or abnormally with a non-0 because a job failed.
*/
pipelineManager.awaitCompletion();
}

/**
Expand Down Expand Up @@ -309,9 +296,9 @@ private static HierarchicalNameMapper getHierarchicalNameMapper(List<Tag> common
* @param jobs all {@link PipelineJob} to test
* @return true if any test failed
*/
private static boolean anySmokeTestFailed(List<PipelineJob<?>> jobs) {
private static boolean anySmokeTestFailed(List<PipelineJob> jobs) {
boolean anyTestFailed = false;
for (PipelineJob<?> job : jobs) {
for (PipelineJob job : jobs) {
try {
LOGGER.info("smoke test running: job={}", job.getType());
if (job.isSmokeTestSuccessful()) {
Expand All @@ -338,12 +325,12 @@ private static boolean anySmokeTestFailed(List<PipelineJob<?>> jobs) {
* @param pooledDataSource our {@link javax.sql.DataSource}
* @return list of {@link PipelineJob}s to be registered
*/
private static List<PipelineJob<?>> createAllJobs(
private static List<PipelineJob> createAllJobs(
AppConfiguration appConfig,
MeterRegistry appMeters,
MetricRegistry appMetrics,
HikariDataSource pooledDataSource) {
final var jobs = new ArrayList<PipelineJob<?>>();
final var jobs = new ArrayList<PipelineJob>();

/*
* Create and register the other jobs.
Expand Down Expand Up @@ -406,7 +393,7 @@ private static List<PipelineJob<?>> createAllJobs(
* @param appState the {@link PipelineApplicationState} to use
* @return a {@link CcwRifLoadJob} instance for the application to use
*/
private static PipelineJob<?> createCcwRifLoadJob(
private static PipelineJob createCcwRifLoadJob(
CcwRifLoadOptions loadOptions, PipelineApplicationState appState) {
/*
* Create the services that will be used to handle each stage in the extract, transform, and
Expand All @@ -423,7 +410,9 @@ private static PipelineJob<?> createCcwRifLoadJob(
DataSetMonitorListener dataSetMonitorListener =
new DefaultDataSetMonitorListener(
appState.getMetrics(),
PipelineApplication::handleUncaughtException,
t -> {
throw new RuntimeException(t);
},
rifProcessor,
rifLoader);
CcwRifLoadJob ccwRifLoadJob =
Expand Down Expand Up @@ -471,79 +460,34 @@ private static void registerShutdownHook(
Runtime.getRuntime()
.addShutdownHook(
new Thread(
new Runnable() {
@Override
public void run() {
LOGGER.info("Application is shutting down...");

/*
* Just a reminder: this might take a while! It's going to wait
* for any data sets that are being actively processed to finish
* processing.
*/
pipelineManager.stop();

// Ensure that the final metrics get logged.
Slf4jReporter.forRegistry(metrics).outputTo(LOGGER).build().report();

LOGGER.info("Application has finished shutting down.");

/*
* We have to do this ourselves (rather than use Logback's DelayingShutdownHook)
* to ensure that the logger isn't closed before the above logging.
*/
LoggerContext logbackContext =
(LoggerContext) LoggerFactory.getILoggerFactory();
logbackContext.stop();
() -> {
/*
* Just a reminder: this might take a while! It's going to wait
* for any data sets that are being actively processed to finish
* processing.
*/
LOGGER.info("Application is shutting down...");
pipelineManager.stop();
pipelineManager.awaitCompletion();
LOGGER.info("Job processing stopped.");

if (pipelineManager.getError() != null) {
LOGGER.error(
"Application encountered exception: message={}",
pipelineManager.getError().getMessage());
}
}));
}

/**
* Registers {@link UncaughtExceptionHandler}s for the main thread, and a default one for all
* other threads. These are just here to make sure that things don't die silently, but instead at
* least log any errors that have occurred.
*/
private static void configureUnexpectedExceptionHandlers() {
Thread.currentThread()
.setUncaughtExceptionHandler(
new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
handleUncaughtException(e);
}
});
Thread.setDefaultUncaughtExceptionHandler(
new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
handleUncaughtException(e);
}
});
}

/**
* Call this method to deal with any uncaught exceptions. It'll log the error and then shut things
* down gracefully.
*
* @param throwable the error that occurred
*/
static void handleUncaughtException(Throwable throwable) {
/*
* If an error is caught, log it and then shut everything down.
*/
// Ensure that the final metrics get logged.
Slf4jReporter.forRegistry(metrics).outputTo(LOGGER).build().report();

LOGGER.error("Data set failed with an unhandled error. Application will exit.", throwable);
shutdown();
}
LOGGER.info("Application has finished shutting down.");

/**
* This will trigger the shutdown monitors, block until they complete, and then terminate this
* thread (and all others). Accordingly, we can be doubly sure that the data set processing will
* be halted: 1) this thread is the CcwRifLoadJob's and that thread will block then die, and 2)
* the shutdown monitor will call PipelineManager.stop(). Pack it up: we're going home, folks.
*/
static void shutdown() {
System.exit(EXIT_CODE_JOB_FAILED);
/*
* We have to do this ourselves (rather than use Logback's DelayingShutdownHook)
* to ensure that the logger isn't closed before the above logging.
*/
LoggerContext logbackContext = (LoggerContext) LoggerFactory.getILoggerFactory();
logbackContext.stop();
}));
}
}
Loading

0 comments on commit 372d8f3

Please sign in to comment.