Skip to content

Commit

Permalink
[server] SN read quota versioned stats not initialized after restart (#…
Browse files Browse the repository at this point in the history
…1312)

The currentVersion and backupVersion of ServerReadQuotaUsageStats are not
set after server restart because handleStoreChanged is invoked for all
stores when the store repo undergoing refresh before we initialize and
register store change listener in ReadQuotaEnforcementHandler (part
of the ListenerService). As a result metrics that depend on current
and backup versions will not show up properly until store is updated.

The fix is to during initialization of ReadQuotaEnforcementHandler we
will invoke handleStoreChanged for all stores after we register store
change listener.

The bug is actually reproducible in existing integration test. However,
it was not caught because the test was broken/misconfigured...
  • Loading branch information
xunyin8 authored Nov 15, 2024
1 parent 697ccf9 commit d125933
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ public void testServerReadQuota() throws Exception {
LOGGER.info("RESTARTING servers");
veniceCluster.stopAndRestartVeniceServer(veniceServerWrapper.getPort());
}
serverMetrics.clear();
for (int i = 0; i < veniceCluster.getVeniceServers().size(); i++) {
serverMetrics.add(veniceCluster.getVeniceServers().get(i).getMetricsRepository());
}
for (int j = 0; j < 5; j++) {
for (int i = 0; i < recordCnt; i++) {
String key = keyPrefix + i;
Expand All @@ -198,7 +202,7 @@ public void testServerReadQuota() throws Exception {
quotaRequestedQPSSum += serverMetric.getMetric(readQuotaRequestedQPSString).value();
assertEquals(serverMetric.getMetric(readQuotaAllowedUnintentionally).value(), 0d);
}
assertTrue(quotaRequestedQPSSum >= 0, "Quota request sum: " + quotaRequestedQPSSum);
assertTrue(quotaRequestedQPSSum > 0, "Quota request sum: " + quotaRequestedQPSSum);
}

@Test(timeOut = TIME_OUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ public final void init() {
for (Version version: versions) {
customizedViewRepository.subscribeRoutingDataChange(version.kafkaTopicName(), this);
}
// also invoke handle store change to ensure corresponding token bucket and stats are initialized.
handleStoreChanged(store);
}
this.initializedVolatile = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public void setBackupVersion(int version) {
}
}

public int getCurrentVersion() {
return currentVersion.get();
}

public int getBackupVersion() {
return backupVersion.get();
}

public void removeVersion(int version) {
versionedStats.remove(version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class ReadQuotaEnforcementHandlerTest {
private MetricsRepository metricsRepository;
private RouterRequest routerRequest;
private VeniceServerGrpcHandler mockNextHandler;
private VeniceServerConfig serverConfig;

@BeforeMethod
public void setUp() {
Expand All @@ -97,7 +98,7 @@ public void setUp() {
customizedViewRepository = mock(HelixCustomizedViewOfflinePushRepository.class);
stats = mock(AggServerQuotaUsageStats.class);
metricsRepository = new MetricsRepository();
VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
serverConfig = mock(VeniceServerConfig.class);
when(serverConfig.getQuotaEnforcementIntervalInMs()).thenReturn(10000);
when(serverConfig.getQuotaEnforcementCapacityMultiple()).thenReturn(5);
doReturn(nodeCapacity).when(serverConfig).getNodeCapacityInRcu();
Expand Down Expand Up @@ -366,18 +367,34 @@ public void testGetRCU() {

@Test
public void testInitWithPreExistingResource() {
// Do initialize the quota enforcement handler with a real stats for this test
quotaEnforcementHandler = new ReadQuotaEnforcementHandler(
serverConfig,
storeRepository,
CompletableFuture.completedFuture(customizedViewRepository),
thisNodeId,
new AggServerQuotaUsageStats(metricsRepository),
metricsRepository,
clock);
String storeName = "testStore";
String topic = Version.composeKafkaTopic(storeName, 1);
Version version = mock(Version.class);
doReturn(topic).when(version).kafkaTopicName();
Store store = setUpStoreMock(storeName, 1, Collections.singletonList(version), 100, true);
doReturn(store).when(storeRepository).getStore(storeName);
doReturn(Collections.singletonList(store)).when(storeRepository).getAllStores();

Instance thisInstance = mock(Instance.class);
doReturn(thisNodeId).when(thisInstance).getNodeId();
Partition partition = setUpPartitionMock(topic, thisInstance, true, 0);
doReturn(0).when(partition).getId();
PartitionAssignment pa = setUpPartitionAssignmentMock(topic, Collections.singletonList(partition));
doReturn(pa).when(customizedViewRepository).getPartitionAssignments(topic);
quotaEnforcementHandler.init();

verify(storeRepository, atLeastOnce()).registerStoreDataChangedListener(any());
verify(customizedViewRepository, atLeastOnce()).subscribeRoutingDataChange(eq(topic), any());
Assert.assertEquals(quotaEnforcementHandler.getStats().getStoreStats(storeName).getCurrentVersion(), 1);
Assert.assertEquals(quotaEnforcementHandler.getStats().getStoreStats(storeName).getBackupVersion(), 0);
}

/**
Expand Down

0 comments on commit d125933

Please sign in to comment.