Skip to content

Commit

Permalink
[router] Add router current version metric (#793)
Browse files Browse the repository at this point in the history
* [router] Add store current version metric

This PR adds current version metric in router which gives us visibility of in-memory vcurrent version being served for a store in the route

---------

Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 and Sourav Maji authored Dec 11, 2023
1 parent 9a3d18c commit a91cabe
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,8 @@ public boolean startInner() throws Exception {
storeConfigRepository,
config.getClusterToD2Map(),
config.getClusterName(),
compressorFactory);
compressorFactory,
metricsRepository);
VenicePathParser pathParser = new VenicePathParser(
versionFinder,
partitionFinder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.router.stats.RouterCurrentVersionStats;
import com.linkedin.venice.router.stats.StaleVersionReason;
import com.linkedin.venice.router.stats.StaleVersionStats;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -42,24 +45,31 @@ public class VeniceVersionFinder {
private final Map<String, String> clusterToD2Map;
private final String clusterName;
private final ConcurrentMap<String, Integer> lastCurrentVersionMap = new ConcurrentHashMap<>();

protected final Map<String, RouterCurrentVersionStats> storeStats = new VeniceConcurrentHashMap<>();

private final HelixBaseRoutingRepository routingDataRepository;
private final CompressorFactory compressorFactory;

private final MetricsRepository metricsRepository;

public VeniceVersionFinder(
ReadOnlyStoreRepository metadataRepository,
HelixBaseRoutingRepository routingDataRepository,
StaleVersionStats stats,
HelixReadOnlyStoreConfigRepository storeConfigRepo,
Map<String, String> clusterToD2Map,
String clusterName,
CompressorFactory compressorFactory) {
CompressorFactory compressorFactory,
MetricsRepository metricsRepository) {
this.metadataRepository = metadataRepository;
this.routingDataRepository = routingDataRepository;
this.stats = stats;
this.storeConfigRepo = storeConfigRepo;
this.clusterToD2Map = clusterToD2Map;
this.clusterName = clusterName;
this.compressorFactory = compressorFactory;
this.metricsRepository = metricsRepository;
}

public int getVersion(String storeName, BasicFullHttpRequest request) throws VeniceException {
Expand Down Expand Up @@ -101,7 +111,11 @@ public int getVersion(String storeName, BasicFullHttpRequest request) throws Ven
stats.recordNotStale();
return metadataCurrentVersion;
}
return maybeServeNewCurrentVersion(store, lastCurrentVersion, metadataCurrentVersion);
int currentVersion = maybeServeNewCurrentVersion(store, lastCurrentVersion, metadataCurrentVersion);

storeStats.computeIfAbsent(storeName, k -> new RouterCurrentVersionStats(metricsRepository, storeName))
.updateCurrentVersion(currentVersion);
return currentVersion;
}

private int maybeServeNewCurrentVersion(Store store, int lastCurrentVersion, int newCurrentVersion) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.linkedin.venice.router.stats;

import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.stats.Gauge;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;


public class RouterCurrentVersionStats extends AbstractVeniceStats {
private final Sensor currentVersionNumberSensor;
private int currentVersion = -1;

public RouterCurrentVersionStats(MetricsRepository metricsRepository, String name) {
super(metricsRepository, name);
this.currentVersionNumberSensor = registerSensor("current_version", new Gauge(() -> this.currentVersion));
}

public void updateCurrentVersion(int currentVersion) {
this.currentVersion = currentVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ VeniceVersionFinder getVersionFinder() {
storeConfigRepo,
clusterToD2Map,
CLUSTER,
compressorFactory);
compressorFactory,
null);
}

RouterStats<AggRouterHttpRequestStats> getMockedStats() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.router.api;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -31,6 +32,8 @@
import com.linkedin.venice.utils.Utils;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -71,7 +74,8 @@ public void throws404onMissingStore() {
storeConfigRepo,
clusterToD2Map,
CLUSTER,
compressorFactory);
compressorFactory,
mock(MetricsRepository.class));
try {
versionFinder.getVersion("", request);
Assert.fail(
Expand Down Expand Up @@ -110,7 +114,8 @@ public void throws301onMigratedStore() {
storeConfigRepo,
clusterToD2Map,
CLUSTER,
compressorFactory);
compressorFactory,
mock(MetricsRepository.class));
try {
request.headers().add(HttpConstants.VENICE_ALLOW_REDIRECT, "1");
versionFinder.getVersion("store", request);
Expand Down Expand Up @@ -144,7 +149,8 @@ public void returnNonExistingVersionOnceStoreIsDisabled() {
storeConfigRepo,
clusterToD2Map,
CLUSTER,
compressorFactory);
compressorFactory,
mock(MetricsRepository.class));
try {
versionFinder.getVersion(storeName, request);
Assert.fail("Store should be disabled and forbidden to read.");
Expand Down Expand Up @@ -184,6 +190,9 @@ public void testSwapsVersionWhenAllPartitionsAreOnline() {
HelixReadOnlyStoreConfigRepository storeConfigRepo = mock(HelixReadOnlyStoreConfigRepository.class);

CompressorFactory compressorFactory = mock(CompressorFactory.class);
MetricsRepository mockMetricsRepository = mock(MetricsRepository.class);
final Sensor mockSensor = mock(Sensor.class);
doReturn(mockSensor).when(mockMetricsRepository).sensor(anyString(), any());

// Object under test
VeniceVersionFinder versionFinder = new VeniceVersionFinder(
Expand All @@ -193,7 +202,8 @@ public void testSwapsVersionWhenAllPartitionsAreOnline() {
storeConfigRepo,
clusterToD2Map,
CLUSTER,
compressorFactory);
compressorFactory,
mockMetricsRepository);

// for a new store, the versionFinder returns the current version, no matter the online replicas
Assert.assertEquals(versionFinder.getVersion(storeName, request), firstVersion);
Expand Down Expand Up @@ -266,7 +276,8 @@ public void returnsCurrentVersionWhenTheDictionaryExists() {
storeConfigRepo,
clusterToD2Map,
CLUSTER,
compressorFactory);
compressorFactory,
mock(MetricsRepository.class));

String firstVersionKafkaTopic = Version.composeKafkaTopic(storeName, firstVersion);

Expand Down Expand Up @@ -314,7 +325,8 @@ public void returnsCurrentVersionWhenItIsTheOnlyOption() {
storeConfigRepo,
clusterToD2Map,
CLUSTER,
compressorFactory);
compressorFactory,
mock(MetricsRepository.class));

String firstVersionKafkaTopic = Version.composeKafkaTopic(storeName, firstVersion);

Expand Down Expand Up @@ -349,6 +361,9 @@ public void returnsPreviousVersionWhenDictionaryNotDownloaded() {
doReturn(true).when(routingDataRepo).containsKafkaTopic(anyString());

CompressorFactory compressorFactory = mock(CompressorFactory.class);
MetricsRepository mockMetricsRepository = mock(MetricsRepository.class);
final Sensor mockSensor = mock(Sensor.class);
doReturn(mockSensor).when(mockMetricsRepository).sensor(anyString(), any());

// Object under test
VeniceVersionFinder versionFinder = new VeniceVersionFinder(
Expand All @@ -358,7 +373,8 @@ public void returnsPreviousVersionWhenDictionaryNotDownloaded() {
storeConfigRepo,
clusterToD2Map,
CLUSTER,
compressorFactory);
compressorFactory,
mockMetricsRepository);

String firstVersionKafkaTopic = Version.composeKafkaTopic(storeName, firstVersion);
String secondVersionKafkaTopic = Version.composeKafkaTopic(storeName, secondVersion);
Expand Down Expand Up @@ -403,6 +419,9 @@ public void returnsNewVersionWhenDictionaryDownloads() {
doReturn(3).when(routingDataRepo).getNumberOfPartitions(anyString());
doReturn(instances).when(routingDataRepo).getReadyToServeInstances(anyString(), anyInt());
doReturn(true).when(routingDataRepo).containsKafkaTopic(anyString());
MetricsRepository mockMetricsRepository = mock(MetricsRepository.class);
final Sensor mockSensor = mock(Sensor.class);
doReturn(mockSensor).when(mockMetricsRepository).sensor(anyString(), any());

try (CompressorFactory compressorFactory = new CompressorFactory()) {
// Object under test
Expand All @@ -413,7 +432,8 @@ public void returnsNewVersionWhenDictionaryDownloads() {
storeConfigRepo,
clusterToD2Map,
CLUSTER,
compressorFactory);
compressorFactory,
mockMetricsRepository);

String firstVersionKafkaTopic = Version.composeKafkaTopic(storeName, firstVersion);
String secondVersionKafkaTopic = Version.composeKafkaTopic(storeName, secondVersion);
Expand Down

0 comments on commit a91cabe

Please sign in to comment.