Skip to content

Commit

Permalink
[server][metrics] Fix sawtooth behavior on follower lag monitoring (#…
Browse files Browse the repository at this point in the history
…1305)

* [server][metrics] Fix sawtooth behavior on follower lag monitoring

For AA stores, heartbeats from all peer colos are forwarded to the local follower replica, and the timestamp of the most recently received heartbeat is reported. As a result, if one colo is lagging, the metric jumps around as unhealthy heartbeats are interleaved with healthy ones. This change addresses that by having the follower report only the highest timestamp among the heartbeats it has received.
  • Loading branch information
ZacAttack authored Nov 15, 2024
1 parent 98b913a commit bb7d601
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -51,8 +49,8 @@ public class HeartbeatMonitoringService extends AbstractVeniceService {
private final String localRegionName;

// store -> version -> partition -> region -> (timestamp, RTS)
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> followerHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> leaderHeartbeatTimeStamps;
HeartbeatVersionedStats versionStatsReporter;

public HeartbeatMonitoringService(
Expand All @@ -76,7 +74,7 @@ public HeartbeatMonitoringService(
}

private synchronized void initializeEntry(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps,
Version version,
int partition,
boolean isFollower) {
Expand All @@ -87,20 +85,21 @@ private synchronized void initializeEntry(
heartbeatTimestamps.computeIfAbsent(version.getStoreName(), storeKey -> new VeniceConcurrentHashMap<>())
.computeIfAbsent(version.getNumber(), versionKey -> new VeniceConcurrentHashMap<>())
.computeIfAbsent(partition, partitionKey -> {
Map<String, Pair<Long, Boolean>> regionTimestamps = new VeniceConcurrentHashMap<>();
Map<String, HeartbeatTimeStampEntry> regionTimestamps = new VeniceConcurrentHashMap<>();
if (version.isActiveActiveReplicationEnabled() && !isFollower) {
for (String region: regionNames) {
regionTimestamps.put(region, new MutablePair<>(System.currentTimeMillis(), false));
regionTimestamps.put(region, new HeartbeatTimeStampEntry(System.currentTimeMillis(), false, false));
}
} else {
regionTimestamps.put(localRegionName, new MutablePair<>(System.currentTimeMillis(), false));
regionTimestamps
.put(localRegionName, new HeartbeatTimeStampEntry(System.currentTimeMillis(), false, false));
}
return regionTimestamps;
});
}

private synchronized void removeEntry(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps,
Version version,
int partition) {
heartbeatTimestamps.computeIfPresent(version.getStoreName(), (storeKey, versionMap) -> {
Expand Down Expand Up @@ -173,21 +172,21 @@ public Map<String, ReplicaHeartbeatInfo> getHeartbeatInfo(
}

Map<String, ReplicaHeartbeatInfo> getHeartbeatInfoFromMap(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestampMap,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestampMap,
String leaderState,
long currentTimestamp,
String versionTopicName,
int partitionFilter,
boolean filterLagReplica) {
Map<String, ReplicaHeartbeatInfo> result = new VeniceConcurrentHashMap<>();
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestampMap
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> storeName: heartbeatTimestampMap
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
for (Map.Entry<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>> version: storeName.getValue()
.entrySet()) {
for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
for (Map.Entry<Integer, Map<String, HeartbeatTimeStampEntry>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, HeartbeatTimeStampEntry> region: partition.getValue().entrySet()) {
String topicName = Version.composeKafkaTopic(storeName.getKey(), version.getKey());
long heartbeatTs = region.getValue().getLeft();
long heartbeatTs = region.getValue().timestamp;
long lag = currentTimestamp - heartbeatTs;
if (!versionTopicName.equals(topicName)) {
continue;
Expand All @@ -204,7 +203,7 @@ Map<String, ReplicaHeartbeatInfo> getHeartbeatInfoFromMap(
replicaId,
region.getKey(),
leaderState,
region.getValue().getRight(),
region.getValue().readyToServe,
heartbeatTs,
lag);
result.put(replicaId + "-" + region.getKey(), replicaHeartbeatInfo);
Expand Down Expand Up @@ -245,7 +244,7 @@ public void recordLeaderHeartbeat(
String region,
Long timestamp,
boolean isReadyToServe) {
recordHeartbeat(store, version, partition, region, timestamp, leaderHeartbeatTimeStamps, isReadyToServe);
recordHeartbeat(store, version, partition, region, timestamp, leaderHeartbeatTimeStamps, isReadyToServe, false);
}

/**
Expand All @@ -265,7 +264,7 @@ public void recordFollowerHeartbeat(
String region,
Long timestamp,
boolean isReadyToServe) {
recordHeartbeat(store, version, partition, region, timestamp, followerHeartbeatTimeStamps, isReadyToServe);
recordHeartbeat(store, version, partition, region, timestamp, followerHeartbeatTimeStamps, isReadyToServe, true);
}

private void recordHeartbeat(
Expand All @@ -274,13 +273,24 @@ private void recordHeartbeat(
int partition,
String region,
Long timestamp,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
boolean isReadyToServe) {
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps,
boolean isReadyToServe,
boolean retainHighestTimeStamp) {
if (region != null) {
heartbeatTimestamps.computeIfPresent(store, (storeKey, perVersionMap) -> {
perVersionMap.computeIfPresent(version, (versionKey, perPartitionMap) -> {
perPartitionMap.computeIfPresent(partition, (partitionKey, perRegionMap) -> {
perRegionMap.put(region, new MutablePair<>(timestamp, isReadyToServe));
// When retaining only the highest timestamp for a given heartbeat: if the currently held heartbeat
// has a higher timestamp AND was actually consumed (i.e. it is not a placeholder value written by
// this process), then we No-Op in favor of retaining that higher timestamp. This behavior is specific
// to follower nodes because the intent of this metric is to show only the lag of the follower relative to the leader
if (retainHighestTimeStamp && perRegionMap.get(region) != null
&& perRegionMap.get(region).timestamp > timestamp && perRegionMap.get(region).consumedFromUpstream) {
// No-Op
} else {
// record the heartbeat time stamp
perRegionMap.put(region, new HeartbeatTimeStampEntry(timestamp, isReadyToServe, true));
}
return perRegionMap;
});
return perPartitionMap;
Expand All @@ -290,29 +300,29 @@ private void recordHeartbeat(
}
}

protected Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> getLeaderHeartbeatTimeStamps() {
protected Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> getLeaderHeartbeatTimeStamps() {
return leaderHeartbeatTimeStamps;
}

protected Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> getFollowerHeartbeatTimeStamps() {
protected Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> getFollowerHeartbeatTimeStamps() {
return followerHeartbeatTimeStamps;
}

protected void recordLags(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps,
ReportLagFunction lagFunction) {
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestamps
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> storeName: heartbeatTimestamps
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
for (Map.Entry<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>> version: storeName.getValue()
.entrySet()) {
for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
for (Map.Entry<Integer, Map<String, HeartbeatTimeStampEntry>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, HeartbeatTimeStampEntry> region: partition.getValue().entrySet()) {
lagFunction.apply(
storeName.getKey(),
version.getKey(),
region.getKey(),
region.getValue().getLeft(),
region.getValue().getRight());
region.getValue().timestamp,
region.getValue().readyToServe);
}
}
}
Expand All @@ -331,17 +341,17 @@ protected void record() {
}

protected void checkAndMaybeLogHeartbeatDelayMap(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps) {
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> heartbeatTimestamps) {
long currentTimestamp = System.currentTimeMillis();
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestamps
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> storeName: heartbeatTimestamps
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
for (Map.Entry<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>> version: storeName.getValue()
.entrySet()) {
for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
long heartbeatTs = region.getValue().getLeft();
for (Map.Entry<Integer, Map<String, HeartbeatTimeStampEntry>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, HeartbeatTimeStampEntry> region: partition.getValue().entrySet()) {
long heartbeatTs = region.getValue().timestamp;
long lag = currentTimestamp - heartbeatTs;
if (lag > DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS && region.getValue().getRight()) {
if (lag > DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS && region.getValue().readyToServe) {
String replicaId = Utils
.getReplicaId(Version.composeKafkaTopic(storeName.getKey(), version.getKey()), partition.getKey());
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

/**
 * Immutable holder for a single heartbeat observation: the heartbeat timestamp plus two flags
 * describing the entry. All state is exposed through public final fields and is fixed at
 * construction time.
 */
public class HeartbeatTimeStampEntry {
  /**
   * Timestamp carried by this heartbeat entry.
   */
  public final long timestamp;

  /**
   * True when the partition this heartbeat entry belongs to is ready to serve.
   */
  public final boolean readyToServe;

  /**
   * True when this entry was consumed from input; false when the system created it as a
   * default (placeholder) entry.
   */
  public final boolean consumedFromUpstream;

  /**
   * Creates an entry with the given values; none of them can be changed afterwards.
   *
   * @param timestamp the timestamp associated with this entry
   * @param readyToServe whether the entry is for a partition which is ready to serve
   * @param consumedFromUpstream whether the entry was consumed from input rather than
   *        initialized by the system as a default entry
   */
  public HeartbeatTimeStampEntry(long timestamp, boolean readyToServe, boolean consumedFromUpstream) {
    this.timestamp = timestamp;
    this.readyToServe = readyToServe;
    this.consumedFromUpstream = consumedFromUpstream;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;


public class HeartbeatVersionedStats extends AbstractVeniceAggVersionedStats<HeartbeatStat, HeartbeatStatReporter> {
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderMonitors;
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerMonitors;
private final Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> leaderMonitors;
private final Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> followerMonitors;

public HeartbeatVersionedStats(
MetricsRepository metricsRepository,
ReadOnlyStoreRepository metadataRepository,
Supplier<HeartbeatStat> statsInitiator,
StatsSupplier<HeartbeatStatReporter> reporterSupplier,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderMonitors,
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerMonitors) {
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> leaderMonitors,
Map<String, Map<Integer, Map<Integer, Map<String, HeartbeatTimeStampEntry>>>> followerMonitors) {
super(metricsRepository, metadataRepository, statsInitiator, reporterSupplier, true);
this.leaderMonitors = leaderMonitors;
this.followerMonitors = followerMonitors;
Expand Down
Loading

0 comments on commit bb7d601

Please sign in to comment.