Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] TEZ-4505: Create counters about time intervals spent in certain states in StateMachineTez #304

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.common.counters;

/**
 * Marker type without any members.
 * Its simple name is used as the prefix of the per-vertex counter group name
 * under which task attempt state intervals are reported.
 * NOTE(review): flagged in review as a leftover; a string constant or an enum
 * may be enough if only the name is needed - confirm before merging.
 */
public class TaskAttemptCounter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't decode the usage of this 'empty' class, we are only using the name of this class, if only name is required can't we have a string constant or an enum

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just a leftover, actually I'm in the middle of this, full of bugs, let me set this PR to [DRAFT]


}
13 changes: 13 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,19 @@ static Set<String> getPropertySet() {
+ "dag.status.pollinterval-ms";
public static final long TEZ_DAG_STATUS_POLLINTERVAL_MS_DEFAULT = 500;

/**
 * Boolean value.
 * Whether to count how much time the DAG, Task and TaskAttempt state machines
 * spend in certain states and report it in counters.
 * Minor performance degradation is possible so this is turned off by default.
 */
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty(type = "boolean")
public static final String TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED = TEZ_PREFIX
    + "dag.state.interval.monitor.enabled";
// Kept in sync with the documentation above: the monitor is opt-in.
public static final boolean TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED_DEFAULT = false;

/**
* Long value.
* Time to wait (in seconds) for apps to complete on MiniTezCluster shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void setCounters(TezCounters counters) {
List<String> getDiagnostics();
TaskAttemptTerminationCause getTerminationCause();
TezCounters getCounters();
TezCounters getStateCounters();
@VisibleForTesting
void setCounters(TezCounters counters);
float getProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ private void augmentStateMachine() {
STATE_CHANGED_CALLBACK)
.registerStateEnteredCallback(DAGState.ERROR,
STATE_CHANGED_CALLBACK);
if (StateMachineTez.isStateIntervalMonitorEnabled(dagConf)) {
stateMachine.enableStateIntervalMonitor();
}
}

private static class DagStateChangedCallback
Expand Down Expand Up @@ -1943,6 +1946,7 @@ public TezCounters constructFinalFullcounters() {
for (Vertex v : this.vertices.values()) {
aggregateTezCounters.aggrAllCounters(v.getAllCounters());
}
stateMachine.incrementStateCounters(this.getClass().getSimpleName() + "_STATES", aggregateTezCounters);
return aggregateTezCounters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.state.StateMachineTez;
import org.apache.tez.util.StringInterner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -63,6 +65,7 @@
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
Expand All @@ -76,7 +79,9 @@
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
Expand Down Expand Up @@ -242,7 +247,7 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro
private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
STATUS_UPDATER = new StatusUpdaterTransition();

private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
private final StateMachineTez<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent, TaskAttemptImpl> stateMachine;

// TODO TEZ-2003 (post) TEZ-2667 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before
// TASK_STARTED_REMOTELY. In case of a PUSH it's more intuitive to send TASK_STARTED_REMOTELY after communicating
Expand Down Expand Up @@ -571,7 +576,10 @@ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
this.reportedStatus = new TaskAttemptStatus(this.attemptId);
initTaskAttemptStatus(reportedStatus);
RackResolver.init(conf);
this.stateMachine = stateMachineFactory.make(this);
this.stateMachine =
new StateMachineTez<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent, TaskAttemptImpl>(
stateMachineFactory.make(this), this);
augmentStateMachine();
this.isRescheduled = isRescheduled;
this.taskResource = resource;
this.containerContext = containerContext;
Expand All @@ -584,6 +592,12 @@ public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
null : appContext.getDAGRecoveryData().getTaskAttemptRecoveryData(attemptId);
}

/**
 * Applies configuration-driven extras to this attempt's state machine.
 * The only extra handled here is the state interval monitor, which is switched on
 * when the configuration requests it.
 */
private void augmentStateMachine() {
  boolean monitorStateIntervals = StateMachineTez.isStateIntervalMonitorEnabled(conf);
  if (!monitorStateIntervals) {
    return;
  }
  stateMachine.enableStateIntervalMonitor();
}

@Override
public TezTaskAttemptID getTaskAttemptID() {
return attemptId;
Expand Down Expand Up @@ -655,6 +669,13 @@ public TezCounters getCounters() {
}
}

/**
 * Builds a new counters object and lets the state machine add its recorded
 * per-state intervals to it, under a group named after the owning vertex.
 *
 * @return a freshly created counters object; it stays empty when the state
 *         interval monitor is not enabled on the state machine
 */
@Override
public TezCounters getStateCounters() {
  final TezCounters stateCounters = new TezCounters();
  final String groupName = "TaskAttemptCounter_" + getVertex().getName();
  stateMachine.incrementStateCounters(groupName, stateCounters);
  return stateCounters;
}

@VisibleForTesting
@Override
public void setCounters(TezCounters counters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.tez.common.counters.TaskAttemptCounter;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
Expand All @@ -58,6 +59,7 @@
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezUncheckedException;
Expand Down Expand Up @@ -315,6 +317,9 @@ private void augmentStateMachine() {
stateMachine
.registerStateEnteredCallback(TaskStateInternal.SUCCEEDED,
STATE_CHANGED_CALLBACK);
if (StateMachineTez.isStateIntervalMonitorEnabled(conf)) {
stateMachine.enableStateIntervalMonitor();
}
}

private final StateMachineTez<TaskStateInternal, TaskEventType, TaskEvent, TaskImpl>
Expand Down Expand Up @@ -474,6 +479,10 @@ public TezCounters getCounters() {
try {
TaskAttempt bestAttempt = selectBestAttempt();
TezCounters taskCounters = (bestAttempt != null) ? bestAttempt.getCounters() : TaskAttemptImpl.EMPTY_COUNTERS;

stateMachine.incrementStateCounters(TaskCounter.class.getSimpleName() + "_" + getVertex().getName(), taskCounters);
maybeMergeAllTaskAttemptCounters(taskCounters);

if (getVertex().isSpeculationEnabled()) {
tezCounters.incrAllCounters(taskCounters);
return tezCounters;
Expand All @@ -484,6 +493,18 @@ public TezCounters getCounters() {
}
}

/**
 * Adds the state interval counters of every attempt of this task to the given counters.
 * Does nothing when the state interval monitor is not enabled on this task's state machine.
 *
 * @param taskCounters counters to merge the attempts' state counters into; modified in place
 */
private void maybeMergeAllTaskAttemptCounters(TezCounters taskCounters) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit
I think we can ditch maybe from the name & just have mergeAllTaskAttemptCounters(, if stateIntervalMontior is enabled it does merge always

if (!stateMachine.isStateIntervalMonitorEnabled()) {
// if the state interval monitoring is disabled for this TaskImpl.stateMachine,
// it's disabled for all TaskAttemptImpl's stateMachine too
return;
}
String groupName = TaskAttemptCounter.class.getSimpleName() + "_" + getVertex().getName();
for (TaskAttempt at : attempts.values()) {
taskCounters.getGroup(groupName).aggrAllCounters(at.getStateCounters().getGroup(groupName));
}
}

TaskStatistics getStatistics() {
// simply return the stats from the best attempt
readLock.lock();
Expand Down
39 changes: 39 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/state/StateMachineTez.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezID;

public class StateMachineTez<STATE extends Enum<STATE>, EVENTTYPE extends Enum<EVENTTYPE>, EVENT, OPERAND>
Expand All @@ -34,6 +38,10 @@ public class StateMachineTez<STATE extends Enum<STATE>, EVENTTYPE extends Enum<E

private final StateMachine<STATE, EVENTTYPE, EVENT> realStatemachine;

private boolean isStateIntervalMonitorEnabled = false;
private long lastStateChangedTime = Time.monotonicNow();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be set as part of enableStateIntervalMonitor?

private Map<String, Long> intervalSpentInStatesMs = new HashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be final


@SuppressWarnings("unchecked")
public StateMachineTez(StateMachine sm, OPERAND operand) {
this.realStatemachine = sm;
Expand Down Expand Up @@ -67,7 +75,38 @@ public STATE doTransition(EVENTTYPE eventType, EVENT event) throws
if (callback != null) {
callback.onStateChanged(operand, newState);
}
if (isStateIntervalMonitorEnabled) {
String stateName = oldState.name();
if (!intervalSpentInStatesMs.containsKey(stateName)) {
intervalSpentInStatesMs.put(stateName, 0L);
}
long now = Time.monotonicNow();
intervalSpentInStatesMs.put(stateName, now - lastStateChangedTime);
lastStateChangedTime = now;
}
}
return newState;
}

/**
 * Looks up the state interval monitor switch in the given configuration.
 *
 * @param conf configuration to read the switch from
 * @return the value stored under
 *         {@link TezConfiguration#TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED}, falling back to
 *         {@link TezConfiguration#TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED_DEFAULT}
 */
public static boolean isStateIntervalMonitorEnabled(Configuration conf) {
  final String key = TezConfiguration.TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED;
  final boolean fallback = TezConfiguration.TEZ_DAG_STATE_INTERVAL_MONITOR_ENABLED_DEFAULT;
  return conf.getBoolean(key, fallback);
}

/**
 * @return whether the state interval monitor has been enabled on this state machine
 */
public boolean isStateIntervalMonitorEnabled() {
  return this.isStateIntervalMonitorEnabled;
}

/**
 * Turns on the state interval monitor for this state machine.
 * The reference timestamp is restarted at the moment of enabling, so the first
 * recorded interval does not include time that passed between construction of
 * this object and this call. Calling it again while already enabled has no effect.
 */
public void enableStateIntervalMonitor() {
  if (!isStateIntervalMonitorEnabled) {
    // measure from now on, not from object construction
    lastStateChangedTime = Time.monotonicNow();
    isStateIntervalMonitorEnabled = true;
  }
}

/**
 * Adds every recorded per-state interval to the given counters.
 * Each state name becomes a counter inside the given group and is incremented by
 * the number of milliseconds stored for that state. Nothing is touched when the
 * state interval monitor is not enabled.
 *
 * @param group name of the counter group to report into
 * @param counters counters object to increment; modified in place
 */
public void incrementStateCounters(String group, TezCounters counters) {
  if (isStateIntervalMonitorEnabled) {
    for (Map.Entry<String, Long> stateInterval : intervalSpentInStatesMs.entrySet()) {
      String stateName = stateInterval.getKey();
      long spentMs = stateInterval.getValue();
      counters.getGroup(group).findCounter(stateName, true).increment(spentMs);
    }
  }
}
}