Skip to content

Commit

Permalink
TEZ-4505: Create counters about time intervals spent in certain state…
Browse files Browse the repository at this point in the history
…s in StateMachineTez
  • Loading branch information
abstractdog committed Jul 27, 2023
1 parent 5beab4c commit 5366844
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.common.counters;

public class TaskAttemptCounter {

}
13 changes: 13 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,19 @@ static Set<String> getPropertySet() {
+ "dag.status.pollinterval-ms";
public static final long TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT = 500;

/**
* Boolean value
* Whether to count how much time the DAG StateMachine spends in certain states
* and report it in counters.
* Minor performance degradation is possible so this is turned off by default.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty(type = "boolean")
public static final String TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED = TEZ_PREFIX
+ "dag.state.interval.monitor.enabled";
//FIXME: false
public static final boolean TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED_DEFAULT = true;

/**
* Long value.
* Time to wait (in seconds) for apps to complete on MiniTezCluster shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void setCounters(TezCounters counters) {
List<String> getDiagnostics();
TaskAttemptTerminationCause getTerminationCause();
TezCounters getCounters();
TezCounters getStateCounters();
@VisibleForTesting
void setCounters(TezCounters counters);
float getProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ private void augmentStateMachine() {
STATE_CHANGED_CALLBACK)
.registerStateEnteredCallback(DAGState.ERROR,
STATE_CHANGED_CALLBACK);
if (StateMachineTez.isStateIntervalMonitorEnabled(dagConf)) {
stateMachine.enableStateIntervalMonitor();
}
}

private static class DagStateChangedCallback
Expand Down Expand Up @@ -1943,6 +1946,7 @@ public TezCounters constructFinalFullcounters() {
for (Vertex v : this.vertices.values()) {
aggregateTezCounters.aggrAllCounters(v.getAllCounters());
}
stateMachine.incrementStateCounters(DAGCounter.class.getName(), aggregateTezCounters);
return aggregateTezCounters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.state.StateMachineTez;
import org.apache.tez.util.StringInterner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -63,6 +65,7 @@
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
Expand All @@ -76,7 +79,9 @@
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
Expand Down Expand Up @@ -242,7 +247,7 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro
private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
STATUS_UPDATER = new StatusUpdaterTransition();

private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
private final StateMachineTez<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent, TaskAttemptImpl> stateMachine;

// TODO TEZ-2003 (post) TEZ-2667 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before
// TASK_STARTED_REMOTELY. In case of a PUSH it's more intuitive to send TASK_STARTED_REMOTELY after communicating
Expand Down Expand Up @@ -571,7 +576,10 @@ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
initTaskAttemptStatus(reportedStatus);
RackResolver.init(conf);
this.stateMachine = stateMachineFactory.make(this);
this.stateMachine =
new StateMachineTez<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent, TaskAttemptImpl>(
stateMachineFactory.make(this), this);
augmentStateMachine();
this.isRescheduled = isRescheduled;
this.taskResource = resource;
this.containerContext = containerContext;
Expand All @@ -584,6 +592,12 @@ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
null : appContext.getDAGRecoveryData().getTaskAttemptRecoveryData(attemptId);
}

private void augmentStateMachine() {
if (StateMachineTez.isStateIntervalMonitorEnabled(conf)) {
stateMachine.enableStateIntervalMonitor();
}
}

@Override
public TezTaskAttemptID getTaskAttemptID() {
return attemptId;
Expand Down Expand Up @@ -655,6 +669,13 @@ public TezCounters getCounters() {
}
}

@Override
public TezCounters getStateCounters() {
TezCounters counters = new TezCounters();
stateMachine.incrementStateCounters("TaskAttemptCounter_" + getVertex().getName(), counters);
return counters;
}

@VisibleForTesting
@Override
public void setCounters(TezCounters counters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.tez.common.counters.TaskAttemptCounter;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
Expand All @@ -58,6 +59,7 @@
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezUncheckedException;
Expand Down Expand Up @@ -315,6 +317,9 @@ private void augmentStateMachine() {
stateMachine
.registerStateEnteredCallback(TaskStateInternal.SUCCEEDED,
STATE_CHANGED_CALLBACK);
if (StateMachineTez.isStateIntervalMonitorEnabled(conf)) {
stateMachine.enableStateIntervalMonitor();
}
}

private final StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>
Expand Down Expand Up @@ -474,6 +479,10 @@ public TezCounters getCounters() {
try {
TaskAttempt bestAttempt = selectBestAttempt();
TezCounters taskCounters = (bestAttempt != null) ? bestAttempt.getCounters() : TaskAttemptImpl.EMPTY_COUNTERS;

stateMachine.incrementStateCounters(TaskCounter.class.getSimpleName() + "_" + getVertex().getName(), taskCounters);
maybeMergeAllTaskAttemptCounters(taskCounters);

if (getVertex().isSpeculationEnabled()) {
tezCounters.incrAllCounters(taskCounters);
return tezCounters;
Expand All @@ -484,6 +493,18 @@ public TezCounters getCounters() {
}
}

private void maybeMergeAllTaskAttemptCounters(TezCounters taskCounters) {
if (stateMachine.isStateIntervalMonitorEnabled()) {
// if the state interval monitoring is disabled for this TaskImpl.stateMachine,
// it's disabled for all TaskAttemptImpl's stateMachine too
return;
}
String groupName = TaskAttemptCounter.class.getSimpleName() + "_" + getVertex().getName();
for (TaskAttempt at : attempts.values()) {
taskCounters.getGroup(groupName).aggrAllCounters(at.getStateCounters().getGroup(groupName));
}
}

TaskStatistics getStatistics() {
// simply return the stats from the best attempt
readLock.lock();
Expand Down
39 changes: 39 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezID;

public class StateMachineTez<STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT, OPERAND>
Expand All @@ -34,6 +38,10 @@ public class StateMachineTez<STATE extends Enum<STATE>, EVENTTYPE extends Enum<E

private final StateMachine<STATE, EVENTTYPE, EVENT> realStatemachine;

private boolean isStateIntervalMonitorEnabled = false;
private long lastStateChangedTime = Time.monotonicNow();
private Map<String, Long> intervalSpentInStatesMs = new HashMap<>();

@SuppressWarnings("unchecked")
public StateMachineTez(StateMachine sm, OPERAND operand) {
this.realStatemachine = sm;
Expand Down Expand Up @@ -67,7 +75,38 @@ public STATE doTransition(EVENTTYPE eventType, EVENT event) throws
if (callback != null) {
callback.onStateChanged(operand, newState);
}
if (isStateIntervalMonitorEnabled) {
String stateName = oldState.name();
if (!intervalSpentInStatesMs.containsKey(stateName)) {
intervalSpentInStatesMs.put(stateName, 0L);
}
long now = Time.monotonicNow();
intervalSpentInStatesMs.put(stateName, now - lastStateChangedTime);
lastStateChangedTime = now;
}
}
return newState;
}

public static boolean isStateIntervalMonitorEnabled(Configuration conf) {
return conf.getBoolean(TezConfiguration.TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED,
TezConfiguration.TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED_DEFAULT);
}

public boolean isStateIntervalMonitorEnabled() {
return isStateIntervalMonitorEnabled;
}

public void enableStateIntervalMonitor() {
this.isStateIntervalMonitorEnabled = true;
}

public void incrementStateCounters(String group, TezCounters counters) {
if (isStateIntervalMonitorEnabled) {
return;
}
for (Map.Entry<String, Long> e : intervalSpentInStatesMs.entrySet()) {
counters.getGroup(group).findCounter(e.getKey(), true).increment(e.getValue());
}
}
}

0 comments on commit 5366844

Please sign in to comment.