Skip to content

Commit

Permalink
[admin-tool][server] Add new admin-tool command to dump heartbeat sta…
Browse files Browse the repository at this point in the history
…tus from server; Scan and log stale HB replica (#1275)

This PR adds two features related to heartbeat:

Add a heartbeat scan thread to periodically run and log lagging resources (every minute by default, this should be good enough not to spam logging). This can be further collected by other logging collecting system and we can easily detect on which host, what replica is lagging by how much.
Add a command to dump heartbeat status from a host. It has 2 optional filter: partition filter and lag filter. You can choose to see only specific topic / topic-partition or you can choose to only see resources that are lagging. This serves as the manual helper when (1) might be missing stuff.
  • Loading branch information
sixpluszero authored Nov 6, 2024
1 parent aeade7e commit ddaa0bb
Show file tree
Hide file tree
Showing 27 changed files with 926 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.listener.response.AdminResponse;
import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse;
import com.linkedin.davinci.listener.response.ReplicaIngestionResponse;
import com.linkedin.davinci.notifier.LogNotifier;
import com.linkedin.davinci.notifier.PushStatusNotifier;
import com.linkedin.davinci.notifier.VeniceNotifier;
Expand Down Expand Up @@ -1147,7 +1147,6 @@ private Properties getKafkaConsumerProperties(VeniceStoreVersionConfig storeConf
return kafkaConsumerProperties;
}

@Override
public ByteBuffer getStoreVersionCompressionDictionary(String topicName) {
return storageMetadataService.getStoreVersionCompressionDictionary(topicName);
}
Expand All @@ -1156,7 +1155,6 @@ public StoreIngestionTask getStoreIngestionTask(String topicName) {
return topicNameToIngestionTaskMap.get(topicName);
}

@Override
public AdminResponse getConsumptionSnapshots(String topicName, ComplementSet<Integer> partitions) {
AdminResponse response = new AdminResponse();
StoreIngestionTask ingestionTask = getStoreIngestionTask(topicName);
Expand All @@ -1172,26 +1170,25 @@ public AdminResponse getConsumptionSnapshots(String topicName, ComplementSet<Int
return response;
}

public TopicPartitionIngestionContextResponse getTopicPartitionIngestionContext(
public ReplicaIngestionResponse getTopicPartitionIngestionContext(
String versionTopic,
String topicName,
int partitionId) {
TopicPartitionIngestionContextResponse topicPartitionIngestionContextResponse =
new TopicPartitionIngestionContextResponse();
ReplicaIngestionResponse replicaIngestionResponse = new ReplicaIngestionResponse();
PubSubTopic pubSubVersionTopic = pubSubTopicRepository.getTopic(versionTopic);
PubSubTopic requestTopic = pubSubTopicRepository.getTopic(topicName);
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(requestTopic, partitionId);
try {
byte[] topicPartitionInfo = aggKafkaConsumerService.getIngestionInfoFor(pubSubVersionTopic, pubSubTopicPartition);
topicPartitionIngestionContextResponse.setTopicPartitionIngestionContext(topicPartitionInfo);
replicaIngestionResponse.setPayload(topicPartitionInfo);
} catch (Exception e) {
topicPartitionIngestionContextResponse.setError(true);
topicPartitionIngestionContextResponse.setMessage(e.getMessage());
replicaIngestionResponse.setError(true);
replicaIngestionResponse.setMessage(e.getMessage());
LOGGER.error(
"Error on get topic partition ingestion context for resource: " + Utils.getReplicaId(topicName, partitionId),
e);
}
return topicPartitionIngestionContextResponse;
return replicaIngestionResponse;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.linkedin.davinci.kafka.consumer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;


public class ReplicaHeartbeatInfo {
private String replicaId;
private String region;
private String leaderState;
private boolean readyToServe;
private long heartbeat;
private long lag;

@JsonCreator
public ReplicaHeartbeatInfo(
@JsonProperty("replicaId") String replicaId,
@JsonProperty("region") String region,
@JsonProperty("leaderState") String leaderState,
@JsonProperty("readyToServe") boolean readyToServe,
@JsonProperty("heartbeat") long heartbeat,
@JsonProperty("lag") long lag) {
this.leaderState = leaderState;
this.replicaId = replicaId;
this.region = region;
this.readyToServe = readyToServe;
this.heartbeat = heartbeat;
this.lag = lag;
}

public long getHeartbeat() {
return heartbeat;
}

public void setHeartbeat(long heartbeat) {
this.heartbeat = heartbeat;
}

public String getLeaderState() {
return leaderState;
}

public void setLeaderState(String leaderState) {
this.leaderState = leaderState;
}

public String getReplicaId() {
return replicaId;
}

public void setReplicaId(String replicaId) {
this.replicaId = replicaId;
}

public String getRegion() {
return region;
}

public void setRegion(String region) {
this.region = region;
}

public long getLag() {
return lag;
}

public void setLag(long lag) {
this.lag = lag;
}

public boolean isReadyToServe() {
return readyToServe;
}

public void setReadyToServe(boolean readyToServe) {
this.readyToServe = readyToServe;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

ReplicaHeartbeatInfo replicaHeartbeatInfo = (ReplicaHeartbeatInfo) o;
return this.replicaId.equals(replicaHeartbeatInfo.getReplicaId())
&& this.heartbeat == replicaHeartbeatInfo.getHeartbeat() && this.region.equals(replicaHeartbeatInfo.getRegion())
&& this.readyToServe == replicaHeartbeatInfo.isReadyToServe() && this.lag == replicaHeartbeatInfo.getLag()
&& this.leaderState.equals(replicaHeartbeatInfo.leaderState);
}

@Override
public int hashCode() {
int result = replicaId.hashCode();
result = 31 * result + region.hashCode();
result = 31 * result + leaderState.hashCode();
result = 31 * result + Boolean.hashCode(readyToServe);
result = 31 * result + Long.hashCode(heartbeat);
result = 31 * result + Long.hashCode(lag);
return result;
}

@Override
public String toString() {
return "{" + "\"replica\"='" + replicaId + '\'' + ", \"region\"='" + region + '\'' + ", \"leaderState\"='"
+ leaderState + '\'' + ", \"readyToServe\"='" + readyToServe + '\'' + ", \"heartbeat\"=" + heartbeat
+ ", \"lag\"=" + lag + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.storage.IngestionMetadataRetriever;
import java.util.Set;
import java.util.concurrent.CompletableFuture;


/**
* An interface for Store Ingestion Service for Venice.
*/
public interface StoreIngestionService extends IngestionMetadataRetriever {
public interface StoreIngestionService {
/**
* Starts consuming messages from Kafka Partition corresponding to Venice Partition.
* @param veniceStore Venice Store for the partition.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
import com.linkedin.davinci.kafka.consumer.ReplicaHeartbeatInfo;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricConfig;
import io.tehuti.metrics.MetricsRepository;
Expand Down Expand Up @@ -35,14 +38,19 @@
* Each region gets a different lag monitor
*/
public class HeartbeatMonitoringService extends AbstractVeniceService {
private final Thread reportingThread;
private static final Logger LOGGER = LogManager.getLogger(HeartbeatMonitoringService.class);
public static final int DEFAULT_REPORTER_THREAD_SLEEP_INTERVAL_SECONDS = 60;
public static final int DEFAULT_LAG_LOGGING_THREAD_SLEEP_INTERVAL_SECONDS = 60;
public static final long DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS = TimeUnit.MINUTES.toMillis(10);

private static final Logger LOGGER = LogManager.getLogger(HeartbeatMonitoringService.class);

private final Thread reportingThread;
private final Thread lagLoggingThread;

private final Set<String> regionNames;
private final String localRegionName;

// store -> version -> partition -> region -> timestamp
// store -> version -> partition -> region -> (timestamp, RTS)
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> followerHeartbeatTimeStamps;
private final Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderHeartbeatTimeStamps;
HeartbeatVersionedStats versionStatsReporter;
Expand All @@ -55,9 +63,10 @@ public HeartbeatMonitoringService(
this.regionNames = regionNames;
this.localRegionName = localRegionName;
this.reportingThread = new HeartbeatReporterThread();
followerHeartbeatTimeStamps = new VeniceConcurrentHashMap<>();
leaderHeartbeatTimeStamps = new VeniceConcurrentHashMap<>();
versionStatsReporter = new HeartbeatVersionedStats(
this.lagLoggingThread = new HeartbeatLagLoggingThread();
this.followerHeartbeatTimeStamps = new VeniceConcurrentHashMap<>();
this.leaderHeartbeatTimeStamps = new VeniceConcurrentHashMap<>();
this.versionStatsReporter = new HeartbeatVersionedStats(
metricsRepository,
metadataRepository,
() -> new HeartbeatStat(new MetricConfig(), regionNames),
Expand Down Expand Up @@ -138,15 +147,85 @@ public void removeLagMonitor(Version version, int partition) {
removeEntry(followerHeartbeatTimeStamps, version, partition);
}

public Map<String, ReplicaHeartbeatInfo> getHeartbeatInfo(
String versionTopicName,
int partitionFilter,
boolean filterLagReplica) {
Map<String, ReplicaHeartbeatInfo> aggregateResult = new VeniceConcurrentHashMap<>();
long currentTimestamp = System.currentTimeMillis();
aggregateResult.putAll(
getHeartbeatInfoFromMap(
leaderHeartbeatTimeStamps,
LeaderFollowerStateType.LEADER.name(),
currentTimestamp,
versionTopicName,
partitionFilter,
filterLagReplica));
aggregateResult.putAll(
getHeartbeatInfoFromMap(
followerHeartbeatTimeStamps,
LeaderFollowerStateType.STANDBY.name(),
currentTimestamp,
versionTopicName,
partitionFilter,
filterLagReplica));
return aggregateResult;
}

Map<String, ReplicaHeartbeatInfo> getHeartbeatInfoFromMap(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestampMap,
String leaderState,
long currentTimestamp,
String versionTopicName,
int partitionFilter,
boolean filterLagReplica) {
Map<String, ReplicaHeartbeatInfo> result = new VeniceConcurrentHashMap<>();
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestampMap
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
.entrySet()) {
for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
String topicName = Version.composeKafkaTopic(storeName.getKey(), version.getKey());
long heartbeatTs = region.getValue().getLeft();
long lag = currentTimestamp - heartbeatTs;
if (!versionTopicName.equals(topicName)) {
continue;
}
if (partitionFilter >= 0 && partitionFilter != partition.getKey()) {
continue;
}
if (filterLagReplica && lag < DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS) {
continue;
}
String replicaId =
Utils.getReplicaId(Version.composeKafkaTopic(storeName.getKey(), version.getKey()), partition.getKey());
ReplicaHeartbeatInfo replicaHeartbeatInfo = new ReplicaHeartbeatInfo(
replicaId,
region.getKey(),
leaderState,
region.getValue().getRight(),
heartbeatTs,
lag);
result.put(replicaId + "-" + region.getKey(), replicaHeartbeatInfo);
}
}
}
}
return result;
}

@Override
public boolean startInner() throws Exception {
reportingThread.start();
lagLoggingThread.start();
return true;
}

@Override
public void stopInner() throws Exception {
reportingThread.interrupt();
lagLoggingThread.interrupt();
}

/**
Expand Down Expand Up @@ -251,6 +330,39 @@ protected void record() {
.recordFollowerLag(storeName, version, region, heartbeatTs, isReadyToServe)));
}

protected void checkAndMaybeLogHeartbeatDelayMap(
Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps) {
long currentTimestamp = System.currentTimeMillis();
for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestamps
.entrySet()) {
for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
.entrySet()) {
for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
long heartbeatTs = region.getValue().getLeft();
long lag = currentTimestamp - heartbeatTs;
if (lag > DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS && region.getValue().getRight()) {
String replicaId = Utils
.getReplicaId(Version.composeKafkaTopic(storeName.getKey(), version.getKey()), partition.getKey());
LOGGER.warn(
"Replica: {}, region: {} is having heartbeat lag: {}, latest heartbeat: {}, current timestamp: {}",
replicaId,
region.getKey(),
lag,
heartbeatTs,
currentTimestamp);
}
}
}
}
}
}

protected void checkAndMaybeLogHeartbeatDelay() {
checkAndMaybeLogHeartbeatDelayMap(leaderHeartbeatTimeStamps);
checkAndMaybeLogHeartbeatDelayMap(followerHeartbeatTimeStamps);
}

@FunctionalInterface
interface ReportLagFunction {
void apply(String storeName, int version, String region, long lag, boolean isReadyToServe);
Expand All @@ -272,7 +384,27 @@ public void run() {
break;
}
}
LOGGER.info("RemoteIngestionRepairService thread interrupted! Shutting down...");
LOGGER.info("Heartbeat lag metric reporting thread interrupted! Shutting down...");
}
}

private class HeartbeatLagLoggingThread extends Thread {
HeartbeatLagLoggingThread() {
super("Ingestion-Heartbeat-Lag-Logging-Service-Thread");
}

@Override
public void run() {
while (!Thread.interrupted()) {
checkAndMaybeLogHeartbeatDelay();
try {
TimeUnit.SECONDS.sleep(DEFAULT_LAG_LOGGING_THREAD_SLEEP_INTERVAL_SECONDS);
} catch (InterruptedException e) {
// We've received an interrupt which is to be expected, so we'll just leave the loop and log
break;
}
}
LOGGER.info("Heartbeat lag logging thread interrupted! Shutting down...");
}
}
}
Loading

0 comments on commit ddaa0bb

Please sign in to comment.