diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java index 713322af55..fb4d55f1d9 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java @@ -491,7 +491,8 @@ public boolean startInner() throws Exception { storeConfigRepository, config.getClusterToD2Map(), config.getClusterName(), - compressorFactory); + compressorFactory, + metricsRepository); VenicePathParser pathParser = new VenicePathParser( versionFinder, partitionFinder, diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceVersionFinder.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceVersionFinder.java index f1039d291e..2e5eb5ebbc 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceVersionFinder.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceVersionFinder.java @@ -16,9 +16,12 @@ import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; +import com.linkedin.venice.router.stats.RouterCurrentVersionStats; import com.linkedin.venice.router.stats.StaleVersionReason; import com.linkedin.venice.router.stats.StaleVersionStats; import com.linkedin.venice.utils.RedundantExceptionFilter; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import io.tehuti.metrics.MetricsRepository; import java.util.List; import java.util.Map; import java.util.Optional; @@ -42,9 +45,14 @@ public class VeniceVersionFinder { private final Map clusterToD2Map; private final String clusterName; private final ConcurrentMap lastCurrentVersionMap = new ConcurrentHashMap<>(); + + protected final Map storeStats = new VeniceConcurrentHashMap<>(); + private final HelixBaseRoutingRepository routingDataRepository; private final CompressorFactory compressorFactory; + private final MetricsRepository metricsRepository; + public VeniceVersionFinder( ReadOnlyStoreRepository metadataRepository, HelixBaseRoutingRepository routingDataRepository, @@ -52,7 +60,8 @@ public VeniceVersionFinder( HelixReadOnlyStoreConfigRepository storeConfigRepo, Map clusterToD2Map, String clusterName, - CompressorFactory compressorFactory) { + CompressorFactory compressorFactory, + MetricsRepository metricsRepository) { this.metadataRepository = metadataRepository; this.routingDataRepository = routingDataRepository; this.stats = stats; @@ -60,6 +69,7 @@ public VeniceVersionFinder( this.clusterToD2Map = clusterToD2Map; this.clusterName = clusterName; this.compressorFactory = compressorFactory; + this.metricsRepository = metricsRepository; } public int getVersion(String storeName, BasicFullHttpRequest request) throws VeniceException { @@ -101,7 +111,11 @@ public int getVersion(String storeName, BasicFullHttpRequest request) throws Ven stats.recordNotStale(); return metadataCurrentVersion; } - return maybeServeNewCurrentVersion(store, lastCurrentVersion, metadataCurrentVersion); + int currentVersion = maybeServeNewCurrentVersion(store, lastCurrentVersion, metadataCurrentVersion); + + storeStats.computeIfAbsent(storeName, k -> new RouterCurrentVersionStats(metricsRepository, storeName)) + .updateCurrentVersion(currentVersion); + return currentVersion; } private int maybeServeNewCurrentVersion(Store store, int lastCurrentVersion, int newCurrentVersion) { diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/RouterCurrentVersionStats.java b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/RouterCurrentVersionStats.java new file mode 100644 index 0000000000..7658b2ca60 --- /dev/null +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/RouterCurrentVersionStats.java @@ -0,0 +1,21 @@ +package com.linkedin.venice.router.stats; + +import com.linkedin.venice.stats.AbstractVeniceStats; +import com.linkedin.venice.stats.Gauge; +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.Sensor; + + +public class RouterCurrentVersionStats extends AbstractVeniceStats { + private final Sensor currentVersionNumberSensor; + private int currentVersion = -1; + + public RouterCurrentVersionStats(MetricsRepository metricsRepository, String name) { + super(metricsRepository, name); + this.currentVersionNumberSensor = registerSensor("current_version", new Gauge(() -> this.currentVersion)); + } + + public void updateCurrentVersion(int currentVersion) { + this.currentVersion = currentVersion; + } +} diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVenicePathParser.java b/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVenicePathParser.java index 8b54bc23c3..c036454053 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVenicePathParser.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVenicePathParser.java @@ -84,7 +84,8 @@ VeniceVersionFinder getVersionFinder() { storeConfigRepo, clusterToD2Map, CLUSTER, - compressorFactory); + compressorFactory, + null); } RouterStats getMockedStats() { diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVeniceVersionFinder.java b/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVeniceVersionFinder.java index 7dae0550d1..436ac12ffe 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVeniceVersionFinder.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/api/TestVeniceVersionFinder.java @@ -1,5 +1,6 @@ package com.linkedin.venice.router.api; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doReturn; @@ -31,6 +32,8 @@ import com.linkedin.venice.utils.Utils; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.Sensor; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; @@ -71,7 +74,8 @@ public void throws404onMissingStore() { storeConfigRepo, clusterToD2Map, CLUSTER, - compressorFactory); + compressorFactory, + mock(MetricsRepository.class)); try { versionFinder.getVersion("", request); Assert.fail( @@ -110,7 +114,8 @@ public void throws301onMigratedStore() { storeConfigRepo, clusterToD2Map, CLUSTER, - compressorFactory); + compressorFactory, + mock(MetricsRepository.class)); try { request.headers().add(HttpConstants.VENICE_ALLOW_REDIRECT, "1"); versionFinder.getVersion("store", request); @@ -144,7 +149,8 @@ public void returnNonExistingVersionOnceStoreIsDisabled() { storeConfigRepo, clusterToD2Map, CLUSTER, - compressorFactory); + compressorFactory, + mock(MetricsRepository.class)); try { versionFinder.getVersion(storeName, request); Assert.fail("Store should be disabled and forbidden to read."); @@ -184,6 +190,9 @@ public void testSwapsVersionWhenAllPartitionsAreOnline() { HelixReadOnlyStoreConfigRepository storeConfigRepo = mock(HelixReadOnlyStoreConfigRepository.class); CompressorFactory compressorFactory = mock(CompressorFactory.class); + MetricsRepository mockMetricsRepository = mock(MetricsRepository.class); + final Sensor mockSensor = mock(Sensor.class); + doReturn(mockSensor).when(mockMetricsRepository).sensor(anyString(), any()); // Object under test VeniceVersionFinder versionFinder = new VeniceVersionFinder( @@ -193,7 +202,8 @@ public void testSwapsVersionWhenAllPartitionsAreOnline() { storeConfigRepo, clusterToD2Map, CLUSTER, - compressorFactory); + compressorFactory, + mockMetricsRepository); // for a new store, the versionFinder returns the current version, no matter the online replicas Assert.assertEquals(versionFinder.getVersion(storeName, request), firstVersion); @@ -266,7 +276,8 @@ public void returnsCurrentVersionWhenTheDictionaryExists() { storeConfigRepo, clusterToD2Map, CLUSTER, - compressorFactory); + compressorFactory, + mock(MetricsRepository.class)); String firstVersionKafkaTopic = Version.composeKafkaTopic(storeName, firstVersion); @@ -314,7 +325,8 @@ public void returnsCurrentVersionWhenItIsTheOnlyOption() { storeConfigRepo, clusterToD2Map, CLUSTER, - compressorFactory); + compressorFactory, + mock(MetricsRepository.class)); String firstVersionKafkaTopic = Version.composeKafkaTopic(storeName, firstVersion); @@ -349,6 +361,9 @@ public void returnsPreviousVersionWhenDictionaryNotDownloaded() { doReturn(true).when(routingDataRepo).containsKafkaTopic(anyString()); CompressorFactory compressorFactory = mock(CompressorFactory.class); + MetricsRepository mockMetricsRepository = mock(MetricsRepository.class); + final Sensor mockSensor = mock(Sensor.class); + doReturn(mockSensor).when(mockMetricsRepository).sensor(anyString(), any()); // Object under test VeniceVersionFinder versionFinder = new VeniceVersionFinder( @@ -358,7 +373,8 @@ public void returnsPreviousVersionWhenDictionaryNotDownloaded() { storeConfigRepo, clusterToD2Map, CLUSTER, - compressorFactory); + compressorFactory, + mockMetricsRepository); String firstVersionKafkaTopic = Version.composeKafkaTopic(storeName, firstVersion); String secondVersionKafkaTopic = Version.composeKafkaTopic(storeName, secondVersion); @@ -403,6 +419,9 @@ public void returnsNewVersionWhenDictionaryDownloads() { doReturn(3).when(routingDataRepo).getNumberOfPartitions(anyString()); doReturn(instances).when(routingDataRepo).getReadyToServeInstances(anyString(), anyInt()); doReturn(true).when(routingDataRepo).containsKafkaTopic(anyString()); + MetricsRepository mockMetricsRepository = mock(MetricsRepository.class); + final Sensor mockSensor = mock(Sensor.class); + doReturn(mockSensor).when(mockMetricsRepository).sensor(anyString(), any()); try (CompressorFactory compressorFactory = new CompressorFactory()) { // Object under test @@ -413,7 +432,8 @@ public void returnsNewVersionWhenDictionaryDownloads() { storeConfigRepo, clusterToD2Map, CLUSTER, - compressorFactory); + compressorFactory, + mockMetricsRepository); String firstVersionKafkaTopic = Version.composeKafkaTopic(storeName, firstVersion); String secondVersionKafkaTopic = Version.composeKafkaTopic(storeName, secondVersion);