From ddaa0bb5dcdd1741f58017725118434063f35fe3 Mon Sep 17 00:00:00 2001
From: Jialin Liu
Date: Wed, 6 Nov 2024 15:30:47 -0800
Subject: [PATCH] [admin-tool][server] Add new admin-tool command to dump heartbeat status from server; Scan and log stale HB replica (#1275)

This PR adds two features related to heartbeat:

1. Add a heartbeat scan thread that runs periodically and logs lagging
resources (every minute by default, which should be frequent enough without
spamming the logs). The output can then be picked up by a log-collection
system, so we can easily detect on which host which replica is lagging, and
by how much.
2. Add a command to dump heartbeat status from a host. It has two optional
filters: a partition filter and a lag filter. You can choose to see only a
specific topic or topic-partition, or only the resources that are lagging.
This serves as a manual helper for cases that (1) might miss. (Usage
sketches for both features are appended after the diff.)
---
 .../consumer/KafkaStoreIngestionService.java  |  17 +-
 .../kafka/consumer/ReplicaHeartbeatInfo.java  | 112 ++++++++++++
 .../kafka/consumer/StoreIngestionService.java |   3 +-
 .../heartbeat/HeartbeatMonitoringService.java | 146 +++++++++++++++-
 .../storage/IngestionMetadataRetriever.java   |   8 +-
 .../IngestionMetadataRetrieverDelegator.java  |  69 ++++++++
 .../consumer/ReplicaHeartbeatInfoTest.java    |  52 ++++++
 .../HeartbeatMonitoringServiceTest.java       | 124 +++++++++++++-
 ...gestionMetadataRetrieverDelegatorTest.java |  32 ++++
 .../java/com/linkedin/venice/AdminTool.java   |  48 +++++-
 .../main/java/com/linkedin/venice/Arg.java    |   3 +-
 .../java/com/linkedin/venice/Command.java     |   8 +-
 .../response/ReplicaIngestionResponse.java    |  34 ++++
 ...opicPartitionIngestionContextResponse.java |  34 ----
 .../com/linkedin/venice/meta/QueryAction.java |   6 +-
 .../venice/endToEnd/PartialUpdateTest.java    |   8 +-
 .../endToEnd/TestDumpIngestionContext.java    | 161 ++++++++++++++++++
 .../utils/VeniceServerWrapper.java            |   2 +
 .../listener/OutboundHttpWrapperHandler.java  |  13 +-
 .../listener/RouterRequestHttpHandler.java    |   7 +
 .../listener/ServerStoreAclHandler.java       |   7 +-
 .../listener/StorageReadRequestHandler.java   |  17 +-
 .../listener/request/HeartbeatRequest.java    |  45 +++++
 .../linkedin/venice/server/VeniceServer.java  |  11 +-
 .../OutboundHttpWrapperHandlerTest.java       |   6 +-
 .../listener/ServerStoreAclHandlerTest.java   |   7 +-
 .../StorageReadRequestHandlerTest.java        |  47 +++--
 27 files changed, 926 insertions(+), 101 deletions(-)
 create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ReplicaHeartbeatInfo.java
 create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/IngestionMetadataRetrieverDelegator.java
 create mode 100644 clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/ReplicaHeartbeatInfoTest.java
 create mode 100644 clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/IngestionMetadataRetrieverDelegatorTest.java
 create mode 100644 internal/venice-common/src/main/java/com/linkedin/davinci/listener/response/ReplicaIngestionResponse.java
 delete mode 100644 internal/venice-common/src/main/java/com/linkedin/davinci/listener/response/TopicPartitionIngestionContextResponse.java
 create mode 100644 internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDumpIngestionContext.java
 create mode 100644 services/venice-server/src/main/java/com/linkedin/venice/listener/request/HeartbeatRequest.java

diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java
index 89946242d7..ba3bb10c1a 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java
@@ -20,7 +20,7 @@
 import com.linkedin.davinci.config.VeniceStoreVersionConfig;
 import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
 import com.linkedin.davinci.listener.response.AdminResponse;
-import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse;
+import com.linkedin.davinci.listener.response.ReplicaIngestionResponse;
 import com.linkedin.davinci.notifier.LogNotifier;
 import com.linkedin.davinci.notifier.PushStatusNotifier;
 import com.linkedin.davinci.notifier.VeniceNotifier;
@@ -1147,7 +1147,6 @@ private Properties getKafkaConsumerProperties(VeniceStoreVersionConfig storeConf
     return kafkaConsumerProperties;
   }
 
-  @Override
   public ByteBuffer getStoreVersionCompressionDictionary(String topicName) {
     return storageMetadataService.getStoreVersionCompressionDictionary(topicName);
   }
@@ -1156,7 +1155,6 @@ public StoreIngestionTask getStoreIngestionTask(String topicName) {
     return topicNameToIngestionTaskMap.get(topicName);
   }
 
-  @Override
   public AdminResponse getConsumptionSnapshots(String topicName, ComplementSet<Integer> partitions) {
     AdminResponse response = new AdminResponse();
     StoreIngestionTask ingestionTask = getStoreIngestionTask(topicName);
@@ -1172,26 +1170,25 @@ public AdminResponse getConsumptionSnapshots(String topicName, ComplementSet
      Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> heartbeatTimestamps) {
+    long currentTimestamp = System.currentTimeMillis();
+    for (Map.Entry<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> storeName: heartbeatTimestamps
+        .entrySet()) {
+      for (Map.Entry<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>> version: storeName.getValue()
+          .entrySet()) {
+        for (Map.Entry<Integer, Map<String, Pair<Long, Boolean>>> partition: version.getValue().entrySet()) {
+          for (Map.Entry<String, Pair<Long, Boolean>> region: partition.getValue().entrySet()) {
+            long heartbeatTs = region.getValue().getLeft();
+            long lag = currentTimestamp - heartbeatTs;
+            if (lag > DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS && region.getValue().getRight()) {
+              String replicaId = Utils
+                  .getReplicaId(Version.composeKafkaTopic(storeName.getKey(), version.getKey()), partition.getKey());
+              LOGGER.warn(
+                  "Replica: {}, region: {} is having heartbeat lag: {}, latest heartbeat: {}, current timestamp: {}",
+                  replicaId,
+                  region.getKey(),
+                  lag,
+                  heartbeatTs,
+                  currentTimestamp);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  protected void checkAndMaybeLogHeartbeatDelay() {
+    checkAndMaybeLogHeartbeatDelayMap(leaderHeartbeatTimeStamps);
+    checkAndMaybeLogHeartbeatDelayMap(followerHeartbeatTimeStamps);
+  }
+
   @FunctionalInterface
   interface ReportLagFunction {
     void apply(String storeName, int version, String region, long lag, boolean isReadyToServe);
@@ -272,7 +384,27 @@ public void run() {
         break;
       }
     }
-    LOGGER.info("RemoteIngestionRepairService thread interrupted! Shutting down...");
+    LOGGER.info("Heartbeat lag metric reporting thread interrupted!
Shutting down...");
+    }
+  }
+
+  private class HeartbeatLagLoggingThread extends Thread {
+    HeartbeatLagLoggingThread() {
+      super("Ingestion-Heartbeat-Lag-Logging-Service-Thread");
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.interrupted()) {
+        checkAndMaybeLogHeartbeatDelay();
+        try {
+          TimeUnit.SECONDS.sleep(DEFAULT_LAG_LOGGING_THREAD_SLEEP_INTERVAL_SECONDS);
+        } catch (InterruptedException e) {
+          // We've received an interrupt which is to be expected, so we'll just leave the loop and log
+          break;
+        }
+      }
+      LOGGER.info("Heartbeat lag logging thread interrupted! Shutting down...");
+    }
+  }
 }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/IngestionMetadataRetriever.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/IngestionMetadataRetriever.java
index b9a1c892bd..8c32eeae0b 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/IngestionMetadataRetriever.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/IngestionMetadataRetriever.java
@@ -1,7 +1,7 @@
 package com.linkedin.davinci.storage;
 
 import com.linkedin.davinci.listener.response.AdminResponse;
-import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse;
+import com.linkedin.davinci.listener.response.ReplicaIngestionResponse;
 import com.linkedin.venice.utils.ComplementSet;
 import java.nio.ByteBuffer;
 
@@ -11,9 +11,7 @@ public interface IngestionMetadataRetriever {
   AdminResponse getConsumptionSnapshots(String topicName, ComplementSet<Integer> partitions);
 
-  TopicPartitionIngestionContextResponse getTopicPartitionIngestionContext(
-      String versionTopic,
-      String topicName,
-      int partitionNum);
+  ReplicaIngestionResponse getTopicPartitionIngestionContext(String versionTopic, String topicName, int partitionNum);
 
+  ReplicaIngestionResponse getHeartbeatLag(String versionTopicName, int partitionFilter, boolean filterLagReplica);
 }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/IngestionMetadataRetrieverDelegator.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/IngestionMetadataRetrieverDelegator.java
new file mode 100644
index 0000000000..d4a5e6bd16
--- /dev/null
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/IngestionMetadataRetrieverDelegator.java
@@ -0,0 +1,69 @@
+package com.linkedin.davinci.storage;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
+import com.linkedin.davinci.kafka.consumer.ReplicaHeartbeatInfo;
+import com.linkedin.davinci.listener.response.AdminResponse;
+import com.linkedin.davinci.listener.response.ReplicaIngestionResponse;
+import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
+import com.linkedin.venice.helix.VeniceJsonSerializer;
+import com.linkedin.venice.utils.ComplementSet;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+
+public class IngestionMetadataRetrieverDelegator implements IngestionMetadataRetriever {
+  private static final Logger LOGGER = LogManager.getLogger(IngestionMetadataRetrieverDelegator.class);
+  private final KafkaStoreIngestionService kafkaStoreIngestionService;
+  private final HeartbeatMonitoringService heartbeatMonitoringService;
+
+  private final VeniceJsonSerializer<Map<String, ReplicaHeartbeatInfo>> replicaInfoJsonSerializer =
+      new VeniceJsonSerializer<>(new TypeReference<Map<String, ReplicaHeartbeatInfo>>() {
+      });
+
+  public
IngestionMetadataRetrieverDelegator(
+      KafkaStoreIngestionService kafkaStoreIngestionService,
+      HeartbeatMonitoringService heartbeatMonitoringService) {
+    this.kafkaStoreIngestionService = kafkaStoreIngestionService;
+    this.heartbeatMonitoringService = heartbeatMonitoringService;
+  }
+
+  @Override
+  public ByteBuffer getStoreVersionCompressionDictionary(String topicName) {
+    return kafkaStoreIngestionService.getStoreVersionCompressionDictionary(topicName);
+  }
+
+  @Override
+  public AdminResponse getConsumptionSnapshots(String topicName, ComplementSet<Integer> partitions) {
+    return kafkaStoreIngestionService.getConsumptionSnapshots(topicName, partitions);
+  }
+
+  @Override
+  public ReplicaIngestionResponse getTopicPartitionIngestionContext(
+      String versionTopic,
+      String topicName,
+      int partitionNum) {
+    return kafkaStoreIngestionService.getTopicPartitionIngestionContext(versionTopic, topicName, partitionNum);
+  }
+
+  @Override
+  public ReplicaIngestionResponse getHeartbeatLag(
+      String versionTopicName,
+      int partitionFilter,
+      boolean filterLagReplica) {
+    ReplicaIngestionResponse response = new ReplicaIngestionResponse();
+    try {
+      byte[] topicPartitionInfo = replicaInfoJsonSerializer.serialize(
+          heartbeatMonitoringService.getHeartbeatInfo(versionTopicName, partitionFilter, filterLagReplica),
+          "");
+      response.setPayload(topicPartitionInfo);
+    } catch (Exception e) {
+      response.setError(true);
+      response.setMessage(e.getMessage());
+      LOGGER.error("Error on getting heartbeat lag from the heartbeat monitoring service", e);
+    }
+    return response;
+  }
+}
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/ReplicaHeartbeatInfoTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/ReplicaHeartbeatInfoTest.java
new file mode 100644
index 0000000000..ecc43e4c74
--- /dev/null
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/ReplicaHeartbeatInfoTest.java
@@ -0,0 +1,52 @@
+package com.linkedin.davinci.consumer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
+import com.linkedin.davinci.kafka.consumer.ReplicaHeartbeatInfo;
+import com.linkedin.venice.helix.VeniceJsonSerializer;
+import com.linkedin.venice.meta.Version;
+import com.linkedin.venice.utils.Utils;
+import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class ReplicaHeartbeatInfoTest {
+  @Test
+  public void testJsonParse() throws Exception {
+    String storeName = "test_store";
+    String topicName = Version.composeKafkaTopic(storeName, 1);
+    int partition = 10;
+    String region = "dc-1";
+    long lag = TimeUnit.MINUTES.toMillis(11);
+    long heartbeat = System.currentTimeMillis() - lag;
+    LeaderFollowerStateType leaderFollowerStateType = LeaderFollowerStateType.LEADER;
+    String replicaId = Utils.getReplicaId(topicName, partition);
+    ReplicaHeartbeatInfo replicaHeartbeatInfo = new ReplicaHeartbeatInfo("a", "b", "c", true, 0L, 0L);
+
+    replicaHeartbeatInfo.setReplicaId(replicaId);
+    replicaHeartbeatInfo.setRegion(region);
+    replicaHeartbeatInfo.setLeaderState(leaderFollowerStateType.name());
+    replicaHeartbeatInfo.setHeartbeat(heartbeat);
+    replicaHeartbeatInfo.setLag(lag);
+
+    Assert.assertEquals(replicaHeartbeatInfo.getHeartbeat(), heartbeat);
+    Assert.assertEquals(replicaHeartbeatInfo.getReplicaId(), replicaId);
+
Assert.assertEquals(replicaHeartbeatInfo.getRegion(), region);
+    Assert.assertEquals(replicaHeartbeatInfo.getLag(), lag);
+    Assert.assertEquals(replicaHeartbeatInfo.getLeaderState(), leaderFollowerStateType.name());
+
+    Map<String, ReplicaHeartbeatInfo> heartbeatInfoMap = new VeniceConcurrentHashMap<>();
+    heartbeatInfoMap.put(replicaId + "-" + region, replicaHeartbeatInfo);
+    VeniceJsonSerializer<Map<String, ReplicaHeartbeatInfo>> replicaInfoJsonSerializer =
+        new VeniceJsonSerializer<>(new TypeReference<Map<String, ReplicaHeartbeatInfo>>() {
+        });
+
+    Assert.assertEquals(
+        replicaInfoJsonSerializer.deserialize(replicaInfoJsonSerializer.serialize(heartbeatInfoMap, ""), "")
+            .get(replicaId + "-" + region),
+        replicaHeartbeatInfo);
+  }
+}
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java
index 59419920a9..bc0dbc8ca0 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java
@@ -1,5 +1,15 @@
 package com.linkedin.davinci.stats.ingestion.heartbeat;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+
+import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
 import com.linkedin.venice.meta.BufferReplayPolicy;
 import com.linkedin.venice.meta.DataReplicationPolicy;
 import com.linkedin.venice.meta.HybridStoreConfig;
@@ -8,9 +18,14 @@
 import com.linkedin.venice.meta.Store;
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.meta.VersionImpl;
+import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import io.tehuti.metrics.MetricsRepository;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -22,6 +37,111 @@ public class HeartbeatMonitoringServiceTest {
 
   private static final String TEST_STORE = "Vivaldi_store";
 
+  @Test
+  public void testGetHeartbeatInfo() {
+    HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
+    doCallRealMethod().when(heartbeatMonitoringService).getHeartbeatInfo(anyString(), anyInt(), anyBoolean());
+    heartbeatMonitoringService.getHeartbeatInfo("", -1, false);
+    Mockito.verify(heartbeatMonitoringService, Mockito.times(2))
+        .getHeartbeatInfoFromMap(any(), anyString(), anyLong(), anyString(), anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void testGetHeartbeatInfoFromMap() {
+    HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
+    doCallRealMethod().when(heartbeatMonitoringService)
+        .getHeartbeatInfoFromMap(anyMap(), anyString(), anyLong(), anyString(), anyInt(), anyBoolean());
+    Map<String, Map<Integer, Map<Integer, Map<String, Pair<Long, Boolean>>>>> leaderMap =
+        new VeniceConcurrentHashMap<>();
+    String store = "testStore";
+    int version = 1;
+    int partition = 1;
+    String region = "dc-0";
+    long timestamp =
System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5); + leaderMap.put(store, new VeniceConcurrentHashMap<>()); + leaderMap.get(store).put(version, new VeniceConcurrentHashMap<>()); + leaderMap.get(store).get(version).put(partition, new VeniceConcurrentHashMap<>()); + leaderMap.get(store).get(version).get(partition).put(region, new MutablePair<>(timestamp, true)); + Assert.assertEquals( + heartbeatMonitoringService + .getHeartbeatInfoFromMap( + leaderMap, + LeaderFollowerStateType.LEADER.name(), + System.currentTimeMillis(), + Version.composeKafkaTopic(store, version), + -1, + false) + .size(), + 1); + Assert.assertEquals( + heartbeatMonitoringService + .getHeartbeatInfoFromMap( + leaderMap, + LeaderFollowerStateType.LEADER.name(), + System.currentTimeMillis(), + Version.composeKafkaTopic(store, version), + -1, + true) + .size(), + 0); + Assert.assertEquals( + heartbeatMonitoringService + .getHeartbeatInfoFromMap( + leaderMap, + LeaderFollowerStateType.LEADER.name(), + System.currentTimeMillis(), + Version.composeKafkaTopic(store, version), + -1, + false) + .size(), + 1); + Assert.assertEquals( + heartbeatMonitoringService + .getHeartbeatInfoFromMap( + leaderMap, + LeaderFollowerStateType.LEADER.name(), + System.currentTimeMillis(), + Version.composeKafkaTopic(store, version), + 1, + false) + .size(), + 1); + Assert.assertEquals( + heartbeatMonitoringService + .getHeartbeatInfoFromMap( + leaderMap, + LeaderFollowerStateType.LEADER.name(), + System.currentTimeMillis(), + Version.composeKafkaTopic(store, version), + 2, + false) + .size(), + 0); + Assert.assertEquals( + heartbeatMonitoringService + .getHeartbeatInfoFromMap( + leaderMap, + LeaderFollowerStateType.LEADER.name(), + System.currentTimeMillis(), + Version.composeKafkaTopic(store, 2), + -1, + false) + .size(), + 0); + Assert.assertEquals( + heartbeatMonitoringService + .getHeartbeatInfoFromMap( + leaderMap, + LeaderFollowerStateType.LEADER.name(), + System.currentTimeMillis(), + Version.composeKafkaTopic(store, version), + 1, + false) + .size(), + 1); + + } + @Test public void testAddLeaderLagMonitor() { @@ -38,7 +158,7 @@ public void testAddLeaderLagMonitor() { currentVersion.setActiveActiveReplicationEnabled(true); - Store mockStore = Mockito.mock(Store.class); + Store mockStore = mock(Store.class); Mockito.when(mockStore.getName()).thenReturn(TEST_STORE); Mockito.when(mockStore.getCurrentVersion()).thenReturn(currentVersion.getNumber()); Mockito.when(mockStore.getHybridStoreConfig()).thenReturn(hybridStoreConfig); @@ -47,7 +167,7 @@ public void testAddLeaderLagMonitor() { Mockito.when(mockStore.getVersion(3)).thenReturn(futureVersion); MetricsRepository mockMetricsRepository = new MetricsRepository(); - ReadOnlyStoreRepository mockReadOnlyRepository = Mockito.mock(ReadOnlyStoreRepository.class); + ReadOnlyStoreRepository mockReadOnlyRepository = mock(ReadOnlyStoreRepository.class); Mockito.when(mockReadOnlyRepository.getStoreOrThrow(TEST_STORE)).thenReturn(mockStore); Set regions = new HashSet<>(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/IngestionMetadataRetrieverDelegatorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/IngestionMetadataRetrieverDelegatorTest.java new file mode 100644 index 0000000000..e24d333f02 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/IngestionMetadataRetrieverDelegatorTest.java @@ -0,0 +1,32 @@ +package com.linkedin.davinci.storage; + +import static org.mockito.Mockito.mock; + +import 
com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
+import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
+import com.linkedin.venice.utils.ComplementSet;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+
+public class IngestionMetadataRetrieverDelegatorTest {
+  @Test
+  public void testInterface() {
+    KafkaStoreIngestionService kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class);
+    HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
+    IngestionMetadataRetrieverDelegator ingestionMetadataRetrieverDelegator =
+        new IngestionMetadataRetrieverDelegator(kafkaStoreIngestionService, heartbeatMonitoringService);
+    ingestionMetadataRetrieverDelegator.getHeartbeatLag("", -1, false);
+    Mockito.verify(heartbeatMonitoringService, Mockito.times(1)).getHeartbeatInfo("", -1, false);
+    ingestionMetadataRetrieverDelegator.getStoreVersionCompressionDictionary("dummy_v1");
+    Mockito.verify(kafkaStoreIngestionService, Mockito.times(1)).getStoreVersionCompressionDictionary("dummy_v1");
+    ingestionMetadataRetrieverDelegator.getTopicPartitionIngestionContext("dummy_v1", "dummy_rt", 1);
+    Mockito.verify(kafkaStoreIngestionService, Mockito.times(1))
+        .getTopicPartitionIngestionContext("dummy_v1", "dummy_rt", 1);
+    ComplementSet<Integer> integerComplementSet = ComplementSet.of(1);
+    ingestionMetadataRetrieverDelegator.getConsumptionSnapshots("dummy_v1", integerComplementSet);
+    Mockito.verify(kafkaStoreIngestionService, Mockito.times(1))
+        .getConsumptionSnapshots("dummy_v1", integerComplementSet);
+
+  }
+}
diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java
index 58aa87318b..9dc39feb8d 100644
--- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java
+++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java
@@ -15,7 +15,7 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse;
+import com.linkedin.davinci.listener.response.ReplicaIngestionResponse;
 import com.linkedin.venice.admin.protocol.response.AdminResponseRecord;
 import com.linkedin.venice.client.exceptions.VeniceClientException;
 import com.linkedin.venice.client.store.ClientConfig;
@@ -576,6 +576,9 @@ public static void main(String[] args) throws Exception {
       case AGGREGATED_HEALTH_STATUS:
         getAggregatedHealthStatus(cmd);
         break;
+      case DUMP_HOST_HEARTBEAT:
+        dumpHostHeartbeat(cmd);
+        break;
       default:
         StringJoiner availableCommands = new StringJoiner(", ");
         for (Command c: Command.values()) {
@@ -3074,6 +3077,43 @@ private static void dumpTopicPartitionIngestionContext(CommandLine cmd) throws E
     }
   }
 
+  static void dumpHostHeartbeatLag(
+      TransportClient transportClient,
+      String topicFilter,
+      String partitionFilter,
+      String lagFilter) throws Exception {
+    String topicName = topicFilter == null ? "" : topicFilter;
+    String partition = partitionFilter == null ? "-1" : partitionFilter;
+    String filterLag = lagFilter == null ?
"false" : lagFilter; + StringBuilder sb = new StringBuilder(QueryAction.HOST_HEARTBEAT_LAG.toString().toLowerCase()).append("/") + .append(topicName) + .append("/") + .append(partition) + .append("/") + .append(filterLag); + String requestUrl = sb.toString(); + byte[] responseBody; + TransportClientResponse transportClientResponse = transportClient.get(requestUrl).get(); + responseBody = transportClientResponse.getBody(); + ReplicaIngestionResponse currentVersionResponse = + OBJECT_MAPPER.readValue(responseBody, ReplicaIngestionResponse.class); + System.out.println(new String(currentVersionResponse.getPayload())); + } + + private static void dumpHostHeartbeat(CommandLine cmd) throws Exception { + TransportClient transportClient = null; + try { + transportClient = getTransportClientForServer("dummy", getRequiredArgument(cmd, Arg.SERVER_URL)); + dumpHostHeartbeatLag( + transportClient, + getRequiredArgument(cmd, Arg.KAFKA_TOPIC_NAME), + getOptionalArgument(cmd, Arg.PARTITION), + getOptionalArgument(cmd, Arg.LAG_FILTER_ENABLED)); + } finally { + Utils.closeQuietlyWithErrorLogged(transportClient); + } + } + private static void migrateVeniceZKPaths(CommandLine cmd) throws Exception { Set clusterNames = Utils.parseCommaSeparatedStringToSet(getRequiredArgument(cmd, Arg.CLUSTER_LIST)); String srcZKUrl = getRequiredArgument(cmd, Arg.SRC_ZOOKEEPER_URL); @@ -3166,9 +3206,9 @@ static void dumpTopicPartitionIngestionContext( byte[] responseBody; TransportClientResponse transportClientResponse = transportClient.get(requestUrl).get(); responseBody = transportClientResponse.getBody(); - TopicPartitionIngestionContextResponse currentVersionResponse = - OBJECT_MAPPER.readValue(responseBody, TopicPartitionIngestionContextResponse.class); - System.out.println(new String(currentVersionResponse.getTopicPartitionIngestionContext())); + ReplicaIngestionResponse currentVersionResponse = + OBJECT_MAPPER.readValue(responseBody, ReplicaIngestionResponse.class); + System.out.println(new String(currentVersionResponse.getPayload())); } static void getAndPrintRequestBasedMetadata( diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index 3e8b22970b..c121c4e320 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -284,7 +284,8 @@ public enum Arg { "nearline-producer-count-per-writer", "npcpw", true, "How many producers will be used to write nearline workload in Server" ), INSTANCES("instances", "in", true, "Input list of helix ids of nodes to check if they can removed or not"), - TO_BE_STOPPED_NODES("to-be-stopped-nodes", "tbsn", true, "List of helix ids of nodes assumed to be stopped"); + TO_BE_STOPPED_NODES("to-be-stopped-nodes", "tbsn", true, "List of helix ids of nodes assumed to be stopped"), + LAG_FILTER_ENABLED("lag-filter-enabled", "lfe", true, "Enable heartbeat lag filter for a heartbeat request"); private final String argName; private final String first; diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index 1526a1b9fa..51512e6053 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -63,6 +63,7 @@ import static com.linkedin.venice.Arg.KAFKA_TOPIC_RETENTION_IN_MS; import static 
com.linkedin.venice.Arg.KEY;
 import static com.linkedin.venice.Arg.KEY_SCHEMA;
+import static com.linkedin.venice.Arg.LAG_FILTER_ENABLED;
 import static com.linkedin.venice.Arg.LARGEST_USED_VERSION_NUMBER;
 import static com.linkedin.venice.Arg.LATEST_SUPERSET_SCHEMA_ID;
 import static com.linkedin.venice.Arg.MAX_COMPACTION_LAG_SECONDS;
@@ -551,6 +552,11 @@ public enum Command {
       "cluster-health-status",
       "Returns the set of instances which can be safely remove and instances which cannot be removed.",
       new Arg[] { URL, CLUSTER, INSTANCES, TO_BE_STOPPED_NODES }
+  ),
+  DUMP_HOST_HEARTBEAT(
+      "dump-host-heartbeat",
+      "Dump all heartbeats belonging to a certain storage node. You can use the topic/partition filters to select a specific resource, and you can choose to show only resources that are lagging.",
+      new Arg[] { SERVER_URL, KAFKA_TOPIC_NAME }, new Arg[] { PARTITION, LAG_FILTER_ENABLED }
   );
 
   private final String commandName;
@@ -624,7 +630,7 @@ public static Command getCommand(String name, CommandLine cmdLine) {
     List<String> candidateCommands = Arrays.stream(Command.values())
         .filter(
             command -> Arrays.stream(command.getRequiredArgs()).allMatch(arg -> cmdLine.hasOption(arg.toString())))
-        .map(commmand -> "--" + commmand.toString())
+        .map(command -> "--" + command)
         .collect(Collectors.toList());
     if (!candidateCommands.isEmpty()) {
       throw new VeniceException(
diff --git a/internal/venice-common/src/main/java/com/linkedin/davinci/listener/response/ReplicaIngestionResponse.java b/internal/venice-common/src/main/java/com/linkedin/davinci/listener/response/ReplicaIngestionResponse.java
new file mode 100644
index 0000000000..474c1d217d
--- /dev/null
+++ b/internal/venice-common/src/main/java/com/linkedin/davinci/listener/response/ReplicaIngestionResponse.java
@@ -0,0 +1,34 @@
+package com.linkedin.davinci.listener.response;
+
+public class ReplicaIngestionResponse {
+  private boolean isError;
+  private byte[] payload;
+  private String message;
+
+  public ReplicaIngestionResponse() {
+  }
+
+  public void setPayload(byte[] payload) {
+    this.payload = payload;
+  }
+
+  public void setError(boolean error) {
+    this.isError = error;
+  }
+
+  public boolean isError() {
+    return this.isError;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  public String getMessage() {
+    return this.message;
+  }
+
+  public byte[] getPayload() {
+    return payload;
+  }
+}
diff --git a/internal/venice-common/src/main/java/com/linkedin/davinci/listener/response/TopicPartitionIngestionContextResponse.java b/internal/venice-common/src/main/java/com/linkedin/davinci/listener/response/TopicPartitionIngestionContextResponse.java
deleted file mode 100644
index c30cff6d83..0000000000
--- a/internal/venice-common/src/main/java/com/linkedin/davinci/listener/response/TopicPartitionIngestionContextResponse.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package com.linkedin.davinci.listener.response;
-
-public class TopicPartitionIngestionContextResponse {
-  private boolean isError;
-  private byte[] topicPartitionIngestionContext;
-  private String message;
-
-  public TopicPartitionIngestionContextResponse() {
-  }
-
-  public void setTopicPartitionIngestionContext(byte[] topicPartitionIngestionContext) {
-    this.topicPartitionIngestionContext = topicPartitionIngestionContext;
-  }
-
-  public void setError(boolean error) {
-    this.isError = error;
-  }
-
-  public boolean isError() {
-    return this.isError;
-  }
-
-  public void setMessage(String message) {
-    this.message = message;
-  }
-
-  public String getMessage() {
-    return this.message;
-  }
-
-  public
byte[] getTopicPartitionIngestionContext() { - return topicPartitionIngestionContext; - } -} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/QueryAction.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/QueryAction.java index 3f6c643ae4..3591cd78ee 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/QueryAction.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/QueryAction.java @@ -26,5 +26,9 @@ public enum QueryAction { CURRENT_VERSION, // TOPIC_PARTITION_INGESTION_CONTEXT is a GET request to /version topic/topic/partition from server admin tool - TOPIC_PARTITION_INGESTION_CONTEXT + TOPIC_PARTITION_INGESTION_CONTEXT, + + // HOST_HEARTBEAT_LAG is a GET request to /(optional) version topic filter/(optional) partition filter/(optional) + // lagging replica filter from server admin tool. + HOST_HEARTBEAT_LAG } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java index b6dd7bd164..ebab678294 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java @@ -51,7 +51,7 @@ import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService; import com.linkedin.davinci.kafka.consumer.StoreIngestionTaskBackdoor; import com.linkedin.davinci.kafka.consumer.TopicPartitionIngestionInfo; -import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse; +import com.linkedin.davinci.listener.response.ReplicaIngestionResponse; import com.linkedin.davinci.replication.RmdWithValueSchemaId; import com.linkedin.davinci.replication.merge.RmdSerDe; import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache; @@ -1454,14 +1454,14 @@ private void verifyConsumerThreadPoolFor( .getVeniceServers()) { KafkaStoreIngestionService kafkaStoreIngestionService = serverWrapper.getVeniceServer().getKafkaStoreIngestionService(); - TopicPartitionIngestionContextResponse topicPartitionIngestionContextResponse = + ReplicaIngestionResponse replicaIngestionResponse = kafkaStoreIngestionService.getTopicPartitionIngestionContext( versionTopic.getName(), pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber()); try { - Map> topicPartitionIngestionContexts = VENICE_JSON_SERIALIZER - .deserialize(topicPartitionIngestionContextResponse.getTopicPartitionIngestionContext(), ""); + Map> topicPartitionIngestionContexts = + VENICE_JSON_SERIALIZER.deserialize(replicaIngestionResponse.getPayload(), ""); if (!topicPartitionIngestionContexts.isEmpty()) { int regionCount = 0; for (Map.Entry> entry: topicPartitionIngestionContexts diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDumpIngestionContext.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDumpIngestionContext.java new file mode 100644 index 0000000000..d6dc2bd365 --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestDumpIngestionContext.java @@ -0,0 +1,161 @@ +package com.linkedin.venice.endToEnd; + +import static com.linkedin.venice.utils.TestUtils.assertCommand; +import static com.linkedin.venice.utils.TestWriteUtils.loadFileAsString; +import static 
org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; +import com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator; +import com.linkedin.davinci.kafka.consumer.ReplicaHeartbeatInfo; +import com.linkedin.venice.AdminTool; +import com.linkedin.venice.ConfigKeys; +import com.linkedin.venice.compression.CompressionStrategy; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.ControllerResponse; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.controllerapi.VersionCreationResponse; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceControllerWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceServerWrapper; +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.Utils; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.avro.Schema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestDumpIngestionContext { + private static final Logger LOGGER = LogManager.getLogger(TestDumpIngestionContext.class); + private static final int NUMBER_OF_CHILD_DATACENTERS = 2; + private static final int NUMBER_OF_CLUSTERS = 1; + private static final int TEST_TIMEOUT_MS = 180_000; + + private static final int REPLICATION_FACTOR = 2; + private static final String CLUSTER_NAME = "venice-cluster0"; + + private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; + private VeniceControllerWrapper parentController; + private List childDatacenters; + + @BeforeClass(alwaysRun = true) + public void setUp() { + Properties serverProperties = new Properties(); + serverProperties.put(ConfigKeys.SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true); + serverProperties.put( + ConfigKeys.SERVER_CONSUMER_POOL_ALLOCATION_STRATEGY, + KafkaConsumerServiceDelegator.ConsumerPoolStrategyType.CURRENT_VERSION_PRIORITIZATION.name()); + Properties controllerProps = new Properties(); + controllerProps.put(ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE, false); + this.multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + NUMBER_OF_CHILD_DATACENTERS, + NUMBER_OF_CLUSTERS, + 1, + 1, + 2, + 1, + REPLICATION_FACTOR, + Optional.of(controllerProps), + Optional.of(controllerProps), + Optional.of(serverProperties), + false); + this.childDatacenters = multiRegionMultiClusterWrapper.getChildRegions(); + List parentControllers = multiRegionMultiClusterWrapper.getParentControllers(); + this.parentController = parentControllers.get(0); + } + + @Test(timeOut = TEST_TIMEOUT_MS) + public void testDumpHostHeartbeatLag() { + final String storeName = Utils.getUniqueString("dumpInfo"); + String parentControllerUrl = 
parentController.getControllerUrl(); + Schema keySchema = AvroCompatibilityHelper.parse(loadFileAsString("UserKey.avsc")); + Schema valueSchema = AvroCompatibilityHelper.parse(loadFileAsString("UserValue.avsc")); + + try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl)) { + assertCommand( + parentControllerClient.createNewStore(storeName, "test_owner", keySchema.toString(), valueSchema.toString())); + UpdateStoreQueryParams updateStoreParams = + new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA) + .setCompressionStrategy(CompressionStrategy.NO_OP) + .setActiveActiveReplicationEnabled(true) + .setWriteComputationEnabled(true) + .setHybridRewindSeconds(86400L) + .setHybridOffsetLagThreshold(10L); + ControllerResponse updateStoreResponse = + parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams)); + assertFalse(updateStoreResponse.isError(), "Update store got error: " + updateStoreResponse.getError()); + + VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test_push_id", 1000); + assertEquals(response.getVersion(), 1); + assertFalse(response.isError(), "Empty push to parent colo should succeed"); + TestUtils.waitForNonDeterministicPushCompletion( + Version.composeKafkaTopic(storeName, 1), + parentControllerClient, + 30, + TimeUnit.SECONDS); + + VeniceClusterWrapper veniceCluster = childDatacenters.get(0).getClusters().get(CLUSTER_NAME); + VeniceServerWrapper serverWrapper = veniceCluster.getVeniceServers().get(0); + + Map heartbeatInfoMap = serverWrapper.getVeniceServer() + .getHeartbeatMonitoringService() + .getHeartbeatInfo(Version.composeKafkaTopic(storeName, 1), -1, false); + LOGGER.info("Heartbeat Info:\n" + heartbeatInfoMap); + int totalReplicaCount = heartbeatInfoMap.size(); + + heartbeatInfoMap = serverWrapper.getVeniceServer() + .getHeartbeatMonitoringService() + .getHeartbeatInfo(Version.composeKafkaTopic(storeName, 1), -1, false); + LOGGER.info("Heartbeat Info with topic filtering:\n" + heartbeatInfoMap); + Assert.assertEquals(heartbeatInfoMap.keySet().stream().filter(x -> x.endsWith("dc-0")).count(), 3); + Assert.assertEquals( + heartbeatInfoMap.keySet().stream().filter(x -> x.endsWith("dc-1")).count() * 2, + heartbeatInfoMap.values().stream().filter(x -> x.getLeaderState().equals("LEADER")).count()); + + heartbeatInfoMap = serverWrapper.getVeniceServer() + .getHeartbeatMonitoringService() + .getHeartbeatInfo(Version.composeKafkaTopic(storeName, 1), 2, false); + LOGGER.info("Heartbeat Info with topic/partition filtering:\n" + heartbeatInfoMap); + Assert.assertTrue( + heartbeatInfoMap.keySet() + .stream() + .allMatch(x -> x.startsWith(Version.composeKafkaTopic(storeName, 1) + "-2"))); + + heartbeatInfoMap = serverWrapper.getVeniceServer() + .getHeartbeatMonitoringService() + .getHeartbeatInfo(Version.composeKafkaTopic(storeName, 1), 2, false); + Assert.assertNotEquals(heartbeatInfoMap.size(), totalReplicaCount); + + heartbeatInfoMap = serverWrapper.getVeniceServer() + .getHeartbeatMonitoringService() + .getHeartbeatInfo(Version.composeKafkaTopic(storeName, 1), -1, true); + LOGGER.info("Heartbeat Info with lag filtering:\n" + heartbeatInfoMap); + Assert.assertTrue(heartbeatInfoMap.isEmpty()); + + // Print out for display only. 
+ String serverUrl = "http://" + serverWrapper.getHost() + ":" + serverWrapper.getPort(); + + String[] args = { "--dump-host-heartbeat", "--server-url", serverUrl, "--kafka-topic-name", + Version.composeKafkaTopic(storeName, 1) }; + try { + AdminTool.main(args); + } catch (Exception e) { + throw new VeniceException(e); + } + } + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java index 669abfa9fe..2f91fdc582 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java @@ -14,6 +14,7 @@ import static com.linkedin.venice.ConfigKeys.LISTENER_PORT; import static com.linkedin.venice.ConfigKeys.LOCAL_CONTROLLER_D2_SERVICE_NAME; import static com.linkedin.venice.ConfigKeys.LOCAL_D2_ZK_HOST; +import static com.linkedin.venice.ConfigKeys.LOCAL_REGION_NAME; import static com.linkedin.venice.ConfigKeys.MAX_ONLINE_OFFLINE_STATE_TRANSITION_THREAD_NUMBER; import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_CONSUMPTION_DELAY_MS; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; @@ -224,6 +225,7 @@ static StatefulServiceProvider generateService( PropertyBuilder serverPropsBuilder = new PropertyBuilder().put(LISTENER_PORT, listenPort) .put(ADMIN_PORT, TestUtils.getFreePort()) .put(DATA_BASE_PATH, dataDirectory.getAbsolutePath()) + .put(LOCAL_REGION_NAME, regionName) .put(ENABLE_SERVER_ALLOW_LIST, enableServerAllowlist) .put(SERVER_REST_SERVICE_STORAGE_THREAD_NUM, 4) .put(MAX_ONLINE_OFFLINE_STATE_TRANSITION_THREAD_NUMBER, 100) diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/OutboundHttpWrapperHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/OutboundHttpWrapperHandler.java index d047bd9828..12a3e8e5bf 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/OutboundHttpWrapperHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/OutboundHttpWrapperHandler.java @@ -10,8 +10,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.davinci.listener.response.AdminResponse; import com.linkedin.davinci.listener.response.MetadataResponse; +import com.linkedin.davinci.listener.response.ReplicaIngestionResponse; import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse; -import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse; import com.linkedin.venice.HttpConstants; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.listener.response.AbstractReadResponse; @@ -140,13 +140,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) responseStatus = ((DefaultFullHttpResponse) msg).getStatus(); response = (DefaultFullHttpResponse) msg; body = response.content(); - } else if (msg instanceof TopicPartitionIngestionContextResponse) { - TopicPartitionIngestionContextResponse topicPartitionIngestionContextResponse = - (TopicPartitionIngestionContextResponse) msg; - if (!topicPartitionIngestionContextResponse.isError()) { - body = Unpooled.wrappedBuffer(OBJECT_MAPPER.writeValueAsBytes(topicPartitionIngestionContextResponse)); + } else if (msg instanceof 
ReplicaIngestionResponse) { + ReplicaIngestionResponse replicaIngestionResponse = (ReplicaIngestionResponse) msg; + if (!replicaIngestionResponse.isError()) { + body = Unpooled.wrappedBuffer(OBJECT_MAPPER.writeValueAsBytes(replicaIngestionResponse)); } else { - String errorMessage = topicPartitionIngestionContextResponse.getMessage(); + String errorMessage = replicaIngestionResponse.getMessage(); if (errorMessage == null) { errorMessage = "Unknown error"; } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/RouterRequestHttpHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/RouterRequestHttpHandler.java index 3b4918037a..4bf540dad2 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/RouterRequestHttpHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/RouterRequestHttpHandler.java @@ -7,6 +7,7 @@ import com.linkedin.venice.listener.request.DictionaryFetchRequest; import com.linkedin.venice.listener.request.GetRouterRequest; import com.linkedin.venice.listener.request.HealthCheckRequest; +import com.linkedin.venice.listener.request.HeartbeatRequest; import com.linkedin.venice.listener.request.MetadataFetchRequest; import com.linkedin.venice.listener.request.MultiGetRouterRequestWrapper; import com.linkedin.venice.listener.request.RouterRequest; @@ -143,6 +144,12 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) thro statsHandler.setStoreName( Version.parseStoreFromVersionTopic(topicPartitionIngestionContextRequest.getVersionTopic())); ctx.fireChannelRead(topicPartitionIngestionContextRequest); + break; + case HOST_HEARTBEAT_LAG: + statsHandler.setMetadataRequest(true); + HeartbeatRequest heartbeatRequest = HeartbeatRequest.parseGetHttpRequest(uri.getPath(), requestParts); + ctx.fireChannelRead(heartbeatRequest); + break; default: throw new VeniceException("Unrecognized query action"); } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStoreAclHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStoreAclHandler.java index b5344aa264..3201fc8a89 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStoreAclHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStoreAclHandler.java @@ -42,8 +42,8 @@ public class ServerStoreAclHandler extends AbstractStoreAclHandler private final static Logger LOGGER = LogManager.getLogger(ServerStoreAclHandler.class); /** - * Skip ACL for requests to /metadata, /admin, /current_version, /health and /topic_partition_ingestion_context - * as there's no sensitive information in the response. + * Skip ACL for requests to /metadata, /admin, /current_version, /health, /topic_partition_ingestion_context and + * /host_heartbeat_lag as there's no sensitive information in the response. 
*/ private static final Set QUERIES_TO_SKIP_ACL = new HashSet<>( Arrays.asList( @@ -51,7 +51,8 @@ public class ServerStoreAclHandler extends AbstractStoreAclHandler QueryAction.ADMIN, QueryAction.HEALTH, QueryAction.CURRENT_VERSION, - QueryAction.TOPIC_PARTITION_INGESTION_CONTEXT)); + QueryAction.TOPIC_PARTITION_INGESTION_CONTEXT, + QueryAction.HOST_HEARTBEAT_LAG)); public ServerStoreAclHandler( IdentityParser identityParser, diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java index 90ba8a49a1..cc9c0bc6e5 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java @@ -7,8 +7,8 @@ import com.linkedin.davinci.listener.response.MetadataResponse; import com.linkedin.davinci.listener.response.ReadResponse; import com.linkedin.davinci.listener.response.ReadResponseStats; +import com.linkedin.davinci.listener.response.ReplicaIngestionResponse; import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse; -import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse; import com.linkedin.davinci.storage.DiskHealthCheckService; import com.linkedin.davinci.storage.IngestionMetadataRetriever; import com.linkedin.davinci.storage.ReadMetadataRetriever; @@ -36,6 +36,7 @@ import com.linkedin.venice.listener.request.DictionaryFetchRequest; import com.linkedin.venice.listener.request.GetRouterRequest; import com.linkedin.venice.listener.request.HealthCheckRequest; +import com.linkedin.venice.listener.request.HeartbeatRequest; import com.linkedin.venice.listener.request.MetadataFetchRequest; import com.linkedin.venice.listener.request.MultiGetRouterRequestWrapper; import com.linkedin.venice.listener.request.MultiKeyRouterRequestWrapper; @@ -371,9 +372,12 @@ public void channelRead(ChannelHandlerContext context, Object message) throws Ex ServerCurrentVersionResponse response = handleCurrentVersionRequest((CurrentVersionRequest) message); context.writeAndFlush(response); } else if (message instanceof TopicPartitionIngestionContextRequest) { - TopicPartitionIngestionContextResponse response = + ReplicaIngestionResponse response = handleTopicPartitionIngestionContextRequest((TopicPartitionIngestionContextRequest) message); context.writeAndFlush(response); + } else if (message instanceof HeartbeatRequest) { + ReplicaIngestionResponse response = handleHeartbeatRequest((HeartbeatRequest) message); + context.writeAndFlush(response); } else { context.writeAndFlush( new HttpShortcutResponse( @@ -844,11 +848,18 @@ private AdminResponse handleServerAdminRequest(AdminRequest adminRequest) { } } - private TopicPartitionIngestionContextResponse handleTopicPartitionIngestionContextRequest( + private ReplicaIngestionResponse handleTopicPartitionIngestionContextRequest( TopicPartitionIngestionContextRequest topicPartitionIngestionContextRequest) { Integer partition = topicPartitionIngestionContextRequest.getPartition(); String versionTopic = topicPartitionIngestionContextRequest.getVersionTopic(); String topicName = topicPartitionIngestionContextRequest.getTopic(); return ingestionMetadataRetriever.getTopicPartitionIngestionContext(versionTopic, topicName, partition); } + + private ReplicaIngestionResponse handleHeartbeatRequest(HeartbeatRequest heartbeatRequest) { + return 
ingestionMetadataRetriever.getHeartbeatLag( + heartbeatRequest.getTopic(), + heartbeatRequest.getPartition(), + heartbeatRequest.isFilterLagReplica()); + } } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/request/HeartbeatRequest.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/request/HeartbeatRequest.java new file mode 100644 index 0000000000..76d3e1afef --- /dev/null +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/request/HeartbeatRequest.java @@ -0,0 +1,45 @@ +package com.linkedin.venice.listener.request; + +import com.linkedin.venice.exceptions.VeniceException; + + +public class HeartbeatRequest { + private final String topic; + private final int partition; + private final boolean filterLagReplica; + + private HeartbeatRequest(String topic, int partition, boolean filterLagReplica) { + this.topic = topic; + this.partition = partition; + this.filterLagReplica = filterLagReplica; + } + + public String getTopic() { + return topic; + } + + public int getPartition() { + return partition; + } + + public boolean isFilterLagReplica() { + return filterLagReplica; + } + + public static HeartbeatRequest parseGetHttpRequest(String uri, String[] requestParts) { + if (requestParts.length == 5) { + // [0]""/[1]"action"/[2]"topic"/[3]"partition"/[4]"filter lagging flag" + try { + String topic = requestParts[2]; + int partition = Integer.parseInt(requestParts[3]); + boolean filterLagReplica = Boolean.parseBoolean(requestParts[4]); + return new HeartbeatRequest(topic, partition, filterLagReplica); + } catch (Exception e) { + throw new VeniceException("Unable to parse request for a HeartbeatRequest action: " + uri); + } + } else { + throw new VeniceException("not a valid request for a HeartbeatRequest action: " + uri); + } + } + +} diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index d3a76cf791..2611f92dec 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -16,6 +16,7 @@ import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; import com.linkedin.davinci.storage.DiskHealthCheckService; import com.linkedin.davinci.storage.IngestionMetadataRetriever; +import com.linkedin.davinci.storage.IngestionMetadataRetrieverDelegator; import com.linkedin.davinci.storage.ReadMetadataRetriever; import com.linkedin.davinci.storage.StorageEngineMetadataService; import com.linkedin.davinci.storage.StorageEngineRepository; @@ -432,7 +433,7 @@ private List createServices() { metadataRepo, storeValueSchemasCacheService, customizedViewFuture, - kafkaStoreIngestionService, + new IngestionMetadataRetrieverDelegator(kafkaStoreIngestionService, heartbeatMonitoringService), serverReadMetadataRepository, serverConfig, metricsRepository, @@ -547,6 +548,14 @@ public HelixParticipationService getHelixParticipationService() { throw new VeniceException("Cannot get helix participation service if server is not started"); } + public HeartbeatMonitoringService getHeartbeatMonitoringService() { + if (isStarted()) { + return heartbeatMonitoringService; + } else { + throw new VeniceException("Cannot get heartbeat monitoring service if server is not started"); + } + } + /** * @return true if the {@link VeniceServer} and all of its inner services are fully started * false if the 
{@link VeniceServer} was not started or if any of its inner services diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/OutboundHttpWrapperHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/OutboundHttpWrapperHandlerTest.java index 3478c32fff..0d3e3932e7 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/OutboundHttpWrapperHandlerTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/OutboundHttpWrapperHandlerTest.java @@ -14,8 +14,8 @@ import com.google.protobuf.ByteString; import com.linkedin.davinci.listener.response.MetadataResponse; import com.linkedin.davinci.listener.response.ReadResponse; +import com.linkedin.davinci.listener.response.ReplicaIngestionResponse; import com.linkedin.davinci.listener.response.ServerCurrentVersionResponse; -import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse; import com.linkedin.venice.HttpConstants; import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.listener.grpc.GrpcRequestContext; @@ -194,14 +194,14 @@ public void testGrpcWrite() { @Test public void testWriteTopicPartitionIngestionContextResponse() throws JsonProcessingException { - TopicPartitionIngestionContextResponse msg = new TopicPartitionIngestionContextResponse(); + ReplicaIngestionResponse msg = new ReplicaIngestionResponse(); String topic = "test_store_v1"; int expectedPartitionId = 12345; String jsonStr = "{\n" + "\"kafkaUrl\" : {\n" + " TP(topic: \"" + topic + "\", partition: " + expectedPartitionId + ") : {\n" + " \"latestOffset\" : 0,\n" + " \"offsetLag\" : 1,\n" + " \"msgRate\" : 2.0,\n" + " \"byteRate\" : 4.0,\n" + " \"consumerIdx\" : 6,\n" + " \"elapsedTimeSinceLastPollInMs\" : 7\n" + " }\n" + " }\n" + "}"; - msg.setTopicPartitionIngestionContext(jsonStr.getBytes()); + msg.setPayload(jsonStr.getBytes()); StatsHandler statsHandler = mock(StatsHandler.class); ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); ByteBuf body = Unpooled.wrappedBuffer(OBJECT_MAPPER.writeValueAsBytes(msg)); diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java index d5976bfff4..a39db2d037 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java @@ -58,12 +58,13 @@ public class ServerStoreAclHandlerTest { // Store name can be in a version topic format private static final String TEST_STORE_NAME = "testStore_v1"; private static final String TEST_STORE_VERSION = Version.composeKafkaTopic(TEST_STORE_NAME, 1); + private static final int TEST_STORE_PARTITION = 1; /** * Mock access controller to verify basic request parsing and handling for {@link ServerStoreAclHandler} */ private static class MockAccessController implements DynamicAccessController { - private QueryAction queryAction; + private final QueryAction queryAction; public MockAccessController(QueryAction queryAction) { this.queryAction = queryAction; @@ -249,6 +250,7 @@ public void testAllRequestTypes() throws SSLPeerUnverifiedException, AclExceptio case HEALTH: case METADATA: case TOPIC_PARTITION_INGESTION_CONTEXT: + case HOST_HEARTBEAT_LAG: verify(spyMockAccessController, never()).hasAccess(any(), any(), any()); break; case STORAGE: @@ -282,6 +284,9 
diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java
index d5976bfff4..a39db2d037 100644
--- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java
+++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java
@@ -58,12 +58,13 @@ public class ServerStoreAclHandlerTest {
   // Store name can be in a version topic format
   private static final String TEST_STORE_NAME = "testStore_v1";
   private static final String TEST_STORE_VERSION = Version.composeKafkaTopic(TEST_STORE_NAME, 1);
+  private static final int TEST_STORE_PARTITION = 1;
 
   /**
    * Mock access controller to verify basic request parsing and handling for {@link ServerStoreAclHandler}
    */
   private static class MockAccessController implements DynamicAccessController {
-    private QueryAction queryAction;
+    private final QueryAction queryAction;
 
     public MockAccessController(QueryAction queryAction) {
       this.queryAction = queryAction;
@@ -249,6 +250,7 @@ public void testAllRequestTypes() throws SSLPeerUnverifiedException, AclException {
       case HEALTH:
       case METADATA:
       case TOPIC_PARTITION_INGESTION_CONTEXT:
+      case HOST_HEARTBEAT_LAG:
        verify(spyMockAccessController, never()).hasAccess(any(), any(), any());
        break;
      case STORAGE:
@@ -282,6 +284,9 @@ private String buildTestURI(QueryAction queryAction) {
      case TOPIC_PARTITION_INGESTION_CONTEXT:
        return "/" + QueryAction.TOPIC_PARTITION_INGESTION_CONTEXT.toString().toLowerCase() + "/" + TEST_STORE_VERSION
            + "/" + TEST_STORE_VERSION + "/1";
+      case HOST_HEARTBEAT_LAG:
+        return "/" + QueryAction.HOST_HEARTBEAT_LAG.toString().toLowerCase() + "/" + TEST_STORE_VERSION + "/"
+            + TEST_STORE_PARTITION + "/false";
      default:
        throw new IllegalArgumentException("Invalid query action: " + queryAction);
    }
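The HOST_HEARTBEAT_LAG case above fixes the URI shape for the new endpoint: /host_heartbeat_lag/<topic filter>/<partition filter>/<lag filter>. A hedged sketch of the parsing this implies, using only the accessors and the part-index layout that appear in the heartbeat test below; the real HeartbeatRequest added by this patch may validate and report errors differently:

// Sketch only: parses "/host_heartbeat_lag/<topic>/<partition>/<filterLag>".
// Assumes requestParts[0] is the empty segment before the leading slash,
// matching the index comment in the new heartbeat test below.
public class HeartbeatRequest {
  private final String topic;
  private final int partition;
  private final boolean filterLagReplica;

  private HeartbeatRequest(String topic, int partition, boolean filterLagReplica) {
    this.topic = topic;
    this.partition = partition;
    this.filterLagReplica = filterLagReplica;
  }

  public static HeartbeatRequest parseGetHttpRequest(String uri, String[] requestParts) {
    // [0]""/[1]"action"/[2]"topic filter"/[3]"partition filter"/[4]"lag filter"
    if (requestParts.length != 5) {
      throw new IllegalArgumentException("Invalid heartbeat lag request: " + uri);
    }
    return new HeartbeatRequest(
        requestParts[2],
        Integer.parseInt(requestParts[3]),
        Boolean.parseBoolean(requestParts[4]));
  }

  public String getTopic() {
    return topic;
  }

  public int getPartition() {
    return partition;
  }

  public boolean isFilterLagReplica() {
    return filterLagReplica;
  }
}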
diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java
index c0171a0d2d..58d5aa61cf 100644
--- a/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java
+++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/StorageReadRequestHandlerTest.java
@@ -28,7 +28,7 @@
 import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState;
 import com.linkedin.davinci.listener.response.AdminResponse;
 import com.linkedin.davinci.listener.response.MetadataResponse;
-import com.linkedin.davinci.listener.response.TopicPartitionIngestionContextResponse;
+import com.linkedin.davinci.listener.response.ReplicaIngestionResponse;
 import com.linkedin.davinci.storage.DiskHealthCheckService;
 import com.linkedin.davinci.storage.IngestionMetadataRetriever;
 import com.linkedin.davinci.storage.ReadMetadataRetriever;
@@ -57,6 +57,7 @@
 import com.linkedin.venice.listener.request.ComputeRouterRequestWrapper;
 import com.linkedin.venice.listener.request.GetRouterRequest;
 import com.linkedin.venice.listener.request.HealthCheckRequest;
+import com.linkedin.venice.listener.request.HeartbeatRequest;
 import com.linkedin.venice.listener.request.MetadataFetchRequest;
 import com.linkedin.venice.listener.request.MultiGetRouterRequestWrapper;
 import com.linkedin.venice.listener.request.RouterRequest;
@@ -577,28 +578,50 @@ public void testTopicPartitionIngestionContextRequestsPassInStorageExecutionHandler() throws Exception {
         .parseGetHttpRequest(uri, RequestHelper.getRequestParts(URI.create(httpRequest.uri())));
 
     // Mock the TopicPartitionIngestionContextResponse from ingestion task
-    TopicPartitionIngestionContextResponse expectedTopicPartitionIngestionContextResponse =
-        new TopicPartitionIngestionContextResponse();
+    ReplicaIngestionResponse expectedReplicaIngestionResponse = new ReplicaIngestionResponse();
     String jsonStr = "{\n" + "\"kafkaUrl\" : {\n" + "  TP(topic: \"" + topic + "\", partition: " + expectedPartitionId
         + ") : {\n" + "    \"latestOffset\" : 0,\n" + "    \"offsetLag\" : 1,\n" + "    \"msgRate\" : 2.0,\n"
         + "    \"byteRate\" : 4.0,\n" + "    \"consumerIdx\" : 6,\n" + "    \"elapsedTimeSinceLastPollInMs\" : 7\n"
         + "  }\n" + " }\n" + "}";
     byte[] expectedTopicPartitionContext = jsonStr.getBytes();
-    expectedTopicPartitionIngestionContextResponse.setTopicPartitionIngestionContext(expectedTopicPartitionContext);
-    doReturn(expectedTopicPartitionIngestionContextResponse).when(ingestionMetadataRetriever)
+    expectedReplicaIngestionResponse.setPayload(expectedTopicPartitionContext);
+    doReturn(expectedReplicaIngestionResponse).when(ingestionMetadataRetriever)
         .getTopicPartitionIngestionContext(eq(topic), eq(topic), eq(expectedPartitionId));
 
     StorageReadRequestHandler requestHandler = createStorageReadRequestHandler();
     requestHandler.channelRead(context, request);
     verify(context, times(1)).writeAndFlush(argumentCaptor.capture());
-    TopicPartitionIngestionContextResponse topicPartitionIngestionContextResponse =
-        (TopicPartitionIngestionContextResponse) argumentCaptor.getValue();
-    String topicPartitionIngestionContextStr =
-        new String(topicPartitionIngestionContextResponse.getTopicPartitionIngestionContext());
+    ReplicaIngestionResponse replicaIngestionResponse = (ReplicaIngestionResponse) argumentCaptor.getValue();
+    String topicPartitionIngestionContextStr = new String(replicaIngestionResponse.getPayload());
     assertTrue(topicPartitionIngestionContextStr.contains(topic));
-    assertEquals(
-        topicPartitionIngestionContextResponse.getTopicPartitionIngestionContext(),
-        expectedTopicPartitionContext);
+    assertEquals(replicaIngestionResponse.getPayload(), expectedTopicPartitionContext);
+  }
+
+  @Test
+  public void testHeartbeatLagRequestsPassInStorageExecutionHandler() throws Exception {
+    String topic = "test_store_v1";
+    int expectedPartitionId = 12345;
+    boolean filterLag = true;
+    // [0]""/[1]"action"/[2]"optional topic filter"/[3]"optional partition filter"/[4]"optional lag filter"
+    String uri = "/" + QueryAction.HOST_HEARTBEAT_LAG.toString().toLowerCase() + "/" + topic + "/" + expectedPartitionId
+        + "/" + filterLag;
+    HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
+    HeartbeatRequest request =
+        HeartbeatRequest.parseGetHttpRequest(uri, RequestHelper.getRequestParts(URI.create(httpRequest.uri())));
+    System.out.println(request.getTopic() + " " + request.getPartition() + " " + request.isFilterLagReplica());
+    // Mock the ReplicaIngestionResponse from heartbeat service
+    ReplicaIngestionResponse expectedReplicaIngestionResponse = new ReplicaIngestionResponse();
+    String jsonStr = "{}";
+    byte[] expectedTopicPartitionContext = jsonStr.getBytes();
+    expectedReplicaIngestionResponse.setPayload(expectedTopicPartitionContext);
+    doReturn(expectedReplicaIngestionResponse).when(ingestionMetadataRetriever)
+        .getHeartbeatLag(eq(topic), eq(expectedPartitionId), eq(filterLag));
+
+    StorageReadRequestHandler requestHandler = createStorageReadRequestHandler();
+    requestHandler.channelRead(context, request);
+    verify(context, times(1)).writeAndFlush(argumentCaptor.capture());
+    ReplicaIngestionResponse replicaIngestionResponse = (ReplicaIngestionResponse) argumentCaptor.getValue();
+    assertEquals(replicaIngestionResponse.getPayload(), expectedTopicPartitionContext);
   }
 
   @Test
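For completeness, the heartbeat test above pins down the contract between the handler and the retriever: a parsed HeartbeatRequest is answered by ingestionMetadataRetriever.getHeartbeatLag(topic, partition, filterLag), and the resulting ReplicaIngestionResponse is flushed back on the channel. A sketch of the dispatch this implies inside StorageReadRequestHandler.channelRead(); the surrounding structure is assumed, only the call signature and response type come from the test:

// Inside channelRead(ChannelHandlerContext context, Object message) -- sketch only.
if (message instanceof HeartbeatRequest) {
  HeartbeatRequest heartbeatRequest = (HeartbeatRequest) message;
  // Delegate to the metadata retriever; the arguments mirror the mocked call in the test.
  ReplicaIngestionResponse response = ingestionMetadataRetriever.getHeartbeatLag(
      heartbeatRequest.getTopic(),
      heartbeatRequest.getPartition(),
      heartbeatRequest.isFilterLagReplica());
  // The response is serialized to JSON downstream by OutboundHttpWrapperHandler.
  context.writeAndFlush(response);
  return;
}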