Skip to content

Commit

Permalink
[GOBBLIN-2181] Ensure DagTask::conclude after a non-transient excep…
Browse files Browse the repository at this point in the history
…tion (#4084)
  • Loading branch information
vsinghal85 authored and Will-Lo committed Dec 13, 2024
1 parent 89bd41b commit 6920851
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.service.modules.orchestration;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,6 +41,7 @@
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.util.ExecutorsUtils;


Expand Down Expand Up @@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService {
public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis";
@Getter static long defaultJobStartDeadlineTimeMillis;
public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
// TODO Update to fetch list from config once transient exception handling is implemented and retryable exceptions defined
public static final List<Class<? extends Exception>> retryableExceptions = Collections.EMPTY_LIST;

@Inject
public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
Expand All @@ -85,6 +90,10 @@ private static void setDefaultJobStartDeadlineTimeMs(long deadlineTimeMs) {
defaultJobStartDeadlineTimeMillis = deadlineTimeMs;
}

public static boolean isTransientException(Exception e) {
return ExceptionUtils.isExceptionInstanceOf(e, retryableExceptions);
}

@Override
protected void startUp() {
Integer numThreads = ConfigUtils.getInt
Expand Down Expand Up @@ -151,6 +160,12 @@ public void run() {
} catch (Exception e) {
log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
if (!DagProcessingEngine.isTransientException(e)) {
log.warn(dagProc.contextualizeStatus("ignoring non-transient exception by concluding so no retries"));
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
dagTask.conclude();
}
// TODO add the else block for transient exceptions and add conclude task only if retry limit is not breached
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.gobblin.service.modules.orchestration.proc;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import com.typesafe.config.Config;

Expand All @@ -33,15 +31,12 @@
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagUtils;
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.util.ConfigUtils;


/**
Expand All @@ -62,7 +57,6 @@ public abstract class DagProc<T> {
@Getter protected final Dag.DagId dagId;
@Getter protected final DagNodeId dagNodeId;
protected static final MetricContext metricContext = Instrumented.getMetricContext(new State(), DagProc.class);
protected final List<Class<? extends Exception>> nonRetryableExceptions;
protected static final EventSubmitter eventSubmitter = new EventSubmitter.Builder(
metricContext, "org.apache.gobblin.service").build();

Expand All @@ -71,14 +65,6 @@ public DagProc(DagTask dagTask, Config config) {
this.dagId = DagUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
this.dagTask.getDagAction().getFlowName(), this.dagTask.getDagAction().getFlowExecutionId());
this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
this.nonRetryableExceptions = ConfigUtils.getStringList(config, ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY)
.stream().map(className -> {
try {
return (Class<? extends Exception>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
}

public final void process(DagManagementStateStore dagManagementStateStore,
Expand All @@ -92,20 +78,9 @@ public final void process(DagManagementStateStore dagManagementStateStore,
dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
throw e;
}
try {
logContextualizedInfo("ready to process");
act(dagManagementStateStore, state, dagProcEngineMetrics);
logContextualizedInfo("processed");
} catch (Exception e) {
if (isNonTransientException(e)) {
log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
getDagTask(), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
} else {
throw e;
}
}
}

protected abstract T initialize(DagManagementStateStore dagManagementStateStore) throws IOException;
Expand All @@ -126,8 +101,4 @@ public String contextualizeStatus(String message) {
public void logContextualizedInfo(String message) {
log.info(contextualizeStatus(message));
}

protected boolean isNonTransientException(Exception e) {
return ExceptionUtils.isExceptionInstanceOf(e, this.nonRetryableExceptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class DagProcUtils {
public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
Dag.DagId dagId) throws IOException {
Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagUtils.getNext(dag);

if (nextNodes.size() == 1) {
Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId);
Expand Down Expand Up @@ -139,12 +138,15 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
// Only mark the job as failed in case of non transient exceptions
if (!DagProcessingEngine.isTransientException(e)) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
}
}
try {
// when there is no exception, quota will be released in job status monitor or re-evaluate dag proc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,24 @@ public void dagProcessingTest()
// (MAX_NUM_OF_TASKS + 1) th call
int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS + ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS / MockedDagTaskStream.FAILING_DAGS_FREQUENCY;
int expectedNonRetryableExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS / MockedDagTaskStream.FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY;

AssertWithBackoff.assertTrue(input -> Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() == expectedNumOfInvocations,
10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. "
+ "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
log, 1, 1000L);

// Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions);
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions);
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions);
}

@Test
public void isNonTransientExceptionTest(){
/*
These exceptions examples are solely for testing purpose, ultimately it would come down
to the config defined for the transient exceptions, when we implement retry logic
*/
Assert.assertTrue(!DagProcessingEngine.isTransientException(new RuntimeException("Simulating a non retryable exception!")));
Assert.assertTrue(!DagProcessingEngine.isTransientException(new AzkabanClientException("Simulating a retryable exception!")));
}

private enum ExceptionType {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.gobblin.service.modules.orchestration.proc;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class DagProcUtilsTest {

DagManagementStateStore dagManagementStateStore;
SpecExecutor mockSpecExecutor;

@BeforeMethod
public void setUp() {
dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
}

@Test
public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException {
Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream()
.map(Dag.DagNode<JobExecutionPlan>::new)
.collect(Collectors.toList());
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
Mockito.verify(dagManagementStateStore, Mockito.times(1))
.addJobDagAction(jobExecutionPlan.getFlowGroup(), jobExecutionPlan.getFlowName(),
jobExecutionPlan.getFlowExecutionId(), jobExecutionPlan.getJobName(),
DagActionStore.DagActionType.REEVALUATE);
}
Mockito.verifyNoMoreInteractions(dagManagementStateStore);
}

@Test
public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException {
Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
Mockito.doNothing().when(metrics).incrementJobsSentToExecutor(dagNode);
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
Mockito.verify(dagManagementStateStore, Mockito.times(2)).getDagManagerMetrics();
Mockito.verify(dagManagementStateStore, Mockito.times(1)).tryAcquireQuota(Collections.singleton(dagNode));
Mockito.verify(dagManagementStateStore, Mockito.times(1)).updateDagNode(dagNode);
Mockito.verify(dagManagementStateStore, Mockito.times(1)).addDagAction(Mockito.any(DagActionStore.DagAction.class));

Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode);
Mockito.verify(metrics, Mockito.times(1)).incrementJobsSentToExecutor(dagNode);
Mockito.verifyNoMoreInteractions(dagManagementStateStore);
}

@Test(expectedExceptions = RuntimeException.class)
public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{
Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
SpecProducer<Spec> mockedSpecProducer = mockSpecExecutor.getProducer().get();
Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
Mockito.verify(mockedSpecProducer, Mockito.times(1)).addSpec(Mockito.any(JobSpec.class));
Mockito.verify(dagManagementStateStore, Mockito.times(1)).getDagManagerMetrics();
Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode);
Mockito.verifyNoMoreInteractions(dagManagementStateStore);
}

private List<JobExecutionPlan> getJobExecutionPlans() throws URISyntaxException {
Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build();
Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build();
Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build();
List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3);

Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
.addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build();
Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
.addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build();
Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
.addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build();
List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3);
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Config jobConfig = jobConfigs.get(i);
FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
if (i == 2) {
jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec,
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef("testUri")),
mockSpecExecutor, 0L, ConfigFactory.empty()));
} else {
jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec,
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef("testUri")),
new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty()));
}
}
return jobExecutionPlans;
}
}

0 comments on commit 6920851

Please sign in to comment.