diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java index 9bbf2dee894..a1ffc7b92aa 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java @@ -136,7 +136,7 @@ protected Sensor registerSensor( } } } else { - String metricName = sensorFullName + "." + metricNameSuffix(stat); + String metricName = getMetricFullName(sensor, stat); if (metricsRepository.getMetric(metricName) == null) { sensor.add(metricName, stat, config); } @@ -147,6 +147,10 @@ protected Sensor registerSensor( }); } + protected String getMetricFullName(Sensor sensor, MeasurableStat stat) { + return sensor.name() + "." + metricNameSuffix(stat); + } + /** * N.B.: {@link LongAdderRateGauge} is just an implementation detail, and we do not wish to alter metric names * due to it, so we call it the same as {@link Rate}. Same for {@link AsyncGauge}, we don't want to alter any existing diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java index d543cc95fc2..b111c995325 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java @@ -513,6 +513,8 @@ public , K, R> Scatter scatter( Map> hostMap = new HashMap<>(); int helixGroupNum = getHelixGroupNum(); int assignedHelixGroupId = getAssignedHelixGroupId(venicePath); + // This is used to record the request start time for the whole Router request. 
+ venicePath.recordOriginalRequestStartTimestamp(); currentPartition = 0; try { for (; currentPartition < partitionCount; currentPartition++) { diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseAggregator.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseAggregator.java index 338d4b63b49..c8f7610064f 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseAggregator.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseAggregator.java @@ -184,7 +184,10 @@ public FullHttpResponse buildResponse( * 3. HelixGroupId is valid since Helix-assisted routing is only enabled for multi-key request. */ if (!venicePath.isRetryRequest() && helixGroupSelector != null && venicePath.getHelixGroupId() >= 0) { - helixGroupSelector.finishRequest(venicePath.getRequestId(), venicePath.getHelixGroupId()); + helixGroupSelector.finishRequest( + venicePath.getRequestId(), + venicePath.getHelixGroupId(), + LatencyUtils.getElapsedTimeFromMsToMs(venicePath.getOriginalRequestStartTs())); } RequestType requestType = venicePath.getRequestType(); AggRouterHttpRequestStats stats = routerStats.getStatsByType(requestType); diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupLeastLoadedStrategy.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupLeastLoadedStrategy.java index eaef61b58fb..2d3bdc683fb 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupLeastLoadedStrategy.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupLeastLoadedStrategy.java @@ -2,6 +2,7 @@ import com.linkedin.alpini.base.concurrency.TimeoutProcessor; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.router.stats.HelixGroupStats; import 
com.linkedin.venice.utils.Pair; import java.util.HashMap; import java.util.Map; @@ -24,18 +25,18 @@ public class HelixGroupLeastLoadedStrategy implements HelixGroupSelectionStrateg public static final int MAX_ALLOWED_GROUP = 100; private final int[] counters = new int[MAX_ALLOWED_GROUP]; - /** - * The group count could potentially change during the runtime since the storage node cluster can be expanded - * without bouncing Routers. - */ - private int currentGroupCount = 0; private final TimeoutProcessor timeoutProcessor; private final long timeoutInMS; private final Map> requestTimeoutFutureMap = new HashMap<>(); + private final HelixGroupStats helixGroupStats; - public HelixGroupLeastLoadedStrategy(TimeoutProcessor timeoutProcessor, long timeoutInMS) { + public HelixGroupLeastLoadedStrategy( + TimeoutProcessor timeoutProcessor, + long timeoutInMS, + HelixGroupStats helixGroupStats) { this.timeoutProcessor = timeoutProcessor; this.timeoutInMS = timeoutInMS; + this.helixGroupStats = helixGroupStats; } @Override @@ -44,8 +45,8 @@ public int selectGroup(long requestId, int groupCount) { throw new VeniceException( "The valid group num must fail into this range: [1, " + MAX_ALLOWED_GROUP + "], but received: " + groupCount); } - this.currentGroupCount = groupCount; - long smallestCounter = Integer.MAX_VALUE; + int smallestCounter = Integer.MAX_VALUE; + double lowestAvgLatency = Double.MAX_VALUE; int leastLoadedGroup = 0; int startGroupId = (int) (requestId % groupCount); /** @@ -60,10 +61,21 @@ public int selectGroup(long requestId, int groupCount) { } for (int i = 0; i < groupCount; ++i) { int currentGroup = (i + startGroupId) % groupCount; - long currentGroupCounter = counters[currentGroup]; + int currentGroupCounter = counters[currentGroup]; if (currentGroupCounter < smallestCounter) { smallestCounter = currentGroupCounter; leastLoadedGroup = currentGroup; + lowestAvgLatency = helixGroupStats.getGroupResponseWaitingTimeAvg(currentGroup); + } else if 
(currentGroupCounter == smallestCounter) { + double currentGroupAvgLatency = helixGroupStats.getGroupResponseWaitingTimeAvg(currentGroup); + /** + * A negative {@code currentGroupAvgLatency} is deliberately not filtered out: a group that has not been used + * at all reports an average latency of -1.0, so it wins this tie-break over groups with recorded latencies. + */ + if (currentGroupAvgLatency < lowestAvgLatency) { + lowestAvgLatency = currentGroupAvgLatency; + leastLoadedGroup = currentGroup; + } } } final int finalLeastLoadedGroup = leastLoadedGroup; @@ -82,6 +94,7 @@ public int selectGroup(long requestId, int groupCount) { ++counters[leastLoadedGroup]; } + helixGroupStats.recordGroupPendingRequest(leastLoadedGroup, counters[leastLoadedGroup]); return leastLoadedGroup; } @@ -100,6 +113,10 @@ private void timeoutRequest(long requestId, int groupId, boolean cancelTimeoutFu "The allowed group id must fail into this range: [0, " + (MAX_ALLOWED_GROUP - 1) + "], but received: " + groupId); } + if (!cancelTimeoutFuture) { + // Timed-out request: record the full timeout as its response waiting time. + helixGroupStats.recordGroupResponseWaitingTime(groupId, timeoutInMS); + } synchronized (this) { Pair timeoutFuturePair = requestTimeoutFutureMap.get(requestId); if (timeoutFuturePair == null) { @@ -133,47 +150,8 @@ private void timeoutRequest(long requestId, int groupId, boolean cancelTimeoutFu } @Override - public void finishRequest(long requestId, int groupId) { + public void finishRequest(long requestId, int groupId, double latency) { timeoutRequest(requestId, groupId, true); - } - - @Override - public int getMaxGroupPendingRequest() { - if (currentGroupCount == 0) { - return 0; - } - int maxPendingRequest = 0; - for (int i = 0; i < currentGroupCount; ++i) { - if (counters[i] > maxPendingRequest) { - maxPendingRequest = counters[i]; - } - } - return maxPendingRequest; - } - - @Override - public int getMinGroupPendingRequest() { - if (currentGroupCount == 0) { - return 0; - } - int minPendingRequest = Integer.MAX_VALUE; - for (int i = 0; i < currentGroupCount; ++i) { - if 
(counters[i] < minPendingRequest) { - minPendingRequest = counters[i]; - } - } - return minPendingRequest; - } - - @Override - public int getAvgGroupPendingRequest() { - if (currentGroupCount == 0) { - return 0; - } - int totalPendingRequest = 0; - for (int i = 0; i < currentGroupCount; ++i) { - totalPendingRequest += counters[i]; - } - return totalPendingRequest / currentGroupCount; + helixGroupStats.recordGroupResponseWaitingTime(groupId, latency); } } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupRoundRobinStrategy.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupRoundRobinStrategy.java index cba210ec7d8..59cb7547ea1 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupRoundRobinStrategy.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupRoundRobinStrategy.java @@ -14,25 +14,8 @@ public int selectGroup(long requestId, int groupNum) { } @Override - public void finishRequest(long requestId, int groupId) { + public void finishRequest(long requestId, int groupId, double latency) { // do nothing } - @Override - public int getMaxGroupPendingRequest() { - // Not supported - return -1; - } - - @Override - public int getMinGroupPendingRequest() { - // Not supported - return -1; - } - - @Override - public int getAvgGroupPendingRequest() { - // Not supported - return -1; - } } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupSelectionStrategy.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupSelectionStrategy.java index c60b70b1863..c513e9c604f 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupSelectionStrategy.java +++ 
b/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupSelectionStrategy.java @@ -10,20 +10,6 @@ public interface HelixGroupSelectionStrategy { * Notify the corresponding Helix Group that the request is completed, and the implementation will decide whether * any cleanup is required or not. */ - void finishRequest(long requestId, int groupId); + void finishRequest(long requestId, int groupId, double latency); - /** - * Get the maximum of the pending requests among all the groups - */ - int getMaxGroupPendingRequest(); - - /** - * Get the minimum of the pending requests among all the groups - */ - int getMinGroupPendingRequest(); - - /** - * Get the average of the pending requests among all the groups - */ - int getAvgGroupPendingRequest(); } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupSelector.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupSelector.java index c36023b87b1..36eb2529e13 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupSelector.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/routing/helix/HelixGroupSelector.java @@ -30,11 +30,12 @@ public HelixGroupSelector( HelixInstanceConfigRepository instanceConfigRepository, HelixGroupSelectionStrategyEnum strategyEnum, TimeoutProcessor timeoutProcessor) { - this.helixGroupStats = new HelixGroupStats(metricsRepository, this); + this.helixGroupStats = new HelixGroupStats(metricsRepository); this.instanceConfigRepository = instanceConfigRepository; Class strategyClass = strategyEnum.getStrategyClass(); if (strategyClass.equals(HelixGroupLeastLoadedStrategy.class)) { - this.selectionStrategy = new HelixGroupLeastLoadedStrategy(timeoutProcessor, HELIX_GROUP_COUNTER_TIMEOUT_MS); + this.selectionStrategy = + new HelixGroupLeastLoadedStrategy(timeoutProcessor, 
HELIX_GROUP_COUNTER_TIMEOUT_MS, helixGroupStats); } else { try { this.selectionStrategy = strategyClass.getDeclaredConstructor().newInstance(); @@ -57,27 +58,11 @@ public int selectGroup(long requestId, int groupNum) { helixGroupStats.recordGroupNum(groupNum); int assignedGroupId = selectionStrategy.selectGroup(requestId, groupNum); helixGroupStats.recordGroupRequest(assignedGroupId); - return assignedGroupId; } @Override - public void finishRequest(long requestId, int groupId) { - selectionStrategy.finishRequest(requestId, groupId); - } - - @Override - public int getMaxGroupPendingRequest() { - return selectionStrategy.getMaxGroupPendingRequest(); - } - - @Override - public int getMinGroupPendingRequest() { - return selectionStrategy.getMinGroupPendingRequest(); - } - - @Override - public int getAvgGroupPendingRequest() { - return selectionStrategy.getAvgGroupPendingRequest(); + public void finishRequest(long requestId, int groupId, double latency) { + selectionStrategy.finishRequest(requestId, groupId, latency); } } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/HelixGroupStats.java b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/HelixGroupStats.java index a248de31cf2..e53d8c57b4e 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/HelixGroupStats.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/HelixGroupStats.java @@ -1,35 +1,28 @@ package com.linkedin.venice.router.stats; -import com.linkedin.venice.router.api.routing.helix.HelixGroupSelectionStrategy; import com.linkedin.venice.stats.AbstractVeniceStats; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import io.tehuti.Metric; +import io.tehuti.metrics.MeasurableStat; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; -import io.tehuti.metrics.stats.AsyncGauge; import io.tehuti.metrics.stats.Avg; import io.tehuti.metrics.stats.OccurrenceRate; 
public class HelixGroupStats extends AbstractVeniceStats { private final VeniceConcurrentHashMap groupCounterSensorMap = new VeniceConcurrentHashMap<>(); - private final HelixGroupSelectionStrategy strategy; - + private final VeniceConcurrentHashMap pendingRequestSensorMap = new VeniceConcurrentHashMap<>(); + private final VeniceConcurrentHashMap groupResponseWaitingTimeSensorMap = + new VeniceConcurrentHashMap<>(); + private final VeniceConcurrentHashMap groupResponseWaitingTimeAvgMap = + new VeniceConcurrentHashMap<>(); private final Sensor groupCountSensor; - private final Sensor maxGroupPendingRequest; - private final Sensor minGroupPendingRequest; - private final Sensor avgGroupPendingRequest; - public HelixGroupStats(MetricsRepository metricsRepository, HelixGroupSelectionStrategy strategy) { + public HelixGroupStats(MetricsRepository metricsRepository) { super(metricsRepository, "HelixGroupStats"); - this.strategy = strategy; this.groupCountSensor = registerSensor("group_count", new Avg()); - this.maxGroupPendingRequest = registerSensor( - new AsyncGauge((ignored, ignored2) -> strategy.getMaxGroupPendingRequest(), "max_group_pending_request")); - this.minGroupPendingRequest = registerSensor( - new AsyncGauge((ignored, ignored2) -> strategy.getMinGroupPendingRequest(), "min_group_pending_request")); - this.avgGroupPendingRequest = registerSensor( - new AsyncGauge((ignored, ignored2) -> strategy.getAvgGroupPendingRequest(), "avg_group_pending_request")); } public void recordGroupNum(int groupNum) { @@ -41,4 +34,33 @@ public void recordGroupRequest(int groupId) { .computeIfAbsent(groupId, id -> registerSensor("group_" + groupId + "_request", new OccurrenceRate())); groupSensor.record(); } + + public void recordGroupPendingRequest(int groupId, int pendingRequest) { + Sensor pendingRequestSensor = pendingRequestSensorMap + .computeIfAbsent(groupId, id -> registerSensor("group_" + groupId + "_pending_request", new Avg())); + 
pendingRequestSensor.record(pendingRequest); + } + + public void recordGroupResponseWaitingTime(int groupId, double responseWaitingTime) { + Sensor groupResponseWaitingTimeSensor = groupResponseWaitingTimeSensorMap.computeIfAbsent(groupId, id -> { + MeasurableStat avg = new Avg(); + Sensor sensor = registerSensor("group_" + groupId + "_response_waiting_time", avg); + groupResponseWaitingTimeAvgMap.put(groupId, getMetricsRepository().getMetric(getMetricFullName(sensor, avg))); + + return sensor; + }); + groupResponseWaitingTimeSensor.record(responseWaitingTime); + } + + public double getGroupResponseWaitingTimeAvg(int groupId) { + Metric groupResponseWaitingTimeAvgMetric = groupResponseWaitingTimeAvgMap.get(groupId); + if (groupResponseWaitingTimeAvgMetric == null) { + return -1; + } + double avgLatency = groupResponseWaitingTimeAvgMetric.value(); + if (Double.isNaN(avgLatency)) { + return -1; + } + return avgLatency; + } } diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/api/routing/helix/TestHelixGroupLeastLoadedStrategy.java b/services/venice-router/src/test/java/com/linkedin/venice/router/api/routing/helix/TestHelixGroupLeastLoadedStrategy.java index 9dc92bbe9fd..6a27e7c8112 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/api/routing/helix/TestHelixGroupLeastLoadedStrategy.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/api/routing/helix/TestHelixGroupLeastLoadedStrategy.java @@ -6,6 +6,8 @@ import static org.mockito.Mockito.mock; import com.linkedin.alpini.base.concurrency.TimeoutProcessor; +import com.linkedin.venice.router.stats.HelixGroupStats; +import io.tehuti.metrics.MetricsRepository; import org.testng.Assert; import org.testng.annotations.Test; @@ -15,24 +17,39 @@ public class TestHelixGroupLeastLoadedStrategy { public void testSelectGroup() { TimeoutProcessor timeoutProcessor = mock(TimeoutProcessor.class); 
doReturn(mock(TimeoutProcessor.TimeoutFuture.class)).when(timeoutProcessor).schedule(any(), anyLong(), any()); - HelixGroupLeastLoadedStrategy strategy = new HelixGroupLeastLoadedStrategy(timeoutProcessor, 10000); + HelixGroupLeastLoadedStrategy strategy = + new HelixGroupLeastLoadedStrategy(timeoutProcessor, 10000, mock(HelixGroupStats.class)); int groupNum = 3; // Group 0 is slow. Assert.assertEquals(strategy.selectGroup(0, groupNum), 0); Assert.assertEquals(strategy.selectGroup(1, groupNum), 1); Assert.assertEquals(strategy.selectGroup(2, groupNum), 2); - Assert.assertEquals(strategy.getMaxGroupPendingRequest(), 1); - Assert.assertEquals(strategy.getMinGroupPendingRequest(), 1); - Assert.assertEquals(strategy.getAvgGroupPendingRequest(), 1); - strategy.finishRequest(1, 1); - strategy.finishRequest(2, 2); + strategy.finishRequest(1, 1, 1); + strategy.finishRequest(2, 2, 1); Assert.assertEquals(strategy.selectGroup(3, groupNum), 1); Assert.assertEquals(strategy.selectGroup(4, groupNum), 2); - strategy.finishRequest(0, 0); - strategy.finishRequest(3, 1); - strategy.finishRequest(4, 2); + strategy.finishRequest(0, 0, 1); + strategy.finishRequest(3, 1, 1); + strategy.finishRequest(4, 2, 1); // Group 0 is recovered Assert.assertEquals(strategy.selectGroup(5, groupNum), 2); Assert.assertEquals(strategy.selectGroup(6, groupNum), 0); } + + @Test + public void testLatencyBasedGroupSelection() { + TimeoutProcessor timeoutProcessor = mock(TimeoutProcessor.class); + doReturn(mock(TimeoutProcessor.TimeoutFuture.class)).when(timeoutProcessor).schedule(any(), anyLong(), any()); + HelixGroupStats stats = new HelixGroupStats(new MetricsRepository()); + HelixGroupLeastLoadedStrategy strategy = new HelixGroupLeastLoadedStrategy(timeoutProcessor, 10000, stats); + int groupNum = 3; + Assert.assertEquals(strategy.selectGroup(0, groupNum), 0); + Assert.assertEquals(strategy.selectGroup(1, groupNum), 1); + Assert.assertEquals(strategy.selectGroup(2, groupNum), 2); + // Group 2 is the 
fastest one + strategy.finishRequest(0, 0, 2); + strategy.finishRequest(1, 1, 3); + strategy.finishRequest(2, 2, 1); + Assert.assertEquals(strategy.selectGroup(3, groupNum), 2); + } } diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/api/routing/helix/TestHelixGroupRoundRobinStrategy.java b/services/venice-router/src/test/java/com/linkedin/venice/router/api/routing/helix/TestHelixGroupRoundRobinStrategy.java index 9d2a877d0c4..c98c91759a3 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/api/routing/helix/TestHelixGroupRoundRobinStrategy.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/api/routing/helix/TestHelixGroupRoundRobinStrategy.java @@ -10,16 +10,16 @@ public void testSelectGroup() { HelixGroupRoundRobinStrategy strategy = new HelixGroupRoundRobinStrategy(); int groupNum = 3; Assert.assertEquals(strategy.selectGroup(0, groupNum), 0); - strategy.finishRequest(0, 0); + strategy.finishRequest(0, 0, 1); Assert.assertEquals(strategy.selectGroup(1, groupNum), 1); - strategy.finishRequest(1, 1); + strategy.finishRequest(1, 1, 1); Assert.assertEquals(strategy.selectGroup(2, groupNum), 2); - strategy.finishRequest(2, 2); + strategy.finishRequest(2, 2, 1); Assert.assertEquals(strategy.selectGroup(3, groupNum), 0); - strategy.finishRequest(3, 0); + strategy.finishRequest(3, 0, 1); Assert.assertEquals(strategy.selectGroup(4, groupNum), 1); - strategy.finishRequest(4, 1); + strategy.finishRequest(4, 1, 1); Assert.assertEquals(strategy.selectGroup(5, groupNum), 2); - strategy.finishRequest(5, 2); + strategy.finishRequest(5, 2, 1); } } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java index fdca044c42a..b25ac43629a 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java +++ 
b/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java @@ -176,10 +176,9 @@ public void initChannel(SocketChannel ch) { sslInitializer.setIdentityParser(identityParser::parseIdentityFromCert); ch.pipeline().addLast(sslInitializer); } + ch.pipeline() + .addLast(new ServerConnectionStatsHandler(serverConnectionStats, serverConfig.getRouterPrincipalName())); ChannelPipelineConsumer httpPipelineInitializer = (pipeline, whetherNeedServerCodec) -> { - ServerConnectionStatsHandler serverConnectionStatsHandler = - new ServerConnectionStatsHandler(serverConnectionStats, serverConfig.getRouterPrincipalName()); - pipeline.addLast(serverConnectionStatsHandler); StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats); pipeline.addLast(statsHandler); if (whetherNeedServerCodec) {