Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Export heron-instances' streams-aggregated metrics #2020

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* 4. Expose methods which could be called externally to change the value of metrics
*/

public class BoltMetrics implements ComponentMetrics {
public class BoltMetrics extends ComponentMetrics {
private final CountMetric ackCount;
private final ReducedMetric<MeanReducerState, Number, Double> processLatency;
private final ReducedMetric<MeanReducerState, Number, Double> failLatency;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@
// limitations under the License.
package com.twitter.heron.common.utils.metrics;

import com.twitter.heron.classification.InterfaceAudience;
import com.twitter.heron.classification.InterfaceStability;

/**
* Interface for common metric actions that both spouts and bolts support
* Abstract Class for common metric actions that both spouts and bolts support
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ComponentMetrics {
public abstract class ComponentMetrics {
// Metric-name suffix reserved for value aggregating on all different streams
public static final String ALL_STREAMS_AGGREGATED = "__all-streams-aggregated";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put this constant string into a separate final class like MetricsConstants?

Making a variable public in interface or abstract class smells dangerous.

Copy link
Contributor Author

@maosongfu maosongfu Jun 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One the one hand, it seems we don't have such Constants class; I don't see how necessary to add a class for a single filed;
on the other hand, this class is designed to abstract out the shared actions/implementations, which can be a good place to put a shared field.

BTW, I would prefer this class being an abstract class to an interface, since IMO this class is more used to reduce duplicated code, rather than to define some required behaviors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making a field public while allowing the class to be extended is in general a bad design.

For the problem why it's an interface, @billonahill may have a better idea.

To land this metric fast, I'll approve it for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No rush for this pull request. Will also wait for Bill's comments.


public abstract void serializeDataTuple(String streamId, long latency);

void serializeDataTuple(String streamId, long latency);
void emittedTuple(String streamId);
public abstract void emittedTuple(String streamId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class FullBoltMetrics extends BoltMetrics {
// so instance could not produce more tuples
private final CountMetric outQueueFullCount;


public FullBoltMetrics() {
ackCount = new MultiCountMetric();
processLatency = new MultiReducedMetric<>(new MeanReducer());
Expand Down Expand Up @@ -126,6 +125,8 @@ public void ackedTuple(String streamId, String sourceComponent, long latency) {
ackCount.scope(streamId).incr();
processLatency.scope(streamId).update(latency);

ackCount.scope(ALL_STREAMS_AGGREGATED).incr();

// Consider there are cases that different streams with the same streamId,
// but with different source component. We need to distinguish them too.
String globalStreamId =
Expand All @@ -138,6 +139,8 @@ public void failedTuple(String streamId, String sourceComponent, long latency) {
failCount.scope(streamId).incr();
failLatency.scope(streamId).update(latency);

failCount.scope(ALL_STREAMS_AGGREGATED).incr();

// Consider there are cases that different streams with the same streamId,
// but with different source component. We need to distinguish them too.
String globalStreamId =
Expand All @@ -151,6 +154,9 @@ public void executeTuple(String streamId, String sourceComponent, long latency)
executeLatency.scope(streamId).update(latency);
executeTimeNs.scope(streamId).incrBy(latency);

executeCount.scope(ALL_STREAMS_AGGREGATED).incr();
executeTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency);

// Consider there are cases that different streams with the same streamId,
// but with different source component. We need to distinguish them too.
String globalStreamId =
Expand All @@ -170,6 +176,7 @@ public void updateOutQueueFullCount() {

public void deserializeDataTuple(String streamId, String sourceComponent, long latency) {
deserializationTimeNs.scope(streamId).incrBy(latency);
deserializationTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency);

// Consider there are cases that different streams with the same streamId,
// but with different source component. We need to distinguish them too.
Expand All @@ -180,6 +187,7 @@ public void deserializeDataTuple(String streamId, String sourceComponent, long l

public void serializeDataTuple(String streamId, long latency) {
serializationTimeNs.scope(streamId).incrBy(latency);
serializationTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,25 @@ public void initMultiCountMetrics(PhysicalPlanHelper helper) {
public void ackedTuple(String streamId, long latency) {
ackCount.scope(streamId).incr();
completeLatency.scope(streamId).update(latency);

ackCount.scope(ALL_STREAMS_AGGREGATED).incr();
}

public void failedTuple(String streamId, long latency) {
failCount.scope(streamId).incr();
failLatency.scope(streamId).update(latency);

failCount.scope(ALL_STREAMS_AGGREGATED).incr();
}

public void timeoutTuple(String streamId) {
timeoutCount.scope(streamId).incr();
timeoutCount.scope(ALL_STREAMS_AGGREGATED).incr();
}

public void emittedTuple(String streamId) {
emitCount.scope(streamId).incr();
emitCount.scope(ALL_STREAMS_AGGREGATED).incr();
}

public void nextTuple(long latency) {
Expand All @@ -140,6 +146,7 @@ public void updatePendingTuplesCount(long count) {

public void serializeDataTuple(String streamId, long latency) {
serializationTimeNs.scope(streamId).incrBy(latency);
serializationTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* 4. Expose methods which could be called externally to change the value of metrics
*/

public class SpoutMetrics implements ComponentMetrics {
public class SpoutMetrics extends ComponentMetrics {
private final CountMetric ackCount;
private final ReducedMetric<MeanReducerState, Number, Double> completeLatency;
private final ReducedMetric<MeanReducerState, Number, Double> failLatency;
Expand Down