diff --git a/apps/bfd-pipeline/bfd-pipeline-app/pom.xml b/apps/bfd-pipeline/bfd-pipeline-app/pom.xml index 4b057ae903..291a488e7b 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/pom.xml +++ b/apps/bfd-pipeline/bfd-pipeline-app/pom.xml @@ -49,6 +49,13 @@ test + + + org.mockito + mockito-junit-jupiter + test + + org.projectlombok lombok diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/DefaultDataSetMonitorListener.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/DefaultDataSetMonitorListener.java index 8bf0ac2718..e72d707850 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/DefaultDataSetMonitorListener.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/DefaultDataSetMonitorListener.java @@ -12,6 +12,7 @@ import gov.cms.bfd.pipeline.ccw.rif.load.RifLoader; import gov.cms.bfd.pipeline.ccw.rif.load.RifRecordLoadResult; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,7 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) { PipelineApplication.class.getSimpleName(), "dataSet", "processed")) .time(); + final var failed = new AtomicBoolean(false); Consumer errorHandler = error -> { /* @@ -74,6 +76,7 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) { * we stop that way for _any_ failure, but we probably want * to be more discriminating than that. */ + failed.set(true); errorOccurred(error); }; @@ -90,6 +93,10 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) { * and processed by the next stage. */ for (RifFileEvent rifFileEvent : rifFilesEvent.getFileEvents()) { + if (failed.get()) { + LOGGER.info("Stopping due to error."); + break; + } Slf4jReporter dataSetFileMetricsReporter = Slf4jReporter.forRegistry(rifFileEvent.getEventMetrics()).outputTo(LOGGER).build(); dataSetFileMetricsReporter.start(2, TimeUnit.MINUTES); diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java index 740f643e8f..d165b42f77 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java @@ -24,7 +24,6 @@ import gov.cms.bfd.pipeline.rda.grpc.RdaServerJob; import gov.cms.bfd.pipeline.sharedutils.PipelineApplicationState; import gov.cms.bfd.pipeline.sharedutils.PipelineJob; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecordStore; import gov.cms.bfd.sharedutils.config.AppConfigurationException; import gov.cms.bfd.sharedutils.config.ConfigException; import gov.cms.bfd.sharedutils.config.ConfigLoader; @@ -41,7 +40,6 @@ import io.micrometer.core.instrument.util.HierarchicalNameMapper; import io.micrometer.jmx.JmxConfig; import io.micrometer.jmx.JmxMeterRegistry; -import java.lang.Thread.UncaughtExceptionHandler; import java.sql.DatabaseMetaData; import java.time.Clock; import java.util.ArrayList; @@ -96,7 +94,6 @@ public final class PipelineApplication { */ public static void main(String[] args) throws Exception { LOGGER.info("Application starting up!"); - configureUnexpectedExceptionHandlers(); AppConfiguration appConfig = null; ConfigLoader configLoader = null; @@ -222,23 +219,13 @@ public static void main(String[] args) throws Exception { System.exit(EXIT_CODE_SMOKE_TEST_FAILURE); } - /* - * Create the PipelineManager and register all jobs. - */ - PipelineJobRecordStore jobRecordStore = new PipelineJobRecordStore(appMetrics); - PipelineManager pipelineManager = - new PipelineManager(appMetrics, jobRecordStore, s3TaskManager); + final var pipelineManager = new PipelineManager(Thread::sleep, Clock.systemUTC(), jobs); registerShutdownHook(appMetrics, pipelineManager); - jobs.forEach(pipelineManager::registerJob); + + pipelineManager.start(); LOGGER.info("Job processing started."); - /* - * At this point, we're done here with the main thread. From now on, the PipelineManager's - * executor service should be the only non-daemon thread running (and whatever it kicks off). - * Once/if that thread stops, the application will run all registered shutdown hooks and Wait - * for the PipelineManager to stop running jobs, and then check to see if we should exit - * normally with 0 or abnormally with a non-0 because a job failed. - */ + pipelineManager.awaitCompletion(); } /** @@ -309,9 +296,9 @@ private static HierarchicalNameMapper getHierarchicalNameMapper(List common * @param jobs all {@link PipelineJob} to test * @return true if any test failed */ - private static boolean anySmokeTestFailed(List> jobs) { + private static boolean anySmokeTestFailed(List jobs) { boolean anyTestFailed = false; - for (PipelineJob job : jobs) { + for (PipelineJob job : jobs) { try { LOGGER.info("smoke test running: job={}", job.getType()); if (job.isSmokeTestSuccessful()) { @@ -338,12 +325,12 @@ private static boolean anySmokeTestFailed(List> jobs) { * @param pooledDataSource our {@link javax.sql.DataSource} * @return list of {@link PipelineJob}s to be registered */ - private static List> createAllJobs( + private static List createAllJobs( AppConfiguration appConfig, MeterRegistry appMeters, MetricRegistry appMetrics, HikariDataSource pooledDataSource) { - final var jobs = new ArrayList>(); + final var jobs = new ArrayList(); /* * Create and register the other jobs. @@ -406,7 +393,7 @@ private static List> createAllJobs( * @param appState the {@link PipelineApplicationState} to use * @return a {@link CcwRifLoadJob} instance for the application to use */ - private static PipelineJob createCcwRifLoadJob( + private static PipelineJob createCcwRifLoadJob( CcwRifLoadOptions loadOptions, PipelineApplicationState appState) { /* * Create the services that will be used to handle each stage in the extract, transform, and @@ -423,7 +410,9 @@ private static PipelineJob createCcwRifLoadJob( DataSetMonitorListener dataSetMonitorListener = new DefaultDataSetMonitorListener( appState.getMetrics(), - PipelineApplication::handleUncaughtException, + t -> { + throw new RuntimeException(t); + }, rifProcessor, rifLoader); CcwRifLoadJob ccwRifLoadJob = @@ -471,79 +460,34 @@ private static void registerShutdownHook( Runtime.getRuntime() .addShutdownHook( new Thread( - new Runnable() { - @Override - public void run() { - LOGGER.info("Application is shutting down..."); - - /* - * Just a reminder: this might take a while! It's going to wait - * for any data sets that are being actively processed to finish - * processing. - */ - pipelineManager.stop(); - - // Ensure that the final metrics get logged. - Slf4jReporter.forRegistry(metrics).outputTo(LOGGER).build().report(); - - LOGGER.info("Application has finished shutting down."); - - /* - * We have to do this ourselves (rather than use Logback's DelayingShutdownHook) - * to ensure that the logger isn't closed before the above logging. - */ - LoggerContext logbackContext = - (LoggerContext) LoggerFactory.getILoggerFactory(); - logbackContext.stop(); + () -> { + /* + * Just a reminder: this might take a while! It's going to wait + * for any data sets that are being actively processed to finish + * processing. + */ + LOGGER.info("Application is shutting down..."); + pipelineManager.stop(); + pipelineManager.awaitCompletion(); + LOGGER.info("Job processing stopped."); + + if (pipelineManager.getError() != null) { + LOGGER.error( + "Application encountered exception: message={}", + pipelineManager.getError().getMessage()); } - })); - } - - /** - * Registers {@link UncaughtExceptionHandler}s for the main thread, and a default one for all - * other threads. These are just here to make sure that things don't die silently, but instead at - * least log any errors that have occurred. - */ - private static void configureUnexpectedExceptionHandlers() { - Thread.currentThread() - .setUncaughtExceptionHandler( - new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - handleUncaughtException(e); - } - }); - Thread.setDefaultUncaughtExceptionHandler( - new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - handleUncaughtException(e); - } - }); - } - /** - * Call this method to deal with any uncaught exceptions. It'll log the error and then shut things - * down gracefully. - * - * @param throwable the error that occurred - */ - static void handleUncaughtException(Throwable throwable) { - /* - * If an error is caught, log it and then shut everything down. - */ + // Ensure that the final metrics get logged. + Slf4jReporter.forRegistry(metrics).outputTo(LOGGER).build().report(); - LOGGER.error("Data set failed with an unhandled error. Application will exit.", throwable); - shutdown(); - } + LOGGER.info("Application has finished shutting down."); - /** - * This will trigger the shutdown monitors, block until they complete, and then terminate this - * thread (and all others). Accordingly, we can be doubly sure that the data set processing will - * be halted: 1) this thread is the CcwRifLoadJob's and that thread will block then die, and 2) - * the shutdown monitor will call PipelineManager.stop(). Pack it up: we're going home, folks. - */ - static void shutdown() { - System.exit(EXIT_CODE_JOB_FAILED); + /* + * We have to do this ourselves (rather than use Logback's DelayingShutdownHook) + * to ensure that the logger isn't closed before the above logging. + */ + LoggerContext logbackContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + logbackContext.stop(); + })); } } diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineJobRunner.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineJobRunner.java new file mode 100644 index 0000000000..8230fb4a0f --- /dev/null +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineJobRunner.java @@ -0,0 +1,229 @@ +package gov.cms.bfd.pipeline.app; + +import gov.cms.bfd.pipeline.sharedutils.PipelineJob; +import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; +import gov.cms.bfd.sharedutils.interfaces.ThrowingConsumer; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.regex.Pattern; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +/** + * Wrapper for a {@link PipelineJob} that runs the job on a schedule. Implements {@link Runnable} so + * it can be submitted to an {@link java.util.concurrent.ExecutorService}. + */ +@Slf4j +@AllArgsConstructor +public class PipelineJobRunner implements Runnable { + /** Object that tracks the status of all job runs. */ + private final Tracker tracker; + /** The job we run. */ + private final PipelineJob job; + /** Function used to sleep. Parameterized for use by unit tests. */ + private final ThrowingConsumer sleeper; + /** Used to get timestamps. Parameterized for use by unit tests. */ + private final Clock clock; + + /** + * Runs the job according to its schedule. If the job has no schedule simply runs the job once. + * Any uncaught exception thrown by the job terminates the loop. Our status is always updated in + * the {@link Tracker} so it knows when job runs happen as well as when the loop terminates (and + * for what reason). + */ + @Override + public void run() { + try { + final Long repeatMillis = + job.getSchedule() + .map(s -> Duration.of(s.getRepeatDelay(), s.getRepeatDelayUnit())) + .map(Duration::toMillis) + .orElse(0L); + while (tracker.jobsCanRun()) { + runJob(); + if (repeatMillis <= 0 || !tracker.jobsCanRun()) { + break; + } + tracker.sleeping(job); + sleeper.accept(repeatMillis); + } + tracker.stoppingNormally(job); + } catch (InterruptedException ex) { + tracker.stoppingDueToInterrupt(job); + } catch (Exception ex) { + tracker.stoppingDueToException(job, ex); + } finally { + tracker.stopped(job); + } + } + + /** + * Runs the job once and reports its outcome to the {@link Tracker}. + * + * @throws Exception passed through if the job terminates with an exception + */ + private void runJob() throws Exception { + final long id = tracker.beginningRun(job); + final Instant startTime = clock.instant(); + PipelineJobOutcome outcome = null; + Exception exception = null; + try { + outcome = job.call(); + } catch (Exception ex) { + exception = ex; + } + final Instant stopTime = clock.instant(); + // one must be null and the other not null + assert (outcome == null) != (exception == null); + tracker.completedRun( + new JobRunSummary( + id, + job, + startTime, + stopTime, + Optional.ofNullable(outcome), + Optional.ofNullable(exception))); + + // rethrow so that loop can perform local error handling + if (exception != null) { + throw exception; + } + } + + /** Summarizes the results of a job run. */ + @Data + public static class JobRunSummary { + /** Used to identify log lines indicating job success. */ + private static final Pattern SUCCESS_REGEX = + Pattern.compile( + String.format("%s \\[id=\\d+,.*outcome=([^,]+)", JobRunSummary.class.getSimpleName())); + /** Used to identify log lines indicating job failure. */ + private static final Pattern FAILURE_REGEX = + Pattern.compile( + String.format( + "%s \\[id=\\d+,.*failure=([^\\]]+)", JobRunSummary.class.getSimpleName())); + + /** Id for this job run. Assigned by {@link Tracker#beginningRun}. */ + private final long id; + /** The job. */ + private final PipelineJob job; + /** When the run started. */ + private final Instant startTime; + /** When the run stopped. */ + private final Instant stopTime; + /** The outcome if run was successful. */ + private final Optional outcome; + /** The exception if run failed. */ + private final Optional exception; + + /** + * Used by integration tests to detect successful job run log lines. + * + * @param logString line from log file to check + * @return true if the line indicates a job was successful + */ + public static boolean isSuccessString(String logString) { + var m = SUCCESS_REGEX.matcher(logString); + return m.find() && !m.group(1).equals("Optional.empty"); + } + + /** + * Used by integration tests to detect failed job run log lines. + * + * @param logString line from log file to check + * @return true if the line indicates a job failed + */ + public static boolean isFailureString(String logString) { + var m = FAILURE_REGEX.matcher(logString); + return m.find() && !m.group(1).equals("Optional.empty"); + } + + @Override + public String toString() { + return new StringBuilder() + .append(getClass().getSimpleName()) + .append(" [id=") + .append(id) + .append(", jobType=") + .append(job.getType()) + .append(", startedTime=") + .append(startTime) + .append(", completedTime=") + .append(stopTime) + .append(", outcome=") + .append(outcome) + .append(", failure=") + .append(exception) + .append("]") + .toString(); + } + } + + /** Interface for objects that manage {@link PipelineJobRunner} instances. */ + public interface Tracker { + /** + * Callable to determine if it is ok to run the job again. Used to allow jobs to shutdown + * cleanly when any job fails or the pipeline app is shutting down. + * + * @return true if it's ok to run again + */ + boolean jobsCanRun(); + + /** + * Notifies the tracker that a new job run is starting. Return value is a unique id assigned to + * this job run. + * + * @param job the job that is starting + * @return unique id for this run + */ + long beginningRun(PipelineJob job); + + /** + * Notifies the tracker that a job has completed and the outcome of the run. + * + * @param summary summaries the outcome of the run + */ + void completedRun(JobRunSummary summary); + + /** + * Notifies the tracker that a job is sleeping between runs. + * + * @param job the job that is sleeping + */ + void sleeping(PipelineJob job); + + /** + * Notifies the tracker that a job is stopping because it caught an {@link InterruptedException} + * while waiting to run again or while the job was running. + * + * @param job the job that is stopping + */ + void stoppingDueToInterrupt(PipelineJob job); + + /** + * Notifies the tracker that a job is stopping because it threw an exception during a run. + * + * @param job the job that is stopping + * @param error the exception that was thrown + */ + void stoppingDueToException(PipelineJob job, Exception error); + + /** + * Notifies the tracker that a job is stopping because it has completed a run and doesn't have a + * schedule for running multiple times or was told not to run again by {@link #jobsCanRun}. + * + * @param job the job that is stopping + */ + void stoppingNormally(PipelineJob job); + + /** + * Notifies the tracker that a job has stopped. + * + * @param job the job that has stopped + */ + void stopped(PipelineJob job); + } +} diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineManager.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineManager.java index b0d72b3e85..b1caadbd3e 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineManager.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineManager.java @@ -1,609 +1,292 @@ package gov.cms.bfd.pipeline.app; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import gov.cms.bfd.pipeline.app.scheduler.SchedulerJob; -import gov.cms.bfd.pipeline.app.volunteer.VolunteerJob; -import gov.cms.bfd.pipeline.ccw.rif.extract.s3.task.S3TaskManager; -import gov.cms.bfd.pipeline.sharedutils.NullPipelineJobArguments; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import gov.cms.bfd.pipeline.sharedutils.PipelineJob; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobArguments; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobRecordId; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobSchedule; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobType; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobFailure; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecord; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecordStore; -import java.time.Duration; -import java.time.Instant; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import gov.cms.bfd.sharedutils.interfaces.ThrowingConsumer; +import java.time.Clock; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Orchestrates and manages the execution of {@link PipelineJob}s. */ -public final class PipelineManager implements AutoCloseable { +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.utils.ThreadFactoryBuilder; + +/** Class responsible for executing pipeline jobs in a thread pool. */ +@Slf4j +@ThreadSafe +public class PipelineManager implements PipelineJobRunner.Tracker { /** - * The number of jobs that can be run at one time. Because the {@link VolunteerJob} and {@link - * SchedulerJob} will always be running, this number must be greater than or equal to 3, in order - * for any actual jobs to get run. Value of 6 is sufficient for VolunteerJob, SchedulerJob, - * CcwRifLoadJob, RdaFissClaimLoadJob, RdaMcsClaimLoadJob, and RdaServerJob. - * - * @see #jobExecutor + * We track job results so that tests can check them. This sets a limit on the number we keep + * around. Needs to be large enough for tests but not so large that it uses a lot of heap in + * production. */ - public static final int JOB_EXECUTOR_THREADS = 6; - - private static final Logger LOGGER = LoggerFactory.getLogger(PipelineManager.class); - - /** The {@link MetricRegistry} used to track the application's performance and events. */ - private final MetricRegistry appMetrics; - + private static final int MAX_COMPLETED_JOBS = 100; + /** Function used to sleep. Parameterized for use by unit tests. */ + private final ThrowingConsumer sleeper; + /** Used to get timestamps. Parameterized for use by unit tests. */ + private final Clock clock; + /** All of the jobs we manage. */ + private final ImmutableList jobs; + /** Thread pool with one thread per job. */ + private final ExecutorService threadPool; + /** Used to wait for all jobs to terminate. */ + private final CountDownLatch latch; + /** Used to assign job ids in a thread safe manner. */ + private final AtomicLong idGenerator = new AtomicLong(1); + /** Recent job results for use by tests. */ + private final LinkedList completedJobs; /** - * Used to run the jobs. Provided by Guava and documented here: - * https://github.com/google/guava/wiki/ListenableFutureExplained. If you're touching this class, - * you need to read that page and the associated JavaDoc. Concurrent APIs like - * this are chock-full-o' footguns and while Guava's APIs here are well-designed, they're still no - * exception to that rule. + * True if all jobs are interruptable. When false we can't interrupt the pool for faster + * shutdowns. */ - private final ListeningScheduledExecutorService jobExecutor; - + private final boolean interruptable; /** - * This is a handle of the internal thread pool used by our job executor. The only reason this is - * stored here is to report the number of remaining threads remaining when we encounter an error - * and are waiting on all jobs/threads to close out before the pipeline shuts down completely, as - * the {@link ListeningScheduledExecutorService} doesn't have a good way to get its number of - * running threads or an instance of this underlying thread pool. + * One {@link Future} per job. Can be used to get job result directly. Access limited to + * synchronized methods. */ - private static ScheduledThreadPoolExecutor jobExecutorThreadPoolHandle; - + @GuardedBy("this") + private ImmutableList> runningJobFutures; + /** True while we're running. False when we're not. Access limited to synchronized methods. */ + @GuardedBy("this") + private boolean isRunning; /** - * This {@link Map} stores handles to all enqueued job {@link Future}s. We need to keep track of - * these for two use reasons: - * - *
    - *
  1. So that we can ensure that we don't over-commit and enqueue more work than we have - * executors available for. - *
  2. So that we can ensure that jobs can be cancelled when {@link #stop()} is called. - *
- * - *

Note: this isn't really thread-safe; it's subject to race conditions, unless used very - * carefully. Accordingly, usage of this field SHALL conform to the following - * rules: - * - *

    - *
  • Jobs SHALL only be added to it on the {@link VolunteerJob}'s thread, ensuring that the - * job is submitted to {@link #jobExecutor} and added to {@link - * #jobsEnqueuedHandles} in a block that is {@code synchronized} on {@link - * #jobsEnqueuedHandles}. - *
  • {@link ConcurrentMap#size()} SHALL only be read on the {@link VolunteerJob}'s thread - * (aside from reads for metrics, which SHALL NOT be used for any application logic). - *
  • Jobs SHALL only be removed by {@link PipelineJobWrapper} and {@link PipelineJobCallback}, - * ensuring that the job is removed from to {@link #jobsEnqueuedHandles} in a block that is - * {@code synchronized} on {@link #jobsEnqueuedHandles}. - *
  • All other reads of this collection (e.g. {@link #stop()}) SHALL {@code synchronize} on - * it. - *
- * - *

Following these rules will result in {@link ConcurrentMap#size()} being eventually - * consistent but nevertheless ALWAYS >= the number of jobs that are actually enqueued, from the - * view of the {@link VolunteerJob}'s thread. This aligns with what the {@link VolunteerJob} needs - * the value for, as we're using it to avoid over-subscribing on work. Under-subscribing is fine, - * in the context. + * Any exception generated by a job will halt processing and is stored here. Access limited to + * synchronized methods. */ - private final ConcurrentMap> jobsEnqueuedHandles; + @GuardedBy("this") + private Exception error; /** - * Used to run the job monitoring tasks, e.g. {@link ListenableFuture} callbacks. Those tasks are - * run in a separate thread pool to ensure that job execution doesn't starve them out. - */ - private final ExecutorService jobMonitorsExecutor; - - /** - * The {@link PipelineJobRecordStore} service that tracks job submissions, executions, and - * outcomes. + * Initializes an instance. Creates the thread pool but does not schedule any jobs yet. + * + * @param sleeper used by threads to wait for a set period of time + * @param clock used to determine current time + * @param jobs the jobs to execute */ - private final PipelineJobRecordStore jobRecordStore; + public PipelineManager( + ThrowingConsumer sleeper, Clock clock, List jobs) { + this.sleeper = sleeper; + this.clock = clock; + this.jobs = ImmutableList.copyOf(jobs); + threadPool = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .threadNamePrefix(getClass().getSimpleName()) + .daemonThreads(false) + .build()); + latch = new CountDownLatch(jobs.size()); + completedJobs = new LinkedList<>(); + interruptable = jobs.stream().allMatch(PipelineJob::isInterruptible); + } /** - * Stores the {@link PipelineJob}s that have been registered via {@link - * #registerJob(PipelineJob)}. + * Can be called only once. Used by main thread. Starts all jobs running by adding them to the + * thread pool. Since our pool automatically assigns one thread per job all of them will begin + * running immediately. */ - private final Map, PipelineJob> jobRegistry; + public synchronized void start() { + if (isRunning || runningJobFutures != null) { + throw new IllegalStateException("start has already been called"); + } + var futures = ImmutableList.>builder(); + for (PipelineJob job : jobs) { + var jobExecutor = new PipelineJobRunner(this, job, sleeper, clock); + var future = threadPool.submit(jobExecutor); + futures.add(future); + } + runningJobFutures = futures.build(); + isRunning = true; + } /** - * Keep a reference to the S3 task manager to properly clean it up if we need to shut down the - * application unexpectedly. + * Can be called multiple times. Triggers a shutdown of the thread pool and interrupts running + * threads if {@link #interruptable} is true. */ - private final S3TaskManager s3TaskManagerHandle; + public synchronized void stop() { + if (isRunning) { + if (interruptable) { + var unscheduled = threadPool.shutdownNow(); + // Just a sanity check. We only schedule one job per pipeline job and have dedicated + // threads so there should never be any unscheduled jobs waiting in a queue. + assert unscheduled.size() == 0; + } else { + log.info("stopping but must wait for uninterruptible jobs to complete on their own"); + threadPool.shutdown(); + } + isRunning = false; + } + } /** - * Constructs a new {@link PipelineManager} instance. Note that this intended for use as a - * singleton service in the application: only one instance running at a time. + * Callable by main thread to wait for all jobs to finish running. This is potentially an infinite + * loop but that's by design. We call this while the program is running to wait for it to complete + * and a common scenario is for the pipeline to run forever. * - * @param appMetrics the {@link MetricRegistry} for the overall application - * @param jobRecordStore the {@link PipelineJobRecordStore} for the overall application - * @param s3TaskManagerHandle a handle to the S3 task manager for reporting on cleanup operations; - * may be {@code null}, and if so will not report the number of remaining threads when waiting - * for pipeline shutdown + *

External causes like jobs completing on their own or calls to {@link #stop} from the + * shutdown handler will cause the pool to shut down gracefully while we wait and thus allow us to + * return. */ - public PipelineManager( - MetricRegistry appMetrics, - PipelineJobRecordStore jobRecordStore, - S3TaskManager s3TaskManagerHandle) { - this.appMetrics = appMetrics; - this.jobRecordStore = jobRecordStore; - this.jobExecutor = createJobExecutor(); - this.jobsEnqueuedHandles = new ConcurrentHashMap<>(); - this.jobMonitorsExecutor = Executors.newCachedThreadPool(); - this.jobRegistry = new HashMap<>(); - this.s3TaskManagerHandle = s3TaskManagerHandle; + public void awaitCompletion() { + // Calls to the latch are automatically synchronized on the latch. + while (latch.getCount() > 0) { + try { + latch.await(); + } catch (InterruptedException ex) { + log.debug("caught interrupt - still waiting for latch to reach zero"); + } + } - /* - * Bootstrap the SchedulerJob and VolunteerJob, which are responsible for ensuring that all of - * the other jobs get executed, as and when needed. Note that it will permanently tie up two of - * the job executors, as they're designed to run forever. - */ - VolunteerJob volunteerJob = new VolunteerJob(appMetrics, this, jobRecordStore); - registerJob(volunteerJob); - PipelineJobRecord volunteerJobRecord = - jobRecordStore.submitPendingJob(VolunteerJob.JOB_TYPE, null); - enqueueJob(volunteerJobRecord); - SchedulerJob schedulerJob = new SchedulerJob(appMetrics, this, jobRecordStore); - registerJob(schedulerJob); - jobRecordStore.submitPendingJob(SchedulerJob.JOB_TYPE, null); - } + // just in case we somehow aren't stopped + stop(); - /** - * Creates an executor to listen for and run jobs. - * - * @return the {@link ListeningScheduledExecutorService} to use for {@link #jobExecutor} - */ - private static ListeningScheduledExecutorService createJobExecutor() { - ScheduledThreadPoolExecutor jobExecutorInner = - new ScheduledThreadPoolExecutor(JOB_EXECUTOR_THREADS); - jobExecutorThreadPoolHandle = jobExecutorInner; - jobExecutorInner.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - return MoreExecutors.listeningDecorator(jobExecutorInner); + log.info("waiting for pool to terminate"); + boolean terminated = false; + while (!terminated) { + try { + // Should return immediately since latch indicated that all jobs are done. + // Calls to the pool are synchronized on the pool itself. + terminated = threadPool.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + log.debug("caught interrupt - still waiting for thread pool to terminate"); + } + } } /** - * Registers the specified {@link PipelineJob}, scheduling it (if it has a {@link - * PipelineJob#getSchedule()}) and also making it available for triggering elsewhere, via {@link - * PipelineManager#enqueueJob(PipelineJobRecord)}. + * Callable by main thread or test to get any exception thrown by a job. * - * @param job the {@link PipelineJob} to register + * @return first exception thrown by a job */ - public void registerJob(PipelineJob job) { - jobRegistry.put(job.getType(), job); + @Nullable + public synchronized Exception getError() { + return error; } /** - * Warning: See the note on {@link #jobsEnqueuedHandles} for the thread-safety - * rules for usage of this property. It is not safe to use outside of those - * conditions. + * OK to run unless {@link #stop} has been called or an exception has been caught. * - *

This property, and its invariants, allow us to avoid over-committing and accepting more work - * than we are guaranteed to have {@link #jobExecutor} threads/slots available for. This isn't - * strictly necessary if jobs are only running on a single node, but is nevertheless a nice - * property, and becomes very important if jobs are being run across multiple nodes (as otherwise - * we'd have unnecessarily stalled work). + *

{@inheritDoc} * - * @return the number of available executor slots (technically, an eventually consistent - * approximation of that value, which is guaranteed to be less-than-or-equal-to the true - * value, provided that the thread-safety rules described in {@link #jobsEnqueuedHandles} are - * adhered to) + * @return true if OK for job to run */ - public int getOpenExecutorSlots() { - return JOB_EXECUTOR_THREADS - jobsEnqueuedHandles.size(); + @Override + public synchronized boolean jobsCanRun() { + return isRunning && error == null; } /** - * Gets the set of jobs that have registered schedules. + * Just logs the event and returns a unique id. * - * @return the {@link Set} of jobs registered via {@link #registerJob(PipelineJob)} that have - * {@link PipelineJob#getSchedule()} values - */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public Set> getScheduledJobs() { - Set scheduledJobs = - jobRegistry.values().stream() - .filter(j -> j.getSchedule().isPresent()) - .collect(Collectors.toSet()); - return scheduledJobs; - } - - /** - * Enqueues an execution of the specified {@link PipelineJob}, with the specified parameters. + *

{@inheritDoc} * - * @param the type parameter - * @param jobRecord the {@link PipelineJobRecord} of the job to run - * @return true if the specified job was enqueued, or false if it could - * not be (e.g. because {@link #stop()} has been called) + * @param job the job that is starting + * @return job id */ - public boolean enqueueJob(PipelineJobRecord jobRecord) { - Timer.Context timerEnqueue = - appMetrics.timer(MetricRegistry.name(getClass().getSimpleName(), "enqueue")).time(); - - // First, find the specified job. - @SuppressWarnings("unchecked") - PipelineJob job = (PipelineJob) jobRegistry.get(jobRecord.getJobType()); - if (job == null) - throw new IllegalArgumentException( - String.format("Unknown or unregistered job type '%s'.", jobRecord.getJobType())); - - // Submit the job to be run! - PipelineJobWrapper jobWrapper; - ListenableFuture jobFuture; - synchronized (jobsEnqueuedHandles) { - /* - * Design Note: Java's Executor framework is great at running things and returning results, - * but provides no built-in facilities at all for monitoring and tracking the things that have - * been run. We need that monitoring here, so we use PipelineJobWrapper to catch these events: - * enqueue, start, complete-successfully, complete-with-exception. (See - * Futures.addCallback(...) a little bit further below for a discussion of some additional - * monitoring we need and add in.) - */ - jobWrapper = new PipelineJobWrapper<>(job, jobRecord); - - // Ensure code below doesn't accidentally use the unwrapped job. - job = jobWrapper; - - try { - jobFuture = jobExecutor.submit(jobWrapper); - } catch (RejectedExecutionException e) { - // Indicates that the executor has been shutdown. - return false; - } - jobsEnqueuedHandles.put(jobRecord.getId(), new PipelineJobHandle<>(jobWrapper, jobFuture)); - } - - /* - * Design Note: We can't catch job-cancellation-before-start events in PipelineJobWrapper, - * because its call(...) method won't ever be called in those cases. Guava's ListenableFuture - * framework doesn't monitor task submission or start, so we can't use it to catch those events. - * Accordingly, we use a combo: both PipelineJobWrapper and Guava's ListenableFuture, to catch - * all of the events that we're interested in. - */ - Futures.addCallback( - jobFuture, new PipelineJobCallback(jobWrapper.getJobRecord()), this.jobMonitorsExecutor); - - timerEnqueue.stop(); - return true; + @Override + public long beginningRun(PipelineJob job) { + final var runId = idGenerator.getAndIncrement(); + log.info("Job run beginning: type={} id={}", job.getType(), runId); + return runId; } /** - * Handle job failure by de-queueing and recording the failure. + * Records the {@link gov.cms.bfd.pipeline.app.PipelineJobRunner.JobRunSummary} in a fixed size + * list. + * + *

{@inheritDoc} * - * @param jobRecordId the {@link PipelineJobRecord} of the job - * @param exception The exception from the job failure + * @param summary summaries the outcome of the run */ - private void handleJobFailure(PipelineJobRecordId jobRecordId, Exception exception) { - synchronized (jobsEnqueuedHandles) { - if (jobsEnqueuedHandles.containsKey(jobRecordId)) { - jobRecordStore.recordJobFailure(jobRecordId, new PipelineJobFailure(exception)); - jobsEnqueuedHandles.remove(jobRecordId); - } - LOGGER.error("Job failure in Pipeline: " + exception.getMessage(), exception); + @Override + public synchronized void completedRun(PipelineJobRunner.JobRunSummary summary) { + log.info("job run complete: {}", summary); + if (completedJobs.size() > MAX_COMPLETED_JOBS) { + completedJobs.removeFirst(); } + completedJobs.addLast(summary); } /** - * Handle job cancellation by de-queueing and recording cancellation. + * Just logs the event. * - * @param jobRecordId the {@link PipelineJobRecord} of the job + *

{@inheritDoc} + * + * @param job the job that is sleeping */ - private void handleJobCancellation(PipelineJobRecordId jobRecordId) { - synchronized (jobsEnqueuedHandles) { - if (jobsEnqueuedHandles.containsKey(jobRecordId)) { - jobRecordStore.recordJobCancellation(jobRecordId); - jobsEnqueuedHandles.remove(jobRecordId); - } - } + @Override + public void sleeping(PipelineJob job) { + log.debug("Job sleeping: type={}", job.getType()); } /** - * Handle normal job completion by de-queueing and recording completion. + * Just logs the event. + * + *

{@inheritDoc} * - * @param jobRecordId the {@link PipelineJobRecord} of the job - * @param jobOutcome the outcome of the job to record + * @param job the job that is stopping */ - private void handleJobCompletion(PipelineJobRecordId jobRecordId, PipelineJobOutcome jobOutcome) { - synchronized (jobsEnqueuedHandles) { - if (jobsEnqueuedHandles.containsKey(jobRecordId)) { - jobRecordStore.recordJobCompletion(jobRecordId, jobOutcome); - jobsEnqueuedHandles.remove(jobRecordId); - } - } + @Override + public void stoppingDueToInterrupt(PipelineJob job) { + log.info("Job interrupted: type={}", job.getType()); } /** - * This will eventually end all jobs and shut down this {@link PipelineManager}. Note: not all - * jobs support being stopped while in progress, so this method may block for quite a while. + * Saves the exception for reporting later. This will also prevent other jobs from running so that + * the pipeline can shut down gracefully without a call to {@link System#exit}. + * + *

{@inheritDoc} + * + * @param job the job that is stopping + * @param exception the exception that was thrown */ - public void stop() { - // If something has already shut us down, we're done. - if (jobExecutor.isShutdown()) { - return; - } - - Timer.Context timerStop = - appMetrics.timer(MetricRegistry.name(getClass().getSimpleName(), "stop")).time(); - LOGGER.info("Stopping PipelineManager..."); - - /* - * Tell the job executor to shut down, which will prevent it from accepting new jobs and from - * running any jobs that haven't already started. If all jobs are interruptible, we'll shut it - * down _harder_, such that in-progress job threads get interrupted (ala Thread.interrupt()). - */ - boolean unsafeToInterrupt = jobRegistry.values().stream().anyMatch(j -> !j.isInterruptible()); - if (unsafeToInterrupt) { - jobExecutor.shutdown(); - LOGGER.info("Shut down job executor, without cancelling existing jobs."); + @Override + public synchronized void stoppingDueToException(PipelineJob job, Exception exception) { + log.error("Job execution failed: type={} exception={}", job.getType(), exception.getMessage()); + if (this.error == null) { + this.error = exception; } else { - jobExecutor.shutdownNow(); - LOGGER.info("Shut down job executor, cancelling existing jobs."); - } - - LOGGER.info("Attempting to shut down interruptable jobs..."); - - /* - * Try to stop all jobs that are either not running yet or are interruptible. Note: VolunteerJob - * might still be trying to submit jobs over on its thread, so we synchronize to keep things - * consistent and ensure we don't miss any jobs. - */ - synchronized (jobsEnqueuedHandles) { - jobsEnqueuedHandles.values().parallelStream() - .forEach( - j -> { - LOGGER.info(" Attempting to cancel " + j.job.getType()); - /* - * Note: There's a race condition here, where the job may have completed just before - * we try to cancel it, but that's okay because Future.cancel(...) is basically a - * no-op for jobs that have already completed. - */ - j.cancelIfInterruptible(); - }); - } - - LOGGER.info("Cancelled all interruptable jobs."); - - // Clean up any pending S3 operations and shut down the S3 manager (will be null if not a CCW - // pipeline) - if (s3TaskManagerHandle != null) { - s3TaskManagerHandle.shutdownSafely(); + this.error.addSuppressed(exception); } - - /* - * Wait for everything to halt. - */ - boolean allStopped = jobExecutor.isTerminated(); - Optional lastWaitMessage = Optional.empty(); - while (!allStopped) { - if (lastWaitMessage.isEmpty() - || Duration.between(lastWaitMessage.get(), Instant.now()).toMinutes() >= 1) { - LOGGER.info( - "Waiting for jobs to stop, which may take A WHILE. " - + "(Message will repeat every minute, until complete.)"); - - // Debug jobs still running; can help understand what the app is doing - Collection> runningJobs = jobRegistry.values(); - if (runningJobs.size() > 0) { - LOGGER.debug(" Jobs running: "); - for (PipelineJob job : runningJobs) { - LOGGER.debug(" {}", job.getType()); - } - } - if (jobExecutorThreadPoolHandle != null) { - int numLeft = jobExecutorThreadPoolHandle.getActiveCount(); - LOGGER.info(" Waiting on {} threads to resolve in the job executor...", numLeft); - } - lastWaitMessage = Optional.of(Instant.now()); - } - try { - allStopped = jobExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - /* - * Need to ignore interrupts here so that we can shut down safely. - */ - LOGGER.warn( - String.format("%s ignoring interrupt during shutdown.", this.getClass().getName())); - } - } - LOGGER.info("Stopped PipelineManager."); - timerStop.stop(); - } - - /** {@inheritDoc} */ - @Override - public void close() throws Exception { - stop(); } /** - * A handle for a {@link PipelineJob} execution, which is used to allow the application to cancel - * job executions. + * Just logs the event. + * + *

{@inheritDoc} * - * @param the {@link PipelineJobArguments} type associated with this {@link PipelineJob} - * implementation (see {@link NullPipelineJobArguments} for those {@link PipelineJob} - * implementations which do not need arguments) + * @param job the job that is stopping */ - private static final class PipelineJobHandle { - /** The {@link PipelineJob} that the paired {@link Future} is for. */ - private final PipelineJob job; - /** The {@link Future} representing an execution of the paired {@link PipelineJob}. */ - private final Future future; - - /** - * Constructs a new {@link PipelineJobHandle} instance. - * - * @param job the {@link PipelineJob} that the paired {@link Future} is for - * @param future the {@link Future} representing an execution of the paired {@link PipelineJob} - */ - public PipelineJobHandle(PipelineJob job, Future future) { - this.job = job; - this.future = future; - } - - /** - * Attempts to cancel the job execution by calling {@link Future#cancel(boolean)}, respecting - * the value of {@link PipelineJob#isInterruptible()}. - */ - public void cancelIfInterruptible() { - LOGGER.trace("cancelIfPendingOrInterruptible() called: job.getType()='{}'", job.getType()); - future.cancel(job.isInterruptible()); - } + @Override + public void stoppingNormally(PipelineJob job) { + log.debug("Job stopping: " + job.getType()); } /** - * This {@link PipelineJob} implementation wraps a delegate {@link PipelineJob}, providing data to - * {@link PipelineJobRecordStore} about that job's execution and status. + * Logs the event and counts down the latch to reflect that job has stopped. * - * @param the {@link PipelineJobArguments} type associated with this {@link PipelineJob} - * implementation (see {@link NullPipelineJobArguments} for those {@link PipelineJob} - * implementations which do not need arguments) + * @param job the job that has stopped */ - private final class PipelineJobWrapper implements PipelineJob { - /** The {@link PipelineJob} to wrap and monitor. */ - private final PipelineJob wrappedJob; - /** The {@link PipelineJobRecord} for the job to wrap and monitor. */ - private final PipelineJobRecord jobRecord; - - /** - * Constructs a new {@link PipelineJobWrapper} for the specified {@link PipelineJob}. - * - * @param wrappedJob the {@link PipelineJob} to wrap and monitor - * @param jobRecord the {@link PipelineJobRecord} for the job to wrap and monitor - */ - public PipelineJobWrapper(PipelineJob wrappedJob, PipelineJobRecord jobRecord) { - this.wrappedJob = wrappedJob; - this.jobRecord = jobRecord; - jobRecordStore.recordJobEnqueue(jobRecord.getId()); - } - - /** - * Gets the {@link #jobRecord}. - * - * @return the {@link PipelineJobRecord} for this {@link PipelineJobWrapper} - */ - public PipelineJobRecord getJobRecord() { - return jobRecord; - } - - /** {@inheritDoc} */ - @Override - public PipelineJobType getType() { - return wrappedJob.getType(); - } - - /** {@inheritDoc} */ - @Override - public Optional getSchedule() { - return wrappedJob.getSchedule(); - } - - /** {@inheritDoc} */ - @Override - public boolean isInterruptible() { - return wrappedJob.isInterruptible(); - } - - /** {@inheritDoc} */ - @Override - public PipelineJobOutcome call() throws Exception { - jobRecordStore.recordJobStart(jobRecord.getId()); - - try { - PipelineJobOutcome jobOutcome = wrappedJob.call(); - handleJobCompletion(jobRecord.getId(), jobOutcome); - return jobOutcome; - } catch (InterruptedException e) { - /* - * This indicates that someone has successfully interrupted the job, which should only have - * happened when we're trying to shut down. Whether or not PipelineJob.isInterruptible() for - * this job, it's now been stopped, so we should record the cancellation. - */ - handleJobCancellation(jobRecord.getId()); - - // Restore the interrupt so things can get back to shutting down. - Thread.currentThread().interrupt(); - LOGGER.error( - "PipeLineJobOutcome interrupt failed with the the following: " + e.getMessage(), e); - throw new InterruptedException("Re-firing job interrupt."); - } catch (Exception e) { - handleJobFailure(jobRecord.getId(), e); - - // Wrap and re-throw the failure. - throw new Exception("Re-throwing job failure.", e); - } - } + @Override + public void stopped(PipelineJob job) { + log.info("Job stopped: " + job.getType()); + latch.countDown(); } /** - * A {@link FutureCallback} for Guava {@link ListenableFuture}s for {@link PipelineJobWrapper} - * tasks, providing data to {@link PipelineJobRecordStore} about a job's execution and status. + * Callable by tests to get our job summaries. + * + * @return list of job summaries */ - private final class PipelineJobCallback - implements FutureCallback { - /** The {@link PipelineJobRecordId} that this {@link PipelineJobCallback} is for. */ - private final PipelineJobRecord jobRecord; - - /** - * Constructs a new {@link PipelineJobWrapper} for the specified {@link PipelineJob}. - * - * @param jobRecord the {@link PipelineJobRecordId} that this {@link PipelineJobCallback} is for - */ - public PipelineJobCallback(PipelineJobRecord jobRecord) { - this.jobRecord = jobRecord; - } - - /** {@inheritDoc} */ - @Override - public void onSuccess(PipelineJobOutcome result) { - // Nothing to do here. - } - - /** {@inheritDoc} */ - @Override - public void onFailure(Throwable jobThrowable) { - if (jobThrowable instanceof CancellationException) { - /* - * This is the whole reason we have this extra listener in the first place: it's the only - * way we have to catch cancel-before-start events (the PipelineJobWrapper can't do it, - * since it won't get called in the first place). - */ - handleJobCancellation(jobRecord.getId()); - LOGGER.info("Job cancelled: " + jobRecord.getJobType()); - } else if (jobThrowable instanceof InterruptedException) { - // If our job has been interrupted, we are already shutting down, so just ignore it. - LOGGER.info("Job interrupted: " + jobRecord.getJobType()); - } else { - /* If the job failed and wasnt cancelled, it threw some other exception mid-stream, - * and now we must die as we almost certainly cannot continue with the tainted batch sitting in Incoming. - * This will call the proper shutdown procedures and exit gracefully. - * FUTURE: We may be able to more fault-tolerant by moving the failed batch - * into the Failed folder when this happens and continuing to run - */ - PipelineApplication.shutdown(); - } - } + @VisibleForTesting + synchronized List getCompletedJobs() { + return ImmutableList.copyOf(completedJobs); } } diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/scheduler/SchedulerJob.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/scheduler/SchedulerJob.java deleted file mode 100644 index eac19f74fc..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/scheduler/SchedulerJob.java +++ /dev/null @@ -1,153 +0,0 @@ -package gov.cms.bfd.pipeline.app.scheduler; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import gov.cms.bfd.pipeline.app.PipelineManager; -import gov.cms.bfd.pipeline.app.volunteer.VolunteerJob; -import gov.cms.bfd.pipeline.sharedutils.NullPipelineJobArguments; -import gov.cms.bfd.pipeline.sharedutils.PipelineJob; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobSchedule; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobType; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecord; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecordStore; -import java.time.Instant; -import java.util.Optional; -import java.util.Set; - -/** - * This {@link PipelineJob} checks the schedules of other jobs and triggers their executions, per - * those schedules. - * - *

Design Note: If we ever move to an autoscaled version of this application, it will be - * important to ensure that this job is only running once across its environment, to avoid duplicate - * job schedule triggers. To that end, this job will get run just like any other job (via the {@link - * PipelineJobRecordStore} and the {@link VolunteerJob}). To ensure that it gets kicked off, the - * {@link PipelineJobRecordStore} has a permanently uncompleted {@link PipelineJobRecord} for this: - * the job will always be running and runs a scheduling loop internally. - */ -public final class SchedulerJob implements PipelineJob { - /** Represents that this job type is a scheduler. */ - public static final PipelineJobType JOB_TYPE = - new PipelineJobType(SchedulerJob.class); - - /** - * The number of milliseconds to wait between schedule check iterations. Regardless of their - * schedule, {@link PipelineJob}s will not be able to run more frequently than this. - * - *

Note: this "constant" is actually mutable, but should only ever be modified by tests. - */ - public static long SCHEDULER_TICK_MILLIS = 10 * 1000; - - /** The metrics for this job. */ - private final MetricRegistry appMetrics; - /** The orchestration object for the pipeline. */ - private final PipelineManager pipelineManager; - /** Holds the records of completed jobs. */ - private final PipelineJobRecordStore jobRecordsStore; - - /** - * Constructs the {@link SchedulerJob}, which should be a singleton within the application - * environment. - * - * @param appMetrics the {@link MetricRegistry} for the overall application - * @param pipelineManager the {@link PipelineManager} that jobs should be run on - * @param jobRecordsStore the {@link PipelineJobRecordStore} tracking jobs that have been - * submitted for execution - */ - public SchedulerJob( - MetricRegistry appMetrics, - PipelineManager pipelineManager, - PipelineJobRecordStore jobRecordsStore) { - this.appMetrics = appMetrics; - this.pipelineManager = pipelineManager; - this.jobRecordsStore = jobRecordsStore; - } - - /** {@inheritDoc} */ - @Override - public Optional getSchedule() { - return Optional.empty(); - } - - /** {@inheritDoc} */ - @Override - public boolean isInterruptible() { - return true; - } - - /** {@inheritDoc} */ - @Override - public PipelineJobOutcome call() throws Exception { - boolean scheduledAJob = false; - while (true) { - try (Timer.Context timer = - appMetrics - .timer(MetricRegistry.name(getClass().getSimpleName(), "call", "iteration")) - .time()) { - Instant now = Instant.now(); - Set> scheduledJobs = - pipelineManager.getScheduledJobs(); - for (PipelineJob scheduledJob : scheduledJobs) { - PipelineJobSchedule jobSchedule = scheduledJob.getSchedule().get(); - Optional> mostRecentExecution = - jobRecordsStore.findMostRecent(scheduledJob.getType()); - - /* Calculate whether or not we should trigger an execution of the next job. */ - boolean shouldTriggerJob; - if (!mostRecentExecution.isPresent()) { - // If the job has never run, we'll always trigger it now, regardless of schedule. - shouldTriggerJob = true; - } else { - if (!mostRecentExecution.get().isCompleted()) { - // If the job's still pending or running, don't double-trigger it. - shouldTriggerJob = false; - } else { - if (mostRecentExecution.get().isCompletedSuccessfully()) { - // If the job's not running, check to see if it's time to trigger it again. - // Note: This calculation is based on completion time, not submission or start time. - Instant nextExecution = - mostRecentExecution - .get() - .getStartedTime() - .get() - .plus(jobSchedule.getRepeatDelay(), jobSchedule.getRepeatDelayUnit()); - shouldTriggerJob = now.equals(nextExecution) || now.isAfter(nextExecution); - } else { - // We don't re-run failed jobs. - shouldTriggerJob = false; - } - } - } - - // If we shouldn't trigger this job, move on to the next. - if (!shouldTriggerJob) { - continue; - } - - // Trigger the job (for future execution, when VolunteerJob picks it up)! - jobRecordsStore.submitPendingJob(scheduledJob.getType(), null); - } - } - - try { - Thread.sleep(SCHEDULER_TICK_MILLIS); - } catch (InterruptedException e) { - /* - * Jobs are only interrupted/cancelled as part of application shutdown, so when encountered, - * we'll break out of our scheduling loop and close up shop here. - */ - break; - } - } - - /* - * Did we schedule at least one job? If we ever move to an autoscaled version of this - * application, it will be important to ensure that we "collude" with the PipelineJobRecordStore - * to ignore this PipelineJobOutcome and ensure that the record doesn't get marked as completed, - * even when the application shuts down. (If that happened, then scheduled triggers would stop - * firing.) - */ - return scheduledAJob ? PipelineJobOutcome.WORK_DONE : PipelineJobOutcome.NOTHING_TO_DO; - } -} diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/scheduler/package-info.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/scheduler/package-info.java deleted file mode 100644 index 65b066063a..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/scheduler/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -/** - * Contains the {@link gov.cms.bfd.pipeline.sharedutils.PipelineJob} that's responsible for kicking - * off other jobs, per their schedules. - */ -package gov.cms.bfd.pipeline.app.scheduler; diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/volunteer/VolunteerJob.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/volunteer/VolunteerJob.java deleted file mode 100644 index e047d10a51..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/volunteer/VolunteerJob.java +++ /dev/null @@ -1,116 +0,0 @@ -package gov.cms.bfd.pipeline.app.volunteer; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import gov.cms.bfd.pipeline.app.PipelineManager; -import gov.cms.bfd.pipeline.sharedutils.NullPipelineJobArguments; -import gov.cms.bfd.pipeline.sharedutils.PipelineJob; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobSchedule; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobType; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecord; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecordStore; -import java.util.Optional; -import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This {@link PipelineJob} watches for jobs to be submitted and then claims them: it "volunteers" - * the system for any available work. - */ -public final class VolunteerJob implements PipelineJob { - private static final Logger LOGGER = LoggerFactory.getLogger(VolunteerJob.class); - - /** Represents this job type is a volunteer job (accepts pending jobs). */ - public static final PipelineJobType JOB_TYPE = - new PipelineJobType(VolunteerJob.class); - - /** - * The number of milliseconds to wait between job submission iterations. Regardless of their - * schedule, {@link PipelineJob}s will not be able to run more frequently than this. - * - *

Note: this "constant" is actually mutable, but should only ever be modified by tests. - */ - public static long VOLUNTEER_TICK_MILLIS = 10 * 1000; - - /** The metrics for this job. */ - private final MetricRegistry appMetrics; - /** The orchestration object for the pipeline. */ - private final PipelineManager pipelineManager; - /** Holds the records of completed jobs. */ - private final PipelineJobRecordStore jobRecordsStore; - - /** - * Constructs the {@link VolunteerJob}, which should be a singleton within its JVM. - * - * @param appMetrics the {@link MetricRegistry} for the overall application - * @param pipelineManager the {@link PipelineManager} that jobs should be run on - * @param jobRecordsStore the {@link PipelineJobRecordStore} tracking jobs that have been - * submitted for execution - */ - public VolunteerJob( - MetricRegistry appMetrics, - PipelineManager pipelineManager, - PipelineJobRecordStore jobRecordsStore) { - this.appMetrics = appMetrics; - this.pipelineManager = pipelineManager; - this.jobRecordsStore = jobRecordsStore; - } - - /** {@inheritDoc} */ - @Override - public Optional getSchedule() { - return Optional.empty(); - } - - /** {@inheritDoc} */ - @Override - public boolean isInterruptible() { - return true; - } - - /** {@inheritDoc} */ - @Override - public PipelineJobOutcome call() throws Exception { - boolean enqueuedAJob = false; - while (true) { - try (Timer.Context timer = - appMetrics - .timer(MetricRegistry.name(getClass().getSimpleName(), "call", "iteration")) - .time()) { - /* - * We want to submit up to as many jobs for execution as we can actually work, each - * iteration of this loop. - */ - int executorSlotsOpen = pipelineManager.getOpenExecutorSlots(); - if (executorSlotsOpen <= 0) continue; - - Set> jobsToStart = jobRecordsStore.findPendingJobs(executorSlotsOpen); - LOGGER.trace( - "call() called: executorSlotsOpen='{}', jobsToStart='{}'", - executorSlotsOpen, - jobsToStart); - - // Submit those jobs to start running! - for (PipelineJobRecord jobToStart : jobsToStart) { - pipelineManager.enqueueJob(jobToStart); - enqueuedAJob = true; - } - } - - try { - Thread.sleep(VOLUNTEER_TICK_MILLIS); - } catch (InterruptedException e) { - /* - * Jobs are only interrupted/cancelled as part of application shutdown, so when encountered, - * we'll break out of our scheduling loop and close up shop here. - */ - break; - } - } - - // Did we enqueue at least one job? - return enqueuedAJob ? PipelineJobOutcome.WORK_DONE : PipelineJobOutcome.NOTHING_TO_DO; - } -} diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/volunteer/package-info.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/volunteer/package-info.java deleted file mode 100644 index cfc2a08174..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/volunteer/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -/** - * Contains the {@link gov.cms.bfd.pipeline.sharedutils.PipelineJob} that's responsible for ensuring - * that all the other jobs get executed, as needed. - */ -package gov.cms.bfd.pipeline.app.volunteer; diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/resources/logback.xml b/apps/bfd-pipeline/bfd-pipeline-app/src/main/resources/logback.xml index 2a703684b2..dcdb3060d1 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/resources/logback.xml +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/resources/logback.xml @@ -23,6 +23,7 @@ + diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineApplicationIT.java b/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineApplicationIT.java index a7c9842934..07292328e1 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineApplicationIT.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineApplicationIT.java @@ -20,7 +20,6 @@ import gov.cms.bfd.pipeline.rda.grpc.server.RandomClaimGeneratorConfig; import gov.cms.bfd.pipeline.rda.grpc.server.RdaMessageSourceFactory; import gov.cms.bfd.pipeline.rda.grpc.server.RdaServer; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecordStore; import gov.cms.bfd.pipeline.sharedutils.s3.MinioTestContainer; import gov.cms.bfd.pipeline.sharedutils.s3.S3MinioConfig; import java.io.IOException; @@ -30,8 +29,10 @@ import java.nio.file.Paths; import java.time.Instant; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import javax.sql.DataSource; import org.apache.commons.codec.binary.Hex; import org.awaitility.Awaitility; @@ -309,12 +310,16 @@ public void rdaPipeline() throws Exception { assertTrue( hasJobRecordMatching( - appRunConsumer, "processed 30 objects in", RdaFissClaimLoadJob.class), + appRunConsumer, + line -> line.contains("processed 30 objects in"), + RdaFissClaimLoadJob.class), "FISS job processed all claims"); assertTrue( hasJobRecordMatching( - appRunConsumer, "processed 30 objects in", RdaMcsClaimLoadJob.class), + appRunConsumer, + line -> line.contains("processed 30 objects in"), + RdaMcsClaimLoadJob.class), "MCS job processed all claims"); // Stop the application. @@ -381,12 +386,16 @@ public void rdaPipelineServerFailure() throws Exception { assertTrue( hasJobRecordMatching( - appRunConsumer, "StatusRuntimeException", RdaFissClaimLoadJob.class), + appRunConsumer, + line -> line.contains("StatusRuntimeException"), + RdaFissClaimLoadJob.class), "FISS job terminated by grpc exception"); assertTrue( hasJobRecordMatching( - appRunConsumer, "StatusRuntimeException", RdaMcsClaimLoadJob.class), + appRunConsumer, + line -> line.contains("StatusRuntimeException"), + RdaMcsClaimLoadJob.class), "MCS job terminated by grpc exception"); // Stop the application. @@ -433,9 +442,7 @@ private static void skipOnUnsupportedOs() { */ private static boolean hasCcwRifLoadJobCompleted(ProcessOutputConsumer appRunConsumer) { return hasJobRecordMatching( - appRunConsumer, - PipelineJobRecordStore.LOG_MESSAGE_PREFIX_JOB_COMPLETED, - CcwRifLoadJob.class); + appRunConsumer, PipelineJobRunner.JobRunSummary::isSuccessString, CcwRifLoadJob.class); } /** @@ -448,7 +455,7 @@ private static boolean hasCcwRifLoadJobCompleted(ProcessOutputConsumer appRunCon private static boolean hasRdaFissLoadJobCompleted(ProcessOutputConsumer appRunConsumer) { return hasJobRecordMatching( appRunConsumer, - PipelineJobRecordStore.LOG_MESSAGE_PREFIX_JOB_COMPLETED, + PipelineJobRunner.JobRunSummary::isSuccessString, RdaFissClaimLoadJob.class); } @@ -461,9 +468,7 @@ private static boolean hasRdaFissLoadJobCompleted(ProcessOutputConsumer appRunCo */ private static boolean hasRdaMcsLoadJobCompleted(ProcessOutputConsumer appRunConsumer) { return hasJobRecordMatching( - appRunConsumer, - PipelineJobRecordStore.LOG_MESSAGE_PREFIX_JOB_COMPLETED, - RdaMcsClaimLoadJob.class); + appRunConsumer, PipelineJobRunner.JobRunSummary::isSuccessString, RdaMcsClaimLoadJob.class); } /** @@ -475,49 +480,21 @@ private static boolean hasRdaMcsLoadJobCompleted(ProcessOutputConsumer appRunCon */ private static boolean hasCcwRifLoadJobFailed(ProcessOutputConsumer appRunConsumer) { return hasJobRecordMatching( - appRunConsumer, PipelineJobRecordStore.LOG_MESSAGE_PREFIX_JOB_FAILED, CcwRifLoadJob.class); - } - - /** - * Checks if the RDA Fiss load job has failed by checking the job records. - * - * @param appRunConsumer the {@link ProcessOutputConsumer} whose output should be checked - * @return true if the application output indicates that data set scanning has - * started, false if not - */ - private static boolean hasRdaFissLoadJobFailed(ProcessOutputConsumer appRunConsumer) { - return hasJobRecordMatching( - appRunConsumer, - PipelineJobRecordStore.LOG_MESSAGE_PREFIX_JOB_FAILED, - RdaFissClaimLoadJob.class); - } - - /** - * Checks if the RDA MCS load job has failed by checking the job records. - * - * @param appRunConsumer the {@link ProcessOutputConsumer} whose output should be checked - * @return true if the application output indicates that data set scanning has - * started, false if not - */ - private static boolean hasRdaMcsLoadJobFailed(ProcessOutputConsumer appRunConsumer) { - return hasJobRecordMatching( - appRunConsumer, - PipelineJobRecordStore.LOG_MESSAGE_PREFIX_JOB_FAILED, - RdaMcsClaimLoadJob.class); + appRunConsumer, PipelineJobRunner.JobRunSummary::isFailureString, CcwRifLoadJob.class); } /** - * Checks if a job has a job record matching a specified value. + * Checks if a job has a job record matching a specified predicate. * * @param appRunConsumer the job to check - * @param prefix the record prefix type to check for - * @param klass the class of the job to check - * @return {@code true} if the job had a record matching the specified prefix type + * @param matcher {@link Predicate} used to find a target string + * @param klass used to verify a target string contains the class name + * @return {@code true} if the job had a record matching the specified predicate and class name */ private static boolean hasJobRecordMatching( - ProcessOutputConsumer appRunConsumer, String prefix, Class klass) { + ProcessOutputConsumer appRunConsumer, Predicate matcher, Class klass) { return appRunConsumer.matches( - line -> line.contains(prefix) && line.contains(klass.getSimpleName())); + line -> matcher.test(line) && line.contains(klass.getSimpleName())); } /** @@ -607,6 +584,16 @@ private static ProcessBuilder createAppProcessBuilder() { DataSource dataSource = DatabaseTestUtils.get().getUnpooledDataSource(); DataSourceComponents dataSourceComponents = new DataSourceComponents(dataSource); + // Remove inherited environment variables that could affect the test in some local environments. + List.of( + AppConfiguration.ENV_VAR_KEY_CCW_RIF_JOB_ENABLED, + AppConfiguration.ENV_VAR_KEY_RDA_JOB_ENABLED, + AppConfiguration.ENV_VAR_KEY_RDA_GRPC_HOST, + AppConfiguration.ENV_VAR_KEY_RDA_GRPC_PORT, + AppConfiguration.ENV_VAR_KEY_RDA_GRPC_AUTH_TOKEN, + AppConfiguration.ENV_VAR_KEY_RDA_GRPC_SERVER_TYPE) + .forEach(envVarName -> appRunBuilder.environment().remove(envVarName)); + appRunBuilder .environment() .put( diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineJobRunnerTest.java b/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineJobRunnerTest.java new file mode 100644 index 0000000000..9c5d4664d0 --- /dev/null +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineJobRunnerTest.java @@ -0,0 +1,425 @@ +package gov.cms.bfd.pipeline.app; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +import gov.cms.bfd.pipeline.app.PipelineJobRunner.JobRunSummary; +import gov.cms.bfd.pipeline.sharedutils.PipelineJob; +import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; +import gov.cms.bfd.pipeline.sharedutils.PipelineJobSchedule; +import gov.cms.bfd.sharedutils.interfaces.ThrowingConsumer; +import java.io.IOException; +import java.time.Clock; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +/** Unit tests for {@link PipelineJobRunner}. */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class PipelineJobRunnerTest { + /** Mock tracker. */ + @Mock private PipelineJobRunner.Tracker tracker; + /** Mock job. */ + @Mock private PipelineJob job; + /** Mock clock. */ + @Mock private Clock clock; + /** Mock sleep function. */ + @Mock private ThrowingConsumer sleeper; + /** Collects the summaries. */ + private List summaries; + /** The runner we are testing. */ + private PipelineJobRunner runner; + + /** + * Sets up common behavior for mocks. + * + * @throws InterruptedException just a pass through because of a method being mocked + */ + @BeforeEach + void setUp() throws InterruptedException { + // By default the job is not interruptable. + doReturn(false).when(job).isInterruptible(); + + // The clock used in tests increments time by 1 ms per call. + final var timestampMillis = new AtomicLong(); + doAnswer(invocation -> Instant.ofEpochMilli(timestampMillis.incrementAndGet())) + .when(clock) + .instant(); + + // Mock tracker assigns job run ids starting at 1. + final var runId = new AtomicLong(); + doAnswer(invocation -> runId.incrementAndGet()).when(tracker).beginningRun(job); + + // Sleeping does nothing at all but the mock will allow us to verify if it was called. + doNothing().when(sleeper).accept(anyLong()); + + // Mock tracker collects job run summaries into a list. + summaries = new ArrayList<>(); + doAnswer(invocation -> summaries.add(invocation.getArgument(0, JobRunSummary.class))) + .when(tracker) + .completedRun(any()); + + // The runner that we'll be testing. + runner = new PipelineJobRunner(tracker, job, sleeper, clock); + } + + /** + * Verifies that a one-time job that finishes successfully works as expected. + * + * @throws InterruptedException just a pass through because of a method being mocked or called + */ + @Test + void noScheduleJobRunsSuccessfully() throws Exception { + // Job has no schedule so it should only run once. + doReturn(Optional.empty()).when(job).getSchedule(); + + // Job will be allowed to run as often as it likes. It just won't want to + // because it has no schedule. + doReturn(true).when(tracker).jobsCanRun(); + + // Any time the job runs it will indicate success. + doReturn(PipelineJobOutcome.WORK_DONE).when(job).call(); + + // Mocks all set up - now run the job. + runner.run(); + + // Verify expected calls were made to the tracker. + verify(tracker).jobsCanRun(); + verify(tracker).beginningRun(job); + verify(tracker, times(0)).sleeping(any()); + verify(tracker).stoppingNormally(job); + verify(tracker, times(0)).stoppingDueToInterrupt(any()); + verify(tracker, times(0)).stoppingDueToException(any(), any()); + verify(tracker).stopped(job); + verifyNoInteractions(sleeper); + + // Verify that the job summary matches expectations + var expectedSummary = + new JobRunSummary( + 1L, + job, + Instant.ofEpochMilli(1), + Instant.ofEpochMilli(2), + Optional.of(PipelineJobOutcome.WORK_DONE), + Optional.empty()); + assertEquals(List.of(expectedSummary), summaries); + } + + /** + * Verifies that a one-time job that is interrupted while running works as expected. + * + * @throws InterruptedException just a pass through because of a method being mocked or called + */ + @Test + void noScheduleInterruptedWhileRunningJob() throws Exception { + // Job has no schedule so it should only run once. + doReturn(Optional.empty()).when(job).getSchedule(); + + // Job will be allowed to run as often as it likes. However, in this test + // it will be stopped by the InterruptedException we configure below. + doReturn(true).when(tracker).jobsCanRun(); + + // When the job runs it will throw an InterruptedException. + final var interrupt = new InterruptedException(); + doThrow(interrupt).when(job).call(); + + // Mocks all set up - now run the job. + runner.run(); + + // Verify expected calls were made to the tracker. + verify(tracker).jobsCanRun(); + verify(tracker).beginningRun(job); + verify(tracker, times(0)).sleeping(any()); + verify(tracker, times(0)).stoppingNormally(job); + verify(tracker).stoppingDueToInterrupt(job); + verify(tracker, times(0)).stoppingDueToException(any(), any()); + verify(tracker).stopped(job); + verifyNoInteractions(sleeper); + + // Verify that the job summary matches expectations + var expectedSummary = + new JobRunSummary( + 1L, + job, + Instant.ofEpochMilli(1), + Instant.ofEpochMilli(2), + Optional.empty(), + Optional.of(interrupt)); + assertEquals(List.of(expectedSummary), summaries); + } + + /** + * Verifies that a one-time job that throws an exception works as expected. + * + * @throws InterruptedException just a pass through because of a method being mocked or called + */ + @Test + void noScheduleJobThrowsExceptionWhileRunning() throws Exception { + // Job has no schedule so it should only run once. + doReturn(Optional.empty()).when(job).getSchedule(); + + // Job will be allowed to run as often as it likes. However, in this test + // it will be stopped by the IOException we configure below. + doReturn(true).when(tracker).jobsCanRun(); + + // Any time the job runs it will thrown an IOException. + final var error = new IOException("boom!"); + doThrow(error).when(job).call(); + + // Mocks all set up - now run the job. + runner.run(); + + // Verify expected calls were made to the tracker. + verify(tracker).jobsCanRun(); + verify(tracker).beginningRun(job); + verify(tracker, times(0)).sleeping(any()); + verify(tracker, times(0)).stoppingNormally(any()); + verify(tracker, times(0)).stoppingDueToInterrupt(any()); + verify(tracker).stoppingDueToException(job, error); + verify(tracker, times(0)).sleeping(any()); + verify(tracker).stopped(job); + verifyNoInteractions(sleeper); + + // Verify that the job summary matches expectations + var expectedSummary = + new JobRunSummary( + 1L, + job, + Instant.ofEpochMilli(1), + Instant.ofEpochMilli(2), + Optional.empty(), + Optional.of(error)); + assertEquals(List.of(expectedSummary), summaries); + } + + /** + * Verifies that a scheduled job runs and sleeps as expected until told to stop by {@link + * PipelineJobRunner.Tracker#jobsCanRun}. + * + * @throws InterruptedException just a pass through because of a method being mocked or called + */ + @Test + void runsUntilStopped() throws Exception { + // Job has a 5 second schedule. We'll verify this value was passed to sleeper below. + final var repeatMills = 5_000L; + doReturn(Optional.of(new PipelineJobSchedule(repeatMills, ChronoUnit.MILLIS))) + .when(job) + .getSchedule(); + + // Job can run twice before being told to stop. Each run of the job calls jobsCanRun + // twice (at top of loop and in mid-loop if statement) so we mock two runs (allowed by true + // being returned four times) and a prevented third run (disallowed by false being returned). + doReturn(true, true, true, true, false).when(tracker).jobsCanRun(); + + // Any time the job runs it will indicate success. + doReturn(PipelineJobOutcome.WORK_DONE).when(job).call(); + + // Mocks all set up - now run the job. + runner.run(); + + // Verify expected calls were made to the tracker. + verify(tracker, times(5)).jobsCanRun(); + verify(tracker, times(2)).beginningRun(job); + verify(tracker, times(2)).sleeping(any()); + verify(tracker).stoppingNormally(job); + verify(tracker, times(0)).stoppingDueToInterrupt(any()); + verify(tracker, times(0)).stoppingDueToException(any(), any()); + verify(tracker).stopped(job); + verify(sleeper, times(2)).accept(repeatMills); + + // Verify that the job summary matches expectations + var expectedSummaries = + List.of( + new JobRunSummary( + 1L, + job, + Instant.ofEpochMilli(1), + Instant.ofEpochMilli(2), + Optional.of(PipelineJobOutcome.WORK_DONE), + Optional.empty()), + new JobRunSummary( + 2L, + job, + Instant.ofEpochMilli(3), + Instant.ofEpochMilli(4), + Optional.of(PipelineJobOutcome.WORK_DONE), + Optional.empty())); + assertEquals(expectedSummaries, summaries); + } + + /** + * Verifies that a scheduled job runs and sleeps as expected until it catches an {@link + * InterruptedException} while sleeping between runs. + * + * @throws InterruptedException just a pass through because of a method being mocked or called + */ + @Test + void runsUntilInterruptedBetweenRuns() throws Exception { + // Job has a 5 second schedule. We'll verify this value was passed to sleeper below. + final var repeatMills = 5_000L; + doReturn(Optional.of(new PipelineJobSchedule(repeatMills, ChronoUnit.MILLIS))) + .when(job) + .getSchedule(); + + // Job will always be allowed to run, but it will be stopped by an InterruptedException + // thrown by sleeper, which we configure below. + doReturn(true).when(tracker).jobsCanRun(); + + // Any time the job runs it will indicate success. + doReturn(PipelineJobOutcome.WORK_DONE).when(job).call(); + + // First call to sleep will work normally (doNothing) but the second call will + // throw an InterruptedException. + doNothing().doThrow(InterruptedException.class).when(sleeper).accept(any()); + + // Mocks all set up - now run the job. + runner.run(); + + // Verify expected calls were made to the tracker. + verify(tracker, times(4)).jobsCanRun(); + verify(tracker, times(2)).beginningRun(job); + verify(tracker, times(2)).sleeping(any()); + verify(tracker, times(0)).stoppingNormally(any()); + verify(tracker).stoppingDueToInterrupt(job); + verify(tracker, times(0)).stoppingDueToException(any(), any()); + verify(tracker).stopped(job); + verify(sleeper, times(2)).accept(repeatMills); + + // Verify that the job summary matches expectations + var expectedSummaries = + List.of( + new JobRunSummary( + 1L, + job, + Instant.ofEpochMilli(1), + Instant.ofEpochMilli(2), + Optional.of(PipelineJobOutcome.WORK_DONE), + Optional.empty()), + new JobRunSummary( + 2L, + job, + Instant.ofEpochMilli(3), + Instant.ofEpochMilli(4), + Optional.of(PipelineJobOutcome.WORK_DONE), + Optional.empty())); + assertEquals(expectedSummaries, summaries); + } + + /** + * Verifies that a scheduled job runs and sleeps as expected until it throws an exception while + * running. + * + * @throws InterruptedException just a pass through because of a method being mocked or called + */ + @Test + void runsUntilThrows() throws Exception { + // Job has a 5 second schedule. We'll verify this value was passed to sleeper below. + final var repeatMills = 5_000L; + doReturn(Optional.of(new PipelineJobSchedule(repeatMills, ChronoUnit.MILLIS))) + .when(job) + .getSchedule(); + + // Job will always be allowed to run, but it will be stopped by an IOException + // thrown by itself, which we configure below. + doReturn(true).when(tracker).jobsCanRun(); + + // First two calls to the job will return success but the third time will throw + // an IOException. + final var error = new IOException("boom!"); + doReturn(PipelineJobOutcome.WORK_DONE, PipelineJobOutcome.WORK_DONE) + .doThrow(error) + .when(job) + .call(); + + // Mocks all set up - now run the job. + runner.run(); + + // Verify expected calls were made to the tracker. + verify(tracker, times(5)).jobsCanRun(); + verify(tracker, times(3)).beginningRun(job); + verify(tracker, times(2)).sleeping(any()); + verify(tracker, times(0)).stoppingNormally(any()); + verify(tracker, times(0)).stoppingDueToInterrupt(any()); + verify(tracker).stoppingDueToException(job, error); + verify(tracker).stopped(job); + verify(sleeper, times(2)).accept(repeatMills); + + // Verify that the job summary matches expectations + var expectedSummaries = + List.of( + new JobRunSummary( + 1L, + job, + Instant.ofEpochMilli(1), + Instant.ofEpochMilli(2), + Optional.of(PipelineJobOutcome.WORK_DONE), + Optional.empty()), + new JobRunSummary( + 2L, + job, + Instant.ofEpochMilli(3), + Instant.ofEpochMilli(4), + Optional.of(PipelineJobOutcome.WORK_DONE), + Optional.empty()), + new JobRunSummary( + 3L, + job, + Instant.ofEpochMilli(5), + Instant.ofEpochMilli(6), + Optional.empty(), + Optional.of(error))); + assertEquals(expectedSummaries, summaries); + } + + /** Verify that job run summaries can be recognized in log lines. */ + @Test + void logMessagesParsedCorrectly() { + final var successSummary = + new JobRunSummary( + 1L, + job, + Instant.ofEpochMilli(1), + Instant.ofEpochMilli(2), + Optional.of(PipelineJobOutcome.WORK_DONE), + Optional.empty()); + + final var error = new IOException("boom!"); + final var failureSummary = + new JobRunSummary( + 3L, + job, + Instant.ofEpochMilli(5), + Instant.ofEpochMilli(6), + Optional.empty(), + Optional.of(error)); + + var successMessage = "some prefix" + successSummary + "some suffix"; + assertTrue(JobRunSummary.isSuccessString(successMessage)); + assertFalse(JobRunSummary.isFailureString(successMessage)); + + var failureMessage = "some prefix" + failureSummary + "some suffix"; + assertFalse(JobRunSummary.isSuccessString(failureMessage)); + assertTrue(JobRunSummary.isFailureString(failureMessage)); + } +} diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineManagerIT.java b/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineManagerIT.java index 183e6685e0..4dcc02f192 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineManagerIT.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/test/java/gov/cms/bfd/pipeline/app/PipelineManagerIT.java @@ -1,575 +1,228 @@ package gov.cms.bfd.pipeline.app; -import static com.github.stefanbirkner.systemlambda.SystemLambda.catchSystemExit; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Slf4jReporter; -import gov.cms.bfd.pipeline.PipelineTestUtils; -import gov.cms.bfd.pipeline.app.scheduler.SchedulerJob; -import gov.cms.bfd.pipeline.app.volunteer.VolunteerJob; -import gov.cms.bfd.pipeline.ccw.rif.extract.s3.task.S3TaskManager; -import gov.cms.bfd.pipeline.sharedutils.NullPipelineJobArguments; import gov.cms.bfd.pipeline.sharedutils.PipelineJob; import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; import gov.cms.bfd.pipeline.sharedutils.PipelineJobSchedule; import gov.cms.bfd.pipeline.sharedutils.PipelineJobType; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecord; -import gov.cms.bfd.pipeline.sharedutils.jobs.store.PipelineJobRecordStore; import gov.cms.bfd.sharedutils.exceptions.BadCodeMonkeyException; +import gov.cms.bfd.sharedutils.interfaces.ThrowingConsumer; +import java.time.Clock; import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.AllArgsConstructor; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.mockito.Mock; -import org.mockito.MockedStatic; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Integration tests for {@link PipelineManager}, {@link PipelineJobRecordStore}, and friends. */ -public final class PipelineManagerIT { - private static final Logger LOGGER = LoggerFactory.getLogger(PipelineManagerIT.class); - - /** Mocks the S3 task manager, which is only used in error scenarios. */ - @Mock S3TaskManager mockS3TaskManager; - /** - * Logs a message before each test. - * - * @param testInfo the test info - */ - @BeforeEach - public void starting(TestInfo testInfo) { - MockitoAnnotations.openMocks(this); - LOGGER.info("{}: starting.", testInfo.getDisplayName()); - } +/** Integration tests for {@link PipelineManager}. */ +public final class PipelineManagerIT { + /** Sleep function that keeps the sleep time short for testing. */ + private static final ThrowingConsumer SLEEPER = + millis -> Thread.sleep(Math.min(5, millis)); - /** - * Logs a message after each test. - * - * @param testInfo the test info - */ - @AfterEach - public void finished(TestInfo testInfo) { - LOGGER.info("{}: finished.", testInfo.getDisplayName()); - } + /** We don't care about timestamps in these tests so we can just use system clock. */ + private final Clock clock = Clock.systemUTC(); - /** - * Verifies that {@link PipelineManager} automatically runs {@link MockJob} and {@link - * SchedulerJob}, as expected. - * - * @throws Exception Any unhandled {@link Exception}s will cause this test case to fail. - */ + /** Verifies that {@link PipelineManager} runs a successful mock one-shot job, as expected. */ @Test - public void runBuiltinJobs() throws Exception { - // Create the pipeline. - PipelineJobRecordStore jobRecordStore = - new PipelineJobRecordStore( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics()); - try (PipelineManager pipelineManager = - new PipelineManager( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics(), - jobRecordStore, - mockS3TaskManager)) { - // Verify that there are job records for the built-ins. - - assertEquals(2, jobRecordStore.getJobRecords().size()); - - assertTrue( - jobRecordStore.getJobRecords().stream() - .anyMatch(j -> VolunteerJob.JOB_TYPE.equals(j.getJobType()))); - - assertTrue( - jobRecordStore.getJobRecords().stream() - .anyMatch(j -> SchedulerJob.JOB_TYPE.equals(j.getJobType()))); - } + public void runSuccessfulMockOneshotJob() { + // Since this has no schedule it will run once and then exit. + final var mockJob = new MockJob(Optional.empty(), true, () -> PipelineJobOutcome.WORK_DONE); + + final var pipelineManager = new PipelineManager(SLEEPER, clock, List.of(mockJob)); + pipelineManager.start(); + pipelineManager.awaitCompletion(); + + assertEquals(1, pipelineManager.getCompletedJobs().size()); + var jobSummary = pipelineManager.getCompletedJobs().get(0); + assertEquals(Optional.of(PipelineJobOutcome.WORK_DONE), jobSummary.getOutcome()); + assertEquals(Optional.empty(), jobSummary.getException()); } - /** - * Verifies that {@link PipelineManager} runs a successful mock one-shot job, as expected. - * - * @throws Exception Any unhandled {@link Exception}s will cause this test case to fail. - */ + /** Verifies that {@link PipelineManager} runs a failing mock one-shot job, as expected. */ @Test - public void runSuccessfulMockOneshotJob() throws Exception { - // Create the pipeline and have it run a mock job. - PipelineJobRecordStore jobRecordStore = - new PipelineJobRecordStore( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics()); - try (PipelineManager pipelineManager = - new PipelineManager( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics(), - jobRecordStore, - mockS3TaskManager)) { - MockJob mockJob = new MockJob(Optional.empty(), () -> PipelineJobOutcome.WORK_DONE); - pipelineManager.registerJob(mockJob); - jobRecordStore.submitPendingJob(MockJob.JOB_TYPE, null); - - // Wait until a completed iteration of the mock job can be found. - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .until( - () -> - jobRecordStore.getJobRecords().stream() - .anyMatch(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && j.isCompleted())); - - // Verify that one of the completed mock job iterations looks correct. - Optional> mockJobRecord = - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType())) - .findAny(); - - assertEquals(Optional.of(PipelineJobOutcome.WORK_DONE), mockJobRecord.get().getOutcome()); - } + public void runFailingMockOneshotJob() { + final var error = new RuntimeException("boom"); + + // Since this has no schedule it will run once and then exit. + final var mockJob = + new MockJob( + Optional.empty(), + true, + () -> { + throw error; + }); + + final var pipelineManager = new PipelineManager(SLEEPER, clock, List.of(mockJob)); + pipelineManager.start(); + pipelineManager.awaitCompletion(); + + assertEquals(1, pipelineManager.getCompletedJobs().size()); + var jobSummary = pipelineManager.getCompletedJobs().get(0); + assertEquals(Optional.empty(), jobSummary.getOutcome()); + assertEquals(Optional.of(error), jobSummary.getException()); + assertEquals(error, pipelineManager.getError()); } - /** - * Verifies that {@link PipelineManager} runs a failing mock one-shot job, as expected. - * - * @throws Exception Any unhandled {@link Exception}s will cause this test case to fail. - */ + /** Verifies that {@link PipelineManager} runs a successful mock scheduled job, as expected. */ @Test - public void runFailingMockOneshotJob() throws Exception { - try (MockedStatic mockPipelineApp = - Mockito.mockStatic(PipelineApplication.class)) { - // Create the pipeline and have it run a mock job. - PipelineJobRecordStore jobRecordStore = - new PipelineJobRecordStore( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics()); - try (PipelineManager pipelineManager = - new PipelineManager( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics(), - jobRecordStore, - mockS3TaskManager)) { - MockJob mockJob = - new MockJob( - Optional.empty(), - () -> { - throw new RuntimeException("boom"); - }); - pipelineManager.registerJob(mockJob); - - int statusCode = - catchSystemExit( - () -> { - jobRecordStore.submitPendingJob(MockJob.JOB_TYPE, null); - - // Wait until a completed iteration of the mock job can be found. - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .until( - () -> - jobRecordStore.getJobRecords().stream() - .anyMatch( - j -> - MockJob.JOB_TYPE.equals(j.getJobType()) - && j.isCompleted())); - }); - - assertEquals(2, statusCode); - - // Verify that one of the completed mock job iterations looks correct. - Optional> mockJobRecord = - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType())) - .findAny(); - - assertEquals(RuntimeException.class, mockJobRecord.get().getFailure().get().getType()); - - assertEquals("boom", mockJobRecord.get().getFailure().get().getMessage()); - } - } + public void runSuccessfulScheduledJob() { + final var mockJob = + new MockJob( + Optional.of(new PipelineJobSchedule(10, ChronoUnit.MILLIS)), + true, + () -> PipelineJobOutcome.WORK_DONE); + + final var pipelineManager = new PipelineManager(SLEEPER, clock, List.of(mockJob)); + pipelineManager.start(); + + // Wait until a completed iteration of the mock job can be found. + Awaitility.await() + .atMost(1, TimeUnit.SECONDS) + .until( + () -> + pipelineManager.getCompletedJobs().stream() + .anyMatch(j -> MockJob.JOB_TYPE.equals(j.getJob().getType()))); + + pipelineManager.stop(); + pipelineManager.awaitCompletion(); + + // Verify that one of the completed mock job iterations looks correct. + Optional mockJobSummary = + pipelineManager.getCompletedJobs().stream() + .filter( + j -> MockJob.JOB_TYPE.equals(j.getJob().getType()) && j.getOutcome().isPresent()) + .findAny(); + + assertTrue(mockJobSummary.isPresent()); + assertEquals(Optional.of(PipelineJobOutcome.WORK_DONE), mockJobSummary.get().getOutcome()); + assertNull(pipelineManager.getError()); } /** - * Verifies that {@link PipelineManager} runs a successful mock scheduled job, as expected. - * - * @throws Exception Any unhandled {@link Exception}s will cause this test case to fail. + * Verifies that {@link PipelineManager} runs a mock scheduled job until it fails and them shuts + * down, as expected. */ @Test - public void runSuccessfulScheduledJob() throws Exception { - // Create the pipeline and have it run a mock job. - PipelineJobRecordStore jobRecordStore = - new PipelineJobRecordStore( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics()); - try (PipelineManager pipelineManager = - new PipelineManager( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics(), - jobRecordStore, - mockS3TaskManager)) { - MockJob mockJob = - new MockJob( - Optional.of(new PipelineJobSchedule(1, ChronoUnit.MILLIS)), - () -> PipelineJobOutcome.WORK_DONE); - pipelineManager.registerJob(mockJob); - jobRecordStore.submitPendingJob(MockJob.JOB_TYPE, null); - - // Wait until a completed iteration of the mock job can be found. - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .until( - () -> - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && j.isCompleted()) - .findAny() - .isPresent()); - - // Verify that one of the completed mock job iterations looks correct. - Optional> mockJobRecord = - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && j.isCompleted()) - .findAny(); - - assertEquals(Optional.of(PipelineJobOutcome.WORK_DONE), mockJobRecord.get().getOutcome()); - } - } + public void runFailingScheduledJob() { + final var error = new RuntimeException("boom"); + + final var runCount = new AtomicInteger(); + final var mockJob = + new MockJob( + Optional.of(new PipelineJobSchedule(10, ChronoUnit.MILLIS)), + true, + () -> { + if (runCount.incrementAndGet() >= 3) { + throw error; + } else { + return PipelineJobOutcome.WORK_DONE; + } + }); - /** - * Verifies that {@link PipelineManager} runs a failing mock scheduled job, as expected. - * - * @throws Exception Any unhandled {@link Exception}s will cause this test case to fail. - */ - @Test - public void runFailingScheduledJob() throws Exception { - // Create the pipeline and have it run a mock job. - PipelineJobRecordStore jobRecordStore = - new PipelineJobRecordStore( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics()); - try (PipelineManager pipelineManager = - new PipelineManager( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics(), - jobRecordStore, - mockS3TaskManager)) { - // The delay has to be long enough to avoid race conditions where the job gets re-scheduled - // before the app stops. There is a window of time between the failed job being dequeued - // and the app shutting down during which the scheduler can reschedule the app. - MockJob mockJob = - new MockJob( - Optional.of(new PipelineJobSchedule(50, ChronoUnit.MILLIS)), - () -> { - throw new RuntimeException("boom"); - }); - pipelineManager.registerJob(mockJob); - - int statusCode = - catchSystemExit( - () -> { - jobRecordStore.submitPendingJob(MockJob.JOB_TYPE, null); - - // Wait until a completed job can be found. - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .until( - () -> - jobRecordStore.getJobRecords().stream() - .filter( - j -> MockJob.JOB_TYPE.equals(j.getJobType()) && j.isCompleted()) - .findAny() - .isPresent()); - }); - - assertEquals(2, statusCode); - - // Verify that one of the completed mock job iterations looks correct. - Optional> mockJobRecord = - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && j.isCompleted()) - .findAny(); - - assertEquals(RuntimeException.class, mockJobRecord.get().getFailure().get().getType()); - - assertEquals("boom", mockJobRecord.get().getFailure().get().getMessage()); - - // Make sure that the job stopped trying to execute after it failed. - - var jobRecords = - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType())) - .toList(); - if (jobRecords.size() != 1) { - assertEquals( - "", - jobRecordStore.getJobRecords().stream() - .map(PipelineJobRecord::toString) - .collect(Collectors.joining("\n"))); - } - assertEquals(1, jobRecords.size()); - } - } + final var pipelineManager = new PipelineManager(SLEEPER, clock, List.of(mockJob)); + pipelineManager.start(); + pipelineManager.awaitCompletion(); - /** - * Verifies that {@link PipelineManager#stop()} works, as expected. - * - * @throws Exception Any unhandled {@link Exception}s will cause this test case to fail. - */ - @Test - public void runInterruptibleJobsThenStop() throws Exception { - // Create the pipeline and have it run a mock job. - PipelineJobRecordStore jobRecordStore = - new PipelineJobRecordStore( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics()); - try (PipelineManager pipelineManager = - new PipelineManager( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics(), - jobRecordStore, - mockS3TaskManager)) { - MockJob mockJob = - new MockJob( - Optional.of(new PipelineJobSchedule(1, ChronoUnit.MILLIS)), - () -> { - // Add an artificial delay that we'll be able to measure. - Thread.sleep(500); - return PipelineJobOutcome.WORK_DONE; - }); - pipelineManager.registerJob(mockJob); - jobRecordStore.submitPendingJob(MockJob.JOB_TYPE, null); - - // Wait until the mock job has started. - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .until( - () -> - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && j.isStarted()) - .findAny() - .isPresent()); - - // Stop the pipeline and then make sure that the job was actually interrupted. - pipelineManager.stop(); - PipelineJobRecord mockJobRecord = - jobRecordStore.findMostRecent(MockJob.JOB_TYPE).get(); - - assertTrue(mockJobRecord.getCanceledTime().isPresent()); - - assertTrue(mockJobRecord.getDuration().get().toMillis() < 500); - } - } + final var summaries = pipelineManager.getCompletedJobs(); + assertEquals(3, summaries.size()); - /** - * Verifies that {@link PipelineManager#stop()} works, as expected. - * - * @throws Exception Any unhandled {@link Exception}s will cause this test case to fail. - */ - @Test - public void runUninterruptibleJobsThenStop() throws Exception { - // Create the pipeline and have it run a mock job. - PipelineJobRecordStore jobRecordStore = - new PipelineJobRecordStore( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics()); - try (PipelineManager pipelineManager = - new PipelineManager( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics(), - jobRecordStore, - mockS3TaskManager)) { - MockJob mockJob = - new MockJob( - Optional.of(new PipelineJobSchedule(1, ChronoUnit.MILLIS)), - false, - () -> { - return PipelineJobOutcome.WORK_DONE; - }); - pipelineManager.registerJob(mockJob); - jobRecordStore.submitPendingJob(MockJob.JOB_TYPE, null); - - // Wait until the mock job has started. - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .until( - () -> - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && j.isStarted()) - .findAny() - .isPresent()); - - // Stop the pipeline. If this doesn't hang, we're good. - pipelineManager.stop(); - } - } + var jobSummary = summaries.get(0); + assertEquals(Optional.of(PipelineJobOutcome.WORK_DONE), jobSummary.getOutcome()); + assertEquals(Optional.empty(), jobSummary.getException()); - /** - * Verifies that {@link PipelineManager#stop()} works, as expected. - * - * @throws Exception Any unhandled {@link Exception}s will cause this test case to fail. - */ - @Test - public void runThenStopAndCancelPendingJobs() throws Exception { - // Create the pipeline and a slow mock job that we can use. - PipelineJobRecordStore jobRecordStore = - new PipelineJobRecordStore( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics()); - try (PipelineManager pipelineManager = - new PipelineManager( - PipelineTestUtils.get().getPipelineApplicationState().getMetrics(), - jobRecordStore, - mockS3TaskManager)) { - MockJob mockJob = - new MockJob( - Optional.empty(), - () -> { - // Add an artificial delay that we'll be able to measure. - Thread.sleep(500); - return PipelineJobOutcome.WORK_DONE; - }); - pipelineManager.registerJob(mockJob); - - /* - * Once the VolunteerJob is running, submit enough slow mock jobs to fill up the - * PipelineManager's executor threads/slots. - */ - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .until( - () -> - jobRecordStore.getJobRecords().stream() - .filter(j -> VolunteerJob.JOB_TYPE.equals(j.getJobType()) && j.isStarted()) - .findAny() - .isPresent()); - int openExecutorSlots = pipelineManager.getOpenExecutorSlots(); - for (int i = 0; i < openExecutorSlots; i++) { - jobRecordStore.submitPendingJob(MockJob.JOB_TYPE, null); - } + jobSummary = summaries.get(1); + assertEquals(Optional.of(PipelineJobOutcome.WORK_DONE), jobSummary.getOutcome()); + assertEquals(Optional.empty(), jobSummary.getException()); - // Add one extra job that should sit as pending for a bit. - jobRecordStore.submitPendingJob(MockJob.JOB_TYPE, null); - - // Wait until one of the mock jobs has started. - Awaitility.await() - .atMost(1, TimeUnit.SECONDS) - .until( - () -> - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && j.isStarted()) - .findAny() - .isPresent()); - - // Stop the pipeline and verify that at least one job was cancelled before it started. - pipelineManager.stop(); - - assertTrue( - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && !j.isStarted()) - .findAny() - .isPresent()); - } + jobSummary = summaries.get(2); + assertEquals(Optional.empty(), jobSummary.getOutcome()); + assertEquals(Optional.of(error), jobSummary.getException()); + assertEquals(error, pipelineManager.getError()); } /** - * Verifies that {@link PipelineManager} can performantly handle large numbers of job executions, - * as expected. Note that "performantly" is relative to what we need: fast enough to run a few - * jobs every second of the day. Mostly, though, this test case is used as a good way to inspect - * and evaluate the various metrics that the application collects. - * - *

This is intentionally left ignored most of the time, so as to not slow down our builds. It - * should only be run if/when someone is looking into performance issues. + * Verifies that {@link PipelineManager#stop()} works, as expected. * - * @throws Exception Any unhandled {@link Exception}s will cause this test case to fail. + * @throws Exception pass through if test throws */ @Test - @Disabled - public void runWayTooManyJobsThenStop() throws Exception { - // Let's speed things up a bit, so we can run more iterations, faster. - SchedulerJob.SCHEDULER_TICK_MILLIS = 1; - VolunteerJob.VOLUNTEER_TICK_MILLIS = 1; - - MetricRegistry appMetrics = new MetricRegistry(); - Slf4jReporter.forRegistry(appMetrics).outputTo(LOGGER).build().start(30, TimeUnit.SECONDS); - - // Create the pipeline. - PipelineJobRecordStore jobRecordStore = new PipelineJobRecordStore(appMetrics); - try (PipelineManager pipelineManager = - new PipelineManager(appMetrics, jobRecordStore, mockS3TaskManager)) { - // Register a mock unscheduled job. - MockJob mockUnscheduledJob = - new MockJob( - Optional.empty(), - () -> { - return PipelineJobOutcome.WORK_DONE; - }); - pipelineManager.registerJob(mockUnscheduledJob); - - // Register a second scheduled job. - MockJob mockScheduledJob = - new MockJob( - Optional.of(new PipelineJobSchedule(1, ChronoUnit.MILLIS)), - () -> { - return PipelineJobOutcome.WORK_DONE; - }) { - /* - * Very hacky, but here we're extending MockJob with an anonymous class that has a - * different getType() value. - */ - - /** - * @see gov.cms.bfd.pipeline.app.PipelineManagerIT.MockJob#getType() - */ - @Override - public PipelineJobType getType() { - return new PipelineJobType<>(this); - } - }; - pipelineManager.registerJob(mockScheduledJob); - - /* - * Submit way too many executions of the unscheduled job. The number here corresponds to how - * many executions you'd get if it was run once a second, every second of the day. - */ - for (int i = 0; i < 24 * 60 * 60; i++) { - jobRecordStore.submitPendingJob(MockJob.JOB_TYPE, null); - } - - /* - * Wait until all of the jobs have completed, with a large timeout. Don't worry: it only takes - * about 500 seconds on my system. - */ - Awaitility.await() - .atMost(20, TimeUnit.MINUTES) - .until( - () -> - jobRecordStore.getJobRecords().stream() - .noneMatch(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && !j.isCompleted())); - - // Stop the pipeline. - pipelineManager.stop(); - - // Verify that all jobs completed successfully. - Set> unsuccessfulJobs = - jobRecordStore.getJobRecords().stream() - .filter(j -> MockJob.JOB_TYPE.equals(j.getJobType()) && !j.isCompletedSuccessfully()) - .collect(Collectors.toSet()); - - assertEquals(0, unsuccessfulJobs.size()); - - // Ensure that the final metrics get logged. - Slf4jReporter.forRegistry(appMetrics).outputTo(LOGGER).build().report(); - } finally { - configureTimers(); - } + public void runInterruptableJobsThenStop() throws Exception { + // lets the main thread know the job has started + final var latch = new CountDownLatch(1); + + final var mockJob = + new MockJob( + Optional.empty(), + true, + () -> { + // sync up with the test thread + latch.countDown(); + + // Sleep that will be interrupted + Thread.sleep(10_000); + return PipelineJobOutcome.WORK_DONE; + }); + + final var pipelineManager = new PipelineManager(SLEEPER, clock, List.of(mockJob)); + pipelineManager.start(); + + // wait until we know the job has started + latch.await(); + + // stop the job before it can finish + pipelineManager.stop(); + + // wait for the job to finish + pipelineManager.awaitCompletion(); + + // verify the job was interrupted + assertEquals(1, pipelineManager.getCompletedJobs().size()); + var jobSummary = pipelineManager.getCompletedJobs().get(0); + assertEquals(Optional.empty(), jobSummary.getOutcome()); + assertEquals( + Optional.of(InterruptedException.class), jobSummary.getException().map(Object::getClass)); + assertNull(pipelineManager.getError()); } - /** Reduce tick time on built-in jobs, to speed test execution. */ - @BeforeAll - public static void configureTimers() { - VolunteerJob.VOLUNTEER_TICK_MILLIS = 10; - SchedulerJob.SCHEDULER_TICK_MILLIS = 10; + /** Verifies that {@link PipelineManager#stop()} works with uninterruptible jobs. */ + @Test + public void runUninterruptibleJobsThenStop() { + final var mockJob = + new MockJob( + Optional.of(new PipelineJobSchedule(1, ChronoUnit.MILLIS)), + false, + () -> PipelineJobOutcome.WORK_DONE); + + final var pipelineManager = new PipelineManager(SLEEPER, clock, List.of(mockJob)); + pipelineManager.start(); + + // Wait until the mock job has started. + Awaitility.await() + .atMost(1, TimeUnit.SECONDS) + .until( + () -> + pipelineManager.getCompletedJobs().stream() + .anyMatch(j -> MockJob.JOB_TYPE.equals(j.getJob().getType()))); + + // Stop the pipeline. If this doesn't hang, we're good. + pipelineManager.stop(); + pipelineManager.awaitCompletion(); } /** This mock {@link PipelineJob} returns a specified result. */ - private static class MockJob implements PipelineJob { + @AllArgsConstructor + private static class MockJob implements PipelineJob { /** Represents the job type for this mock job. */ - public static final PipelineJobType JOB_TYPE = - new PipelineJobType(MockJob.class); + public static final PipelineJobType JOB_TYPE = new PipelineJobType(MockJob.class); /** The pipeline job schedule for the mock job. */ private final Optional schedule; @@ -578,47 +231,16 @@ private static class MockJob implements PipelineJob { /** The {@link Callable} that will create the values to use for {@link #call()}. */ private final Callable jobResultProducer; - /** - * Constructs a new {@link MockJob} instance. - * - * @param schedule the value to use for {@link #getSchedule()} - * @param interruptible the value to use for {@link #isInterruptible()} - * @param jobResultProducer the {@link Callable} that will create the values to use for {@link - * #call()} - */ - public MockJob( - Optional schedule, - boolean interruptible, - Callable jobResultProducer) { - this.schedule = schedule; - this.interruptible = interruptible; - this.jobResultProducer = jobResultProducer; - } - - /** - * Constructs a new {@link MockJob} instance. - * - * @param schedule the value to use for {@link #getSchedule()} - * @param jobResultProducer the {@link Callable} that will create the values to use for {@link - * #call()} - */ - public MockJob(Optional schedule, Callable jobResultProducer) { - this(schedule, true, jobResultProducer); - } - - /** {@inheritDoc} */ @Override public Optional getSchedule() { return schedule; } - /** {@inheritDoc} */ @Override public boolean isInterruptible() { return interruptible; } - /** {@inheritDoc} */ @Override public PipelineJobOutcome call() throws Exception { Object result = jobResultProducer.call(); diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/test/resources/logback-test.xml b/apps/bfd-pipeline/bfd-pipeline-app/src/test/resources/logback-test.xml index 5e8d8db377..1e00875aef 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/test/resources/logback-test.xml +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/test/resources/logback-test.xml @@ -11,9 +11,8 @@ - - + diff --git a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java index 1a58d3d848..d96ddee29f 100644 --- a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java +++ b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java @@ -11,7 +11,6 @@ import gov.cms.bfd.pipeline.ccw.rif.extract.s3.S3RifFile; import gov.cms.bfd.pipeline.ccw.rif.extract.s3.task.DataSetMoveTask; import gov.cms.bfd.pipeline.ccw.rif.extract.s3.task.S3TaskManager; -import gov.cms.bfd.pipeline.sharedutils.NullPipelineJobArguments; import gov.cms.bfd.pipeline.sharedutils.PipelineApplicationState; import gov.cms.bfd.pipeline.sharedutils.PipelineJob; import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; @@ -67,7 +66,7 @@ * created. Within each of those directories will be manifest files and the RIF files that they * reference. */ -public final class CcwRifLoadJob implements PipelineJob { +public final class CcwRifLoadJob implements PipelineJob { private static final Logger LOGGER = LoggerFactory.getLogger(CcwRifLoadJob.class); /** Shortcut for calculating GIGA (filesize). */ diff --git a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/load/RifLoader.java b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/load/RifLoader.java index 80165f0d4f..cefd94eda2 100644 --- a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/load/RifLoader.java +++ b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/load/RifLoader.java @@ -50,8 +50,8 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -102,12 +102,6 @@ public final class RifLoader { /** The maximum amount of time in hours we will wait for a job to complete loading its batches. */ private final int MAX_BATCH_WAIT_TIME_HOURS = 72; - /** - * Keeps track of a fatal error during loading in one of the batch threads so we know to fail the - * job (and kill the pipeline). - */ - private static volatile AtomicBoolean fatalFailure; - /** * Constructs a new {@link RifLoader} instance. * @@ -119,7 +113,6 @@ public RifLoader(LoadAppOptions options, PipelineApplicationState appState) { this.appState = appState; idHasher = new IdHasher(options.getIdHasherConfig()); - fatalFailure = new AtomicBoolean(); } /** @@ -187,7 +180,6 @@ public void process( Consumer errorHandler, Consumer resultHandler) { - fatalFailure.set(false); BlockingThreadPoolExecutor loadExecutor = createLoadExecutor(options); MetricRegistry fileEventMetrics = dataToLoad.getSourceEvent().getEventMetrics(); @@ -248,6 +240,8 @@ public Integer getValue() { * always run in a consistent manner. */ + final var error = new AtomicReference(); + // Define the Consumer that will handle each batch. Consumer>> batchProcessor = recordsBatch -> { @@ -257,18 +251,30 @@ public Integer getValue() { * constructor), this will block if too many tasks are already * pending. That's desirable behavior, as it prevents * OutOfMemoryErrors. + * + * The error handler will store the first exception encountered in + * the AtomicReference and discard any others. We only need one to + * terminate the pipeline. */ - processAsync(loadExecutor, recordsBatch, loadedFileId, resultHandler); + processAsync( + loadExecutor, + recordsBatch, + loadedFileId, + resultHandler, + e -> error.compareAndSet(null, e)); }; // Collect records into batches and submit each to batchProcessor. + // Any exception will trigger a clean shutdown of the stream. try { if (options.getRecordBatchSize() > 1) { BatchSpliterator.batches(dataToLoad.getRecords(), options.getRecordBatchSize()) + .takeWhile(record -> error.get() == null) // stop if an exception is thrown .forEach(batchProcessor); } else { dataToLoad .getRecords() + .takeWhile(record -> error.get() == null) // stop if an exception is thrown .map( record -> { List> ittyBittyBatch = new LinkedList<>(); @@ -278,9 +284,11 @@ record -> { .forEach(batchProcessor); } } catch (Exception e) { - LOGGER.error("Encountered an issue while parsing file batches (RifLoader), load failed."); + LOGGER.error( + "Encountered an issue while parsing file batches (RifLoader), load failed. {}", + e.getMessage()); timerDataSetFile.stop(); - throw e; + error.compareAndSet(null, e); } // Wait for all submitted batches to complete. @@ -294,23 +302,18 @@ record -> { "%s failed to complete processing the records in time: '%s'.", this.getClass().getSimpleName(), dataToLoad)); } catch (InterruptedException e) { - // Interrupts should not be used on this thread, so go boom. - throw new RuntimeException(e); + LOGGER.error("Encountered an unexpected InterruptedException, load failed."); + error.compareAndSet(null, e); } /* - * If any batch load tripped a fatal error, we should throw an exception - * on this thread to kill the pipeline. Exceptions thrown from the batch thread with the - * error are ignored/lost, so have this synchronized boolean signal to do this specifically on the main thread. - * We also can't use a passed-in error handler from up the execution chain or else the pipeline manager shutdown procedure - * waits for this thread to shut down, which is waiting on the above 72-hour timeout before it unblocks. - * - * FUTURE: If we wish to be more discerning about recoverable errors, we could return false here which will - * allow the pipeline to continue operating if an error is considered non-fatal (it also currently moves the failed files - * to "Done" instead of "Failed", which would need to be fixed.) + * If any batch load tripped a fatal error, we call the error handler. + * The stream will have already cleanly halted processing. */ - if (fatalFailure.get()) { - throw new IllegalStateException("Fatal error during rif file processing."); + final Exception ex = error.get(); + if (ex != null) { + LOGGER.error("terminated by exception: message={}", ex.getMessage(), ex); + errorHandler.accept(ex); } LOGGER.info("Processed '{}'.", dataToLoad); @@ -326,12 +329,14 @@ record -> { * @param recordsBatch the {@link RifRecordEvent}s to process * @param loadedFileId the loaded file id * @param resultHandler the {@link Consumer} to notify when the batch completes successfully + * @param errorHandler used to pass through exceptions encountered during processing */ private void processAsync( BlockingThreadPoolExecutor loadExecutor, List> recordsBatch, long loadedFileId, - Consumer resultHandler) { + Consumer resultHandler, + Consumer errorHandler) { loadExecutor.submit( () -> { RifFileEvent fileEvent = recordsBatch.get(0).getFileEvent(); @@ -382,8 +387,7 @@ private void processAsync( .mark(1); LOGGER.error("Error caught when processing async batch!", failure); - // Cannot use errorHandler here, as it will cause a long wait before shutdown - fatalFailure.set(true); + errorHandler.accept(failure); } }); } diff --git a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/test/java/gov/cms/bfd/pipeline/ccw/rif/load/RifLoaderIT.java b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/test/java/gov/cms/bfd/pipeline/ccw/rif/load/RifLoaderIT.java index 59bf277d8f..0c9319b763 100644 --- a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/test/java/gov/cms/bfd/pipeline/ccw/rif/load/RifLoaderIT.java +++ b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/test/java/gov/cms/bfd/pipeline/ccw/rif/load/RifLoaderIT.java @@ -68,6 +68,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.opentest4j.AssertionFailedError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,9 +248,9 @@ public void failOnUpdateBeneficiaryBeforeInsert() { Stream editedSample = editSamples(samplesStream, fileEditor); // Load the edited sample to verify that it fails, as expected. - IllegalStateException thrown = + AssertionFailedError thrown = assertThrows( - IllegalStateException.class, + AssertionFailedError.class, () -> { loadSample( "SAMPLE_A, bene only, UPDATE", @@ -257,7 +258,7 @@ public void failOnUpdateBeneficiaryBeforeInsert() { editedSample); }); - assertTrue(thrown.getMessage().contains("Fatal error during rif file processing")); + assertEquals("Load errors encountered. ==> expected: <0> but was: <1>", thrown.getMessage()); } /** Ensures that loading a single file results in a loaded file in the loaded batches. */ diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc-apps/src/main/java/gov/cms/bfd/pipeline/rda/grpc/apps/DirectRdaLoadApp.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc-apps/src/main/java/gov/cms/bfd/pipeline/rda/grpc/apps/DirectRdaLoadApp.java index 8bb2044f8b..0ad5d5a5fe 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc-apps/src/main/java/gov/cms/bfd/pipeline/rda/grpc/apps/DirectRdaLoadApp.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc-apps/src/main/java/gov/cms/bfd/pipeline/rda/grpc/apps/DirectRdaLoadApp.java @@ -75,7 +75,7 @@ public static void main(String[] args) throws Exception { pooledDataSource, PipelineApplicationState.RDA_PERSISTENCE_UNIT_NAME, Clock.systemUTC())) { - final Optional> job = createPipelineJob(jobConfig, appState, claimType); + final Optional job = createPipelineJob(jobConfig, appState, claimType); if (!job.isPresent()) { System.err.printf("error: invalid claim type: '%s' expected 'fiss' or 'mcs'%n", claimType); System.exit(1); @@ -96,7 +96,7 @@ public static void main(String[] args) throws Exception { * @param claimType whether to use fiss or mcs claims * @return the pipeline job for mcs or fiss */ - private static Optional> createPipelineJob( + private static Optional createPipelineJob( RdaLoadOptions jobConfig, PipelineApplicationState appState, String claimType) { final var mbiCache = jobConfig.createComputedMbiCache(appState); switch (claimType.toLowerCase()) { diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc-apps/src/main/java/gov/cms/bfd/pipeline/rda/grpc/apps/LoadRdaJsonApp.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc-apps/src/main/java/gov/cms/bfd/pipeline/rda/grpc/apps/LoadRdaJsonApp.java index cf8dab45c5..066c6babf1 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc-apps/src/main/java/gov/cms/bfd/pipeline/rda/grpc/apps/LoadRdaJsonApp.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc-apps/src/main/java/gov/cms/bfd/pipeline/rda/grpc/apps/LoadRdaJsonApp.java @@ -104,8 +104,8 @@ public static void main(String[] args) throws Exception { pooledDataSource, PipelineApplicationState.RDA_PERSISTENCE_UNIT_NAME, Clock.systemUTC())) { - final List> jobs = config.createPipelineJobs(jobConfig, appState); - for (PipelineJob job : jobs) { + final List jobs = config.createPipelineJobs(jobConfig, appState); + for (PipelineJob job : jobs) { LOGGER.info("starting job {}", job.getClass().getSimpleName()); job.call(); } @@ -283,7 +283,7 @@ private RdaMessageSourceFactory.Config createMessageSourceFactoryConfig() { * @param appState the pipeline application state * @return the pipeline jobs to execute */ - private List> createPipelineJobs( + private List createPipelineJobs( RdaLoadOptions jobConfig, PipelineApplicationState appState) { final var mbiCache = jobConfig.createComputedMbiCache(appState); return List.of( diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/AbstractRdaLoadJob.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/AbstractRdaLoadJob.java index 35f26e5434..7f14dea6a1 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/AbstractRdaLoadJob.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/AbstractRdaLoadJob.java @@ -6,7 +6,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import gov.cms.bfd.pipeline.rda.grpc.source.RdaVersion; -import gov.cms.bfd.pipeline.sharedutils.NullPipelineJobArguments; import gov.cms.bfd.pipeline.sharedutils.PipelineJob; import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; import gov.cms.bfd.pipeline.sharedutils.PipelineJobSchedule; @@ -36,8 +35,7 @@ * will do any work. The other threads will all immediately return with an indication that they have * no work to do. */ -public abstract class AbstractRdaLoadJob - implements PipelineJob { +public abstract class AbstractRdaLoadJob implements PipelineJob { /** * Denotes the preferred execution of a sink. diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaServerJob.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaServerJob.java index 989744386d..ebcb7dc23b 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaServerJob.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/main/java/gov/cms/bfd/pipeline/rda/grpc/RdaServerJob.java @@ -6,7 +6,6 @@ import gov.cms.bfd.pipeline.rda.grpc.server.RandomClaimGeneratorConfig; import gov.cms.bfd.pipeline.rda.grpc.server.RdaMessageSourceFactory; import gov.cms.bfd.pipeline.rda.grpc.server.RdaServer; -import gov.cms.bfd.pipeline.sharedutils.NullPipelineJobArguments; import gov.cms.bfd.pipeline.sharedutils.PipelineJob; import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; import gov.cms.bfd.pipeline.sharedutils.PipelineJobSchedule; @@ -27,7 +26,7 @@ * gRPC uses its own thread pool the job thread simply sleeps and waits for an interrupt to be * received. When the interrupt is received the server is stopped. */ -public class RdaServerJob implements PipelineJob { +public class RdaServerJob implements PipelineJob { private static final Logger LOGGER = LoggerFactory.getLogger(RdaServerJob.class); /** The server configuration. */ diff --git a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/RdaLoadJobIT.java b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/RdaLoadJobIT.java index cf47e84f31..9f4010c70b 100644 --- a/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/RdaLoadJobIT.java +++ b/apps/bfd-pipeline/bfd-pipeline-rda-grpc/src/test/java/gov/cms/bfd/pipeline/rda/grpc/RdaLoadJobIT.java @@ -123,7 +123,7 @@ public void fissClaimsTest() throws Exception { port -> { final RdaLoadOptions config = createRdaLoadOptions(port); final var mbiCache = config.createComputedMbiCache(appState); - final PipelineJob job = config.createFissClaimsLoadJob(appState, mbiCache); + final PipelineJob job = config.createFissClaimsLoadJob(appState, mbiCache); job.call(); }); final ImmutableList expectedClaims = @@ -235,7 +235,7 @@ public void mcsClaimsTest() throws Exception { () -> { final var config = createRdaLoadOptions(-1); final var mbiCache = config.createComputedMbiCache(appState); - final PipelineJob job = config.createMcsClaimsLoadJob(appState, mbiCache); + final PipelineJob job = config.createMcsClaimsLoadJob(appState, mbiCache); job.call(); }); final ImmutableList expectedClaims = diff --git a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/NullPipelineJobArguments.java b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/NullPipelineJobArguments.java deleted file mode 100644 index 5cc29ab7a9..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/NullPipelineJobArguments.java +++ /dev/null @@ -1,9 +0,0 @@ -package gov.cms.bfd.pipeline.sharedutils; - -/** - * A "null" {@link PipelineJobArguments} implementation for those {@link PipelineJob}s that do not - * need any arguments. - */ -public final class NullPipelineJobArguments implements PipelineJobArguments { - // Nothing here: this is for PipelineJobs that don't need any arguments. -} diff --git a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJob.java b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJob.java index 9374940b19..72c6ee5e90 100644 --- a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJob.java +++ b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJob.java @@ -9,20 +9,16 @@ *

{@link PipelineJob} implementations that are meant to be triggered by other jobs * SHALL also provide a {@code JOB_TYPE} constant for other jobs to reference, * which must return the same value as {@link #getType()}. - * - * @param the {@link PipelineJobArguments} type associated with this {@link PipelineJob} - * implementation (see {@link NullPipelineJobArguments} for those {@link PipelineJob} - * implementations which do not need arguments) */ -public interface PipelineJob extends Callable { +public interface PipelineJob extends Callable { /** * Gets the {@link PipelineJobType} that uniquely identifies this {@link PipelineJob} * implementation. * * @return the pipeline job type */ - default PipelineJobType getType() { - return new PipelineJobType<>(this); + default PipelineJobType getType() { + return new PipelineJobType(this); } /** diff --git a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobArguments.java b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobArguments.java deleted file mode 100644 index e7eb0eedf2..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobArguments.java +++ /dev/null @@ -1,47 +0,0 @@ -package gov.cms.bfd.pipeline.sharedutils; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.UncheckedIOException; - -/** - * Implementations of this interface represent the arguments that can be passed to non-scheduled - * executions of a {@link PipelineJob} implementation. - */ -public interface PipelineJobArguments { - /** The object mapper for serialization. */ - static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - /** - * Serializes the {@link PipelineJobArguments} to a string. - * - * @return a full-fidelity, round-trippable representation of these {@link PipelineJobArguments}, - * serialized as a {@link String} - */ - default String serialize() { - try { - return OBJECT_MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new UncheckedIOException(e); - } - } - - /** - * Deserialize the {@link PipelineJobArguments} from a string into the specified class. - * - * @param the {@link Class} of this {@link PipelineJobArguments} implementation - * @param serializedArguments a serialized representation of an instance of this {@link - * PipelineJobArguments} implementation, as produced by {@link #serialize()} - * @param argumentsClass the {@link Class} of this {@link PipelineJobArguments} implementation - * @return the deserialized {@link PipelineJobArguments} represented by the specified {@link - * String} - */ - default PipelineJobArguments deserialize( - String serializedArguments, Class argumentsClass) { - try { - return OBJECT_MAPPER.readValue(serializedArguments, argumentsClass); - } catch (JsonProcessingException e) { - throw new UncheckedIOException(e); - } - } -} diff --git a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobRecordId.java b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobRecordId.java deleted file mode 100644 index 6f3d4c5cf5..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobRecordId.java +++ /dev/null @@ -1,42 +0,0 @@ -package gov.cms.bfd.pipeline.sharedutils; - -import java.util.concurrent.atomic.AtomicInteger; - -/** Models the unique identifier for a {@link PipelineJob} that has been submitted for execution. */ -public final class PipelineJobRecordId { - /** Used to generate unique values for {@link #id}. */ - private static final AtomicInteger ID_SEQUENCE = new AtomicInteger(0); - /** The unique record id. */ - private final long id; - - /** Constructs a new unique {@link PipelineJobRecordId}. */ - public PipelineJobRecordId() { - this.id = ID_SEQUENCE.getAndIncrement(); - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (int) (id ^ (id >>> 32)); - return result; - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (obj == null) return false; - if (getClass() != obj.getClass()) return false; - PipelineJobRecordId other = (PipelineJobRecordId) obj; - if (id != other.id) return false; - return true; - } - - /** {@inheritDoc} */ - @Override - public String toString() { - return "" + id; - } -} diff --git a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobType.java b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobType.java index eda15e5bb7..1ad713b54f 100644 --- a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobType.java +++ b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/PipelineJobType.java @@ -3,12 +3,8 @@ /** * Uniquely identifies a type of {@link PipelineJob}, generally corresponding to the {@link * PipelineJob} implementation {@link Class}. - * - * @param the {@link PipelineJobArguments} type associated with the {@link PipelineJob} - * implementation (see {@link NullPipelineJobArguments} for those {@link PipelineJob} - * implementations which do not need arguments) */ -public final class PipelineJobType { +public final class PipelineJobType { /** The type id. */ private final String typeId; @@ -18,8 +14,8 @@ public final class PipelineJobType { * @param job the {@link PipelineJob} to build a {@link PipelineJobType} for */ @SuppressWarnings("unchecked") - public PipelineJobType(PipelineJob job) { - this((Class>) job.getClass()); + public PipelineJobType(PipelineJob job) { + this((Class) job.getClass()); } /** @@ -30,7 +26,7 @@ public PipelineJobType(PipelineJob job) { * @param jobClass the {@link PipelineJob} implementation {@link Class} build a {@link * PipelineJobType} for */ - public > PipelineJobType(Class jobClass) { + public PipelineJobType(Class jobClass) { this.typeId = jobClass.getTypeName(); } diff --git a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/PipelineJobFailure.java b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/PipelineJobFailure.java deleted file mode 100644 index d0aa37bc0c..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/PipelineJobFailure.java +++ /dev/null @@ -1,51 +0,0 @@ -package gov.cms.bfd.pipeline.sharedutils.jobs.store; - -import gov.cms.bfd.pipeline.sharedutils.PipelineJob; - -/** Represents the outcome of a failed {@link PipelineJob} execution. */ -public final class PipelineJobFailure { - /** Holds the exception that caused the failure. */ - private final Throwable exception; - - /** - * Constructs a new {@link PipelineJobFailure} instance. - * - * @param exception the {@link Throwable} that the {@link PipelineJob} execution produced, which - * will be used for {@link #getType()} and {@link #getMessage()} - */ - public PipelineJobFailure(Throwable exception) { - this.exception = exception; - } - - /** - * Gets the {@link #exception} class. - * - * @return the {@link Throwable#getClass()} of the exception that the {@link PipelineJob} - * execution produced - */ - public Class getType() { - return exception.getClass(); - } - - /** - * Gets the {@link #exception} message. - * - * @return the {@link Throwable#getMessage()} of the exception that the {@link PipelineJob} - * execution produced - */ - public String getMessage() { - return exception.getMessage(); - } - - /** {@inheritDoc} */ - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("PipelineJobFailure [getType()="); - builder.append(getType()); - builder.append(", getMessage()="); - builder.append(getMessage()); - builder.append("]"); - return builder.toString(); - } -} diff --git a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/PipelineJobRecord.java b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/PipelineJobRecord.java deleted file mode 100644 index 6eaa60d3dc..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/PipelineJobRecord.java +++ /dev/null @@ -1,362 +0,0 @@ -package gov.cms.bfd.pipeline.sharedutils.jobs.store; - -import gov.cms.bfd.pipeline.sharedutils.PipelineJob; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobArguments; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobRecordId; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobType; -import gov.cms.bfd.sharedutils.exceptions.BadCodeMonkeyException; -import java.time.Duration; -import java.time.Instant; -import java.util.Optional; - -/** - * Models the status of a {@link PipelineJob} that has been submitted for execution. This is - * basically a state machine data store, that models the various states and transitions that a job - * can proceed through. Jobs can be in the following states: - * - *

- * - *

Design Note: I considered coding this as an actual state machine, but all of the - * libraries and patterns available for doing so in Java looked like they'd add more complexity than - * they removed. Also: if we ever decided to scale job execution across multiple nodes, this - * data/class would need to become a JPA entity, and none of the state machine patterns seemed - * suited to that. For example, Spring - * Statemachine will persist all of the state machine's data as a combined serialized blob, - * which is not what we need. - */ -public final class PipelineJobRecord { - /** The pipeline job id. */ - private final PipelineJobRecordId id; - /** The pipeline job type. */ - private final PipelineJobType jobType; - /** The pipeline job arguments. */ - private final A jobArguments; - /** The pipeline job creation time. */ - private final Instant createdTime; - /** The pipeline job canceled time. */ - private Optional canceledTime; - - /** - * "Why is this variable {@code volatile}," you might ask? Good question! The short answer is the - * unhelpful, "because things can crash unless it is." The better answer is complicated: - * When the VolunteerJob is bootstrapped, it's enqueued by PipelineManager's constructor, and ends - * up setting this field -- on the application's main thread. Then, VolunteerJob starts running, - * and goes looking for non-enqueued jobs. Unless this field is volatile, it will sometimes "find - * itself" (lol), thinking that it's not enqueued, because it's reading a stale value of it. That - * causes things to go boom. So: we mark this as volatile, and everything's copacetic. Isn't - * concurrency fun?! - * - *

There's also be an issue with the other jobs, where they get enqueued by the VolunteerJob on - * its thread and then, when they try to fire their start event on their thread, notice that - * they're not enqueued and freak out about it. Marking this as {@code volatile} fixes that, too. - * - *

"Okay, so why aren't the other fields {@code volatile}, too," you might then ask? Also a - * good question! It's because the start and completed events always run on the job's worker - * thread and those events aren't queried on any other threads for anything important. (Worth - * noting: the {@code final} are safe, just due to how they're treated in the Java Memory Model.) - * - *

"You left out cancel, though," you might follow up with. Good catch! But mostly, it just - * doesn't really matter, even though it is set by a different thread, as it's not relied on for - * anything important. - * - *

If any of these other fields end up landing in application logic, we may - * need to mark them volatile -- depending on exactly how they're used and accessed. - */ - private volatile Optional enqueuedTime; - - /** The pipeline job start time. */ - private Optional startedTime; - /** The pipeline job completion time. */ - private Optional completedTime; - /** The pipeline job outcome (if success). */ - private Optional outcome; - /** The pipeline job outcome (if failure). */ - private Optional failure; - - /** - * Constructs a new {@link PipelineJobRecord} instance. - * - * @param jobType the value to use for {@link #getJobType()} - * @param jobArguments the value to use for {@link #getJobArguments()} - */ - public PipelineJobRecord(PipelineJobType jobType, A jobArguments) { - this.id = new PipelineJobRecordId(); - this.jobType = jobType; - this.jobArguments = jobArguments; - this.createdTime = Instant.now(); - - this.canceledTime = Optional.empty(); - this.enqueuedTime = Optional.empty(); - this.startedTime = Optional.empty(); - this.completedTime = Optional.empty(); - this.outcome = Optional.empty(); - this.failure = Optional.empty(); - } - - /** - * Gets the {@link #id}. - * - * @return the {@link PipelineJobRecordId} that uniquely identifies this {@link PipelineJobRecord} - */ - public PipelineJobRecordId getId() { - return id; - } - - /** - * Gets the {@link #jobType}. - * - * @return the {@link PipelineJobType} that this {@link PipelineJobRecord} is for - */ - public PipelineJobType getJobType() { - return jobType; - } - - /** - * Gets the {@link #jobArguments}. - * - * @return the {@link PipelineJobArguments} that the job should be run with, if any ( - * null if there are none) - */ - public A getJobArguments() { - return jobArguments; - } - - /** - * Gets the {@link #createdTime}. - * - * @return the {@link Instant} that this {@link PipelineJobRecord} was created at - */ - public Instant getCreatedTime() { - return createdTime; - } - - /** - * Gets the {@link #canceledTime}. - * - * @return the {@link Instant} that this job was canceled at, if any - */ - public Optional getCanceledTime() { - return canceledTime; - } - - /** - * Determines if the job has been cancelled. - * - * @return true if the job has been canceled, false if it has not - */ - public boolean isCanceled() { - return canceledTime.isPresent(); - } - - /** - * Sets the {@link #canceledTime}. - * - * @param canceledTime the value to set {@link #getCanceledTime()} to - */ - public void setCanceledTime(Instant canceledTime) { - if (this.canceledTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.completedTime.isPresent()) throw new BadCodeMonkeyException(); - - this.canceledTime = Optional.of(canceledTime); - } - - /** - * Gets the {@link #enqueuedTime}. - * - * @return the {@link Instant} that this job was enqueued to an executor for execution, if any - */ - public Optional getEnqueuedTime() { - return enqueuedTime; - } - - /** - * Sets the {@link #enqueuedTime}. - * - * @param enqueuedTime the value to set {@link #getEnqueuedTime()} to - */ - public void setEnqueuedTime(Instant enqueuedTime) { - // Validate the state transition. - if (this.enqueuedTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.canceledTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.startedTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.completedTime.isPresent()) throw new BadCodeMonkeyException(); - - this.enqueuedTime = Optional.of(enqueuedTime); - } - - /** - * Determines if this job has been enqueued. - * - * @return true if this job has been enqueued, false if it has not - */ - public boolean isEnqueued() { - return enqueuedTime.isPresent(); - } - - /** - * Gets the {@link #startedTime}. - * - * @return the {@link Instant} that this job started running at, if any - */ - public Optional getStartedTime() { - return startedTime; - } - - /** - * Determines if the job has been started. - * - * @return true if this job has started running, false if it has not - */ - public boolean isStarted() { - return startedTime.isPresent(); - } - - /** - * Sets the {@link #startedTime}. - * - * @param startedTime the value to set {@link #getStartedTime()} to - */ - public void setStartedTime(Instant startedTime) { - if (!this.enqueuedTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.canceledTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.startedTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.completedTime.isPresent()) throw new BadCodeMonkeyException(); - - this.startedTime = Optional.of(startedTime); - } - - /** - * Gets the {@link #completedTime}. - * - * @return the {@link Instant} that this job completed at, if any - */ - public Optional getCompletedTime() { - return completedTime; - } - - /** - * Determines if the job is completed (success or failure). - * - * @return true if the job has completed (either successfully or with a failure), - * false if it has not - */ - public boolean isCompleted() { - return completedTime.isPresent(); - } - - /** - * Gets the duration of the job if complete. - * - * @return a {@link Duration} representing how long the job ran for, or {@link Optional#empty()} - * if it hasn't started or completed - */ - public Optional getDuration() { - if (isCompleted()) { - return Optional.of(Duration.between(startedTime.get(), completedTime.get())); - } else if (isCanceled()) { - return Optional.of(Duration.between(startedTime.get(), canceledTime.get())); - } else { - return Optional.empty(); - } - } - - /** - * Gets the {@link #outcome}. - * - * @return the {@link PipelineJobOutcome} for this job if it has completed successfully, or {@link - * Optional#empty()} if it is either as-yet-incomplete or if it failed (in which case, {@link - * #getFailure()} will have a value) - */ - public Optional getOutcome() { - return outcome; - } - - /** - * Gets the {@link #failure}. - * - * @return the {@link PipelineJobFailure} for this job if it has completed with a failure, or - * {@link Optional#empty()} if it is either as-yet-incomplete or if it succeeded (in which - * case, {@link #getOutcome()} will have a value) - */ - public Optional getFailure() { - return failure; - } - - /** - * Determines if the job completed successfully. - * - * @return true if {@link #getOutcome()} is present, false if it's not - */ - public boolean isCompletedSuccessfully() { - return outcome.isPresent(); - } - - /** - * Marks the {@link PipelineJob} as having completed successfully. - * - * @param completedTime the value to use for {@link #getCompletedTime()} - * @param outcome the value to use for {@link #getOutcome()} - */ - public void setCompleted(Instant completedTime, PipelineJobOutcome outcome) { - if (!this.enqueuedTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.canceledTime.isPresent()) throw new BadCodeMonkeyException(); - if (!this.startedTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.completedTime.isPresent()) throw new BadCodeMonkeyException(); - - this.completedTime = Optional.of(completedTime); - this.outcome = Optional.of(outcome); - } - - /** - * Marks the {@link PipelineJob} as having completed with an exception. - * - * @param completedTime the value to use for {@link #getCompletedTime()} - * @param failure the value to use for {@link #getFailure()} - */ - public void setCompleted(Instant completedTime, PipelineJobFailure failure) { - if (!this.enqueuedTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.canceledTime.isPresent()) throw new BadCodeMonkeyException(); - if (!this.startedTime.isPresent()) throw new BadCodeMonkeyException(); - if (this.completedTime.isPresent()) throw new BadCodeMonkeyException(); - - this.completedTime = Optional.of(completedTime); - this.failure = Optional.of(failure); - } - - /** {@inheritDoc} */ - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("PipelineJobRecord [id="); - builder.append(id); - builder.append(", jobType="); - builder.append(jobType); - builder.append(", jobArguments="); - builder.append(jobArguments); - builder.append(", createdTime="); - builder.append(createdTime); - builder.append(", canceledTime="); - builder.append(canceledTime); - builder.append(", enqueuedTime="); - builder.append(enqueuedTime); - builder.append(", startedTime="); - builder.append(startedTime); - builder.append(", completedTime="); - builder.append(completedTime); - builder.append(", outcome="); - builder.append(outcome); - builder.append(", failure="); - builder.append(failure); - builder.append("]"); - return builder.toString(); - } -} diff --git a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/PipelineJobRecordStore.java b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/PipelineJobRecordStore.java deleted file mode 100644 index 45e220cfb3..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/PipelineJobRecordStore.java +++ /dev/null @@ -1,311 +0,0 @@ -package gov.cms.bfd.pipeline.sharedutils.jobs.store; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import gov.cms.bfd.pipeline.sharedutils.PipelineJob; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobArguments; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobRecordId; -import gov.cms.bfd.pipeline.sharedutils.PipelineJobType; -import java.time.Duration; -import java.time.Instant; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Stores and manages the {@link PipelineJobRecord}s representing the history of submitted {@link - * PipelineJob}s. - */ -public final class PipelineJobRecordStore { - /** - * The {@link Logger} message that will be recorded if/when a {@link PipelineJob} completes - * successfully. - */ - public static final String LOG_MESSAGE_PREFIX_JOB_COMPLETED = "Job completed"; - - /** The {@link Logger} message that will be recorded if/when a {@link PipelineJob} fails. */ - public static final String LOG_MESSAGE_PREFIX_JOB_FAILED = "Job failed"; - - private static final Logger LOGGER = LoggerFactory.getLogger(PipelineJobRecordStore.class); - - /** The number of milliseconds to wait between polling job dependencies' status. */ - private static final int JOB_DEPENDENCY_POLL_MILLIS = 100; - - /** The metrics registry. */ - private final MetricRegistry appMetrics; - - /** - * The "database" for all {@link PipelineJobRecord}s in the application. - * - *

This will be read and modified from almost every thread in the application, and from the - * perspective of any one of them, is only eventually consistent. This tends to work out okay for - * our use cases, as you're not likely to have two separate jobs racing to create the - * same new job, but care should still be taken when working with it. - */ - private final ConcurrentMap> jobRecords; - - /** - * Constructs a new {@link PipelineJobRecordStore} instance. - * - * @param appMetrics the {@link MetricRegistry} for the overall application - */ - public PipelineJobRecordStore(MetricRegistry appMetrics) { - this.appMetrics = appMetrics; - this.jobRecords = new ConcurrentHashMap<>(); - } - - /** - * Note: this should only be used in test code, as it's performance characteristics are terrible - * for production use. - * - * @return the full {@link Collection} of {@link PipelineJobRecord}s that have been submitted - */ - public Collection> getJobRecords() { - return Collections.unmodifiableCollection(jobRecords.values()); - } - - /** - * Gets a collection of pending jobs. - * - * @param maxJobRecords the maximum number of matching {@link PipelineJobRecord}s to return - * @return up to the specified number of {@link PipelineJobRecord}s where {@link - * PipelineJobRecord#isStarted()} is false - */ - public Set> findPendingJobs(int maxJobRecords) { - try (Timer.Context timer = - appMetrics - .timer(MetricRegistry.name(getClass().getSimpleName(), "findPendingJobs")) - .time()) { - /* - * Design note: We don't garbage collect completed jobs, so this will get slower as job count - * grows. However, in tests to simulate this behavior, it looks like it only gets up to the - * tens of milliseconds after a week of operation, worst-case, with our current number of - * jobs. That's acceptable, since only VolunteerJob calls this. - */ - return jobRecords.values().stream() - .filter(j -> !j.isEnqueued()) - .sorted(Comparator.comparing(PipelineJobRecord::getCreatedTime)) - .limit(maxJobRecords) - .collect(Collectors.toSet()); - } - } - - /** - * Find most recent optional. - * - * @param the type parameter - * @param type the {@link PipelineJobRecord#getJobType()} to match against - * @return the {@link PipelineJobRecord} that matches the criteria with the most recent {@link - * PipelineJobRecord#getCreatedTime()} value - */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public Optional> findMostRecent( - PipelineJobType type) { - try (Timer.Context timer = - appMetrics - .timer(MetricRegistry.name(getClass().getSimpleName(), "findMostRecent")) - .time()) { - if (type == null) throw new IllegalArgumentException(); - - /* - * Design note: We don't garbage collect completed jobs, so this will get slower as job count - * grows. However, in tests to simulate this behavior, it looks like it only gets up to the - * tens of milliseconds after a week of operation, worst-case, with our current number of - * jobs. That's acceptable, since only SchedulerJob calls this. - */ - Optional mostRecentRecord = - jobRecords.values().stream() - .filter(j -> type.equals(j.getJobType())) - .max(Comparator.comparing(PipelineJobRecord::getCreatedTime)); - return mostRecentRecord; - } - } - - /** - * Submits the specified {@link PipelineJob} / {@link PipelineJobType} for execution, with the - * specified {@link PipelineJobArguments}. - * - * @param the type parameter - * @param jobType the {@link PipelineJob#getType()} of the {@link PipelineJob} to run - * @param jobArguments the {@link PipelineJobArguments} to run the specified {@link PipelineJob} - * with - * @return a new {@link PipelineJobRecord} that tracks the status of the requested job run - */ - public PipelineJobRecord submitPendingJob( - PipelineJobType jobType, A jobArguments) { - PipelineJobRecord jobRecord = new PipelineJobRecord(jobType, jobArguments); - this.jobRecords.put(jobRecord.getId(), jobRecord); - LOGGER.trace( - "submitPendingJob(...) called: jobType='{}', jobArguments='{}', jobRecord='{}'", - jobType, - jobArguments, - jobRecord); - - return jobRecord; - } - - /** - * Records {@link Instant#now()} as the {@link PipelineJobRecord#getEnqueuedTime()} value for the - * {@link PipelineJobRecord} with the specified {@link PipelineJobRecordId}. - * - * @param jobRecordId the {@link PipelineJobRecord#getId()} value of the {@link PipelineJobRecord} - * to update - */ - public void recordJobEnqueue(PipelineJobRecordId jobRecordId) { - PipelineJobRecord jobRecord = jobRecords.get(jobRecordId); - if (jobRecord == null) throw new IllegalStateException(); - - jobRecord.setEnqueuedTime(Instant.now()); - - // Record how long it took the job to go from being created to being enqueued. - appMetrics - .timer(MetricRegistry.name(PipelineJob.class.getSimpleName(), "createdToEnqueued")) - .update(Duration.between(jobRecord.getCreatedTime(), jobRecord.getEnqueuedTime().get())); - - LOGGER.trace("recordJobEnqueue(...) called: jobRecord='{}'", jobRecord); - } - - /** - * Records {@link Instant#now()} as the {@link PipelineJobRecord#getStartedTime()} value for the - * {@link PipelineJobRecord} with the specified {@link PipelineJobRecordId}. - * - * @param jobRecordId the {@link PipelineJobRecord#getId()} value of the {@link PipelineJobRecord} - * to update - */ - public void recordJobStart(PipelineJobRecordId jobRecordId) { - PipelineJobRecord jobRecord = jobRecords.get(jobRecordId); - if (jobRecord == null) throw new IllegalStateException(); - - jobRecord.setStartedTime(Instant.now()); - - // Record how long it took the job to go from being enqueued to being started. - appMetrics - .timer(MetricRegistry.name(PipelineJob.class.getSimpleName(), "enqueuedToStarted")) - .update( - Duration.between(jobRecord.getEnqueuedTime().get(), jobRecord.getStartedTime().get())); - - LOGGER.trace("recordJobStart(...) called: jobRecord='{}'", jobRecord); - } - - /** - * Records {@link Instant#now()} as the {@link PipelineJobRecord#getCanceledTime()} value for the - * {@link PipelineJobRecord} with the specified {@link PipelineJobRecordId}. - * - * @param jobRecordId the {@link PipelineJobRecord#getId()} value of the {@link PipelineJobRecord} - * to update - */ - public void recordJobCancellation(PipelineJobRecordId jobRecordId) { - PipelineJobRecord jobRecord = jobRecords.get(jobRecordId); - if (jobRecord == null) throw new IllegalStateException(); - - jobRecord.setCanceledTime(Instant.now()); - - // Record how long it took the job to go from being started to being canceled. - if (jobRecord.getStartedTime().isPresent()) { - appMetrics - .timer(MetricRegistry.name(PipelineJob.class.getSimpleName(), "startedToCanceled")) - .update( - Duration.between( - jobRecord.getStartedTime().get(), jobRecord.getCanceledTime().get())); - } - - LOGGER.trace("recordJobCancellation(...) called: jobRecord='{}'", jobRecord); - } - - /** - * Records {@link Instant#now()} as the {@link PipelineJobRecord#getCompletedTime()} value for the - * {@link PipelineJobRecord} with the specified {@link PipelineJobRecordId}, along with the other - * data provided. - * - * @param jobRecordId the {@link PipelineJobRecord#getId()} value of the {@link PipelineJobRecord} - * to update - * @param jobOutcome the {@link PipelineJobOutcome} that the job execution produced/returned - */ - public void recordJobCompletion(PipelineJobRecordId jobRecordId, PipelineJobOutcome jobOutcome) { - PipelineJobRecord jobRecord = jobRecords.get(jobRecordId); - if (jobRecord == null) throw new IllegalStateException(); - - jobRecord.setCompleted(Instant.now(), jobOutcome); - - // Record how long it took the job to go from being started to being completed (succeeded). - appMetrics - .timer( - MetricRegistry.name( - PipelineJob.class.getSimpleName(), "startedToCompleted", "succeeded")) - .update( - Duration.between(jobRecord.getStartedTime().get(), jobRecord.getCompletedTime().get())); - - // Log the completion at a high level, so that monitoring tools can pick it up. - LOGGER.info("{}: jobRecord='{}'", LOG_MESSAGE_PREFIX_JOB_COMPLETED, jobRecord); - - LOGGER.trace("recordJobCompletion(...) called: jobRecord='{}'", jobRecord); - } - - /** - * Records {@link Instant#now()} as the {@link PipelineJobRecord#getCompletedTime()} value for the - * {@link PipelineJobRecord} with the specified {@link PipelineJobRecordId}, along with the other - * data provided. - * - * @param jobRecordId the {@link PipelineJobRecord#getId()} value of the {@link PipelineJobRecord} - * to update - * @param jobFailure the {@link PipelineJobFailure} that the job execution produced/resulted in - */ - public void recordJobFailure(PipelineJobRecordId jobRecordId, PipelineJobFailure jobFailure) { - PipelineJobRecord jobRecord = jobRecords.get(jobRecordId); - if (jobRecord == null) throw new IllegalStateException(); - - jobRecord.setCompleted(Instant.now(), jobFailure); - - // Record how long it took the job to go from being started to being completed (failed). - appMetrics - .timer( - MetricRegistry.name(PipelineJob.class.getSimpleName(), "startedToCompleted", "failed")) - .update( - Duration.between(jobRecord.getStartedTime().get(), jobRecord.getCompletedTime().get())); - - // Log the failure at a high level, so that monitoring tools can pick it up. - LOGGER.warn("{}: jobRecord='{}'", LOG_MESSAGE_PREFIX_JOB_FAILED, jobRecord); - - /* - * It's redundant, but we'll leave the same TRACE logging here as everything else has, for - * consistency's sake. - */ - LOGGER.trace("recordJobFailure(...) called: jobRecord='{}'", jobRecord); - } - - /** - * Blocks until the specified jobs have completed their most recent execution. - * - * @param jobsToWaitOn the {@link PipelineJobRecord}s to wait for the completion of - * @throws InterruptedException Any {@link InterruptedException}s encountered will be bubbled up. - * @throws IllegalStateException An {@link IllegalStateException} will be thrown if no execution a - * specified job can be found. This likely indicates a logic error elsewhere in the - * application, where a job should have been triggered, but wasn't. - */ - public void waitForJobs(PipelineJobRecord... jobsToWaitOn) throws InterruptedException { - /* - * Design Note: I'm not particularly thrilled with this design for job dependency handling, as - * it has a couple drawbacks. First, the dependent job sits here occupying an executor slot even - * though it really can't do anything. Second, polling for dependency status is clunky; a - * notification-based approach would be cleaner. Nevertheless... this could be redesigned if our - * needs warranted it and the current approach is definitely good enough for our needs right - * now. - */ - - // Now, poll each of those jobs until they're complete. - for (PipelineJobRecord jobToWaitOn : jobsToWaitOn) { - // Poll for completion. - while (!jobToWaitOn.isCompleted()) { - Thread.sleep(JOB_DEPENDENCY_POLL_MILLIS); - } - } - } -} diff --git a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/package-info.java b/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/package-info.java deleted file mode 100644 index 6139eff02c..0000000000 --- a/apps/bfd-pipeline/bfd-pipeline-shared-utils/src/main/java/gov/cms/bfd/pipeline/sharedutils/jobs/store/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -/** - * Contains the models and services for tracking pending, in-progress, and completed {@link - * gov.cms.bfd.pipeline.sharedutils.PipelineJob}s. - */ -package gov.cms.bfd.pipeline.sharedutils.jobs.store; diff --git a/apps/pom.xml b/apps/pom.xml index 167ba3ea8f..6c80241bf7 100644 --- a/apps/pom.xml +++ b/apps/pom.xml @@ -543,6 +543,30 @@ test + + + org.mockito + mockito-core + ${mockito.version} + test + + + + org.mockito + mockito-inline + ${mockito-inline.version} + test + + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + + com.newrelic.telemetry dropwizard-metrics-newrelic