From 848f4d5de6d02fa5ef1f7dda7d6b1c1f7d4d44a0 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Thu, 27 Jul 2023 11:22:22 +0200 Subject: [PATCH] TEZ-4505: Create counters about time intervals spent in certain states in StateMachineTez --- .../common/counters/TaskAttemptCounter.java | 22 +++++++++++ .../apache/tez/dag/api/TezConfiguration.java | 13 +++++++ .../apache/tez/dag/app/dag/TaskAttempt.java | 1 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 4 ++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 25 +++++++++++- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 21 ++++++++++ .../org/apache/tez/state/StateMachineTez.java | 39 +++++++++++++++++++ 7 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 tez-api/src/main/java/org/apache/tez/common/counters/TaskAttemptCounter.java diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskAttemptCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskAttemptCounter.java new file mode 100644 index 0000000000..7f39cfdb59 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskAttemptCounter.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.common.counters; + +public class TaskAttemptCounter { + +} diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 9e2e2d89cf..b094483041 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2088,6 +2088,19 @@ static Set getPropertySet() { + "dag.status.pollinterval-ms"; public static final long TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT = 500; + /** + * Boolean value + * Whether to count how much time the DAG StateMachine spends in certain states + * and report it in counters. + * Minor performance degradation is possible so this is turned off by default. + */ + @ConfigurationScope(Scope.DAG) + @ConfigurationProperty(type = "boolean") + public static final String TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED = TEZ_PREFIX + + "dag.state.interval.monitor.enabled"; + //FIXME: false + public static final boolean TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED_DEFAULT = true; + /** * Long value. * Time to wait (in seconds) for apps to complete on MiniTezCluster shutdown. diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index f51b576dfe..14c7562b1d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -76,6 +76,7 @@ public void setCounters(TezCounters counters) { List getDiagnostics(); TaskAttemptTerminationCause getTerminationCause(); TezCounters getCounters(); + TezCounters getStateCounters(); @VisibleForTesting void setCounters(TezCounters counters); float getProgress(); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index aa28e02441..7fe7b82863 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -588,6 +588,9 @@ private void augmentStateMachine() { STATE_CHANGED_CALLBACK) .registerStateEnteredCallback(DAGState.ERROR, STATE_CHANGED_CALLBACK); + if (StateMachineTez.isStateIntervalMonitorEnabled(dagConf)) { + stateMachine.enableStateIntervalMonitor(); + } } private static class DagStateChangedCallback @@ -1943,6 +1946,7 @@ public TezCounters constructFinalFullcounters() { for (Vertex v : this.vertices.values()) { aggregateTezCounters.aggrAllCounters(v.getAllCounters()); } + stateMachine.incrementStateCounters(this.getClass().getSimpleName() + "_STATES", aggregateTezCounters); return aggregateTezCounters; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 289f1a1887..4606125231 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -38,9 +38,11 @@ import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; import org.apache.tez.dag.app.dag.event.TaskEventTALaunched; import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded; +import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.state.StateMachineTez; import org.apache.tez.util.StringInterner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; @@ -76,7 +79,9 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; +import org.apache.tez.dag.app.dag.TaskStateInternal; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; @@ -242,7 +247,7 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro private static SingleArcTransition STATUS_UPDATER = new StatusUpdaterTransition(); - private final StateMachine stateMachine; + private final StateMachineTez stateMachine; // TODO TEZ-2003 (post) TEZ-2667 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before // TASK_STARTED_REMOTELY. In case of a PUSH it's more intuitive to send TASK_STARTED_REMOTELY after communicating @@ -571,7 +576,10 @@ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler, this.reportedStatus = new TaskAttemptStatus(this.attemptId); initTaskAttemptStatus(reportedStatus); RackResolver.init(conf); - this.stateMachine = stateMachineFactory.make(this); + this.stateMachine = + new StateMachineTez( + stateMachineFactory.make(this), this); + augmentStateMachine(); this.isRescheduled = isRescheduled; this.taskResource = resource; this.containerContext = containerContext; @@ -584,6 +592,12 @@ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler, null : appContext.getDAGRecoveryData().getTaskAttemptRecoveryData(attemptId); } + private void augmentStateMachine() { + if (StateMachineTez.isStateIntervalMonitorEnabled(conf)) { + stateMachine.enableStateIntervalMonitor(); + } + } + @Override public TezTaskAttemptID getTaskAttemptID() { return attemptId; @@ -655,6 +669,13 @@ public TezCounters getCounters() { } } + @Override + public TezCounters getStateCounters() { + TezCounters counters = new TezCounters(); + stateMachine.incrementStateCounters("TaskAttemptCounter_" + getVertex().getName(), counters); + return counters; + } + @VisibleForTesting @Override public void setCounters(TezCounters counters) { diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 1a88673a0b..b03ec26717 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -39,6 +39,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.tez.common.counters.TaskAttemptCounter; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezUncheckedException; @@ -315,6 +317,9 @@ private void augmentStateMachine() { stateMachine .registerStateEnteredCallback(TaskStateInternal.SUCCEEDED, STATE_CHANGED_CALLBACK); + if (StateMachineTez.isStateIntervalMonitorEnabled(conf)) { + stateMachine.enableStateIntervalMonitor(); + } } private final StateMachineTez @@ -474,6 +479,10 @@ public TezCounters getCounters() { try { TaskAttempt bestAttempt = selectBestAttempt(); TezCounters taskCounters = (bestAttempt != null) ? bestAttempt.getCounters() : TaskAttemptImpl.EMPTY_COUNTERS; + + stateMachine.incrementStateCounters(TaskCounter.class.getSimpleName() + "_" + getVertex().getName(), taskCounters); + maybeMergeAllTaskAttemptCounters(taskCounters); + if (getVertex().isSpeculationEnabled()) { tezCounters.incrAllCounters(taskCounters); return tezCounters; @@ -484,6 +493,18 @@ public TezCounters getCounters() { } } + private void maybeMergeAllTaskAttemptCounters(TezCounters taskCounters) { + if (stateMachine.isStateIntervalMonitorEnabled()) { + // if the state interval monitoring is disabled for this TaskImpl.stateMachine, + // it's disabled for all TaskAttemptImpl's stateMachine too + return; + } + String groupName = TaskAttemptCounter.class.getSimpleName() + "_" + getVertex().getName(); + for (TaskAttempt at : attempts.values()) { + taskCounters.getGroup(groupName).aggrAllCounters(at.getStateCounters().getGroup(groupName)); + } + } + TaskStatistics getStatistics() { // simply return the stats from the best attempt readLock.lock(); diff --git a/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java b/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java index cbb838d7f0..98019b3346 100644 --- a/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java +++ b/tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java @@ -21,8 +21,12 @@ import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.records.TezID; public class StateMachineTez, EVENTTYPE extends Enum, EVENT, OPERAND> @@ -34,6 +38,10 @@ public class StateMachineTez, EVENTTYPE extends Enum realStatemachine; + private boolean isStateIntervalMonitorEnabled = false; + private long lastStateChangedTime = Time.monotonicNow(); + private Map intervalSpentInStatesMs = new HashMap<>(); + @SuppressWarnings("unchecked") public StateMachineTez(StateMachine sm, OPERAND operand) { this.realStatemachine = sm; @@ -67,7 +75,38 @@ public STATE doTransition(EVENTTYPE eventType, EVENT event) throws if (callback != null) { callback.onStateChanged(operand, newState); } + if (isStateIntervalMonitorEnabled) { + String stateName = oldState.name(); + if (!intervalSpentInStatesMs.containsKey(stateName)) { + intervalSpentInStatesMs.put(stateName, 0L); + } + long now = Time.monotonicNow(); + intervalSpentInStatesMs.put(stateName, now - lastStateChangedTime); + lastStateChangedTime = now; + } } return newState; } + + public static boolean isStateIntervalMonitorEnabled(Configuration conf) { + return conf.getBoolean(TezConfiguration.TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED, + TezConfiguration.TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED_DEFAULT); + } + + public boolean isStateIntervalMonitorEnabled() { + return isStateIntervalMonitorEnabled; + } + + public void enableStateIntervalMonitor() { + this.isStateIntervalMonitorEnabled = true; + } + + public void incrementStateCounters(String group, TezCounters counters) { + if (!isStateIntervalMonitorEnabled) { + return; + } + for (Map.Entry e : intervalSpentInStatesMs.entrySet()) { + counters.getGroup(group).findCounter(e.getKey(), true).increment(e.getValue()); + } + } }