Skip to content

Commit

Permalink
[router][server] Fixed the conn metric in server and improve HAR rout…
Browse files Browse the repository at this point in the history
…ing in Router

This code change includes two main changes:
1. Update Venice Server to track true connection-level metrics; previously it was
   actually tracking request-level metrics.
2. Update the Venice Router HAR least-loaded algo to take group latency into account, for
   the following reasons:
   a. When the total qps is relatively low, the pending-request-count-based load balancing
      algo doesn't work well because the pending count is 0 most of the time. For example, if
      the avg latency of one group is 10ms and the group qps is 10, then very likely there are
      pending requests only 10-20% of the time, so even if another group's latency is much
      lower (1-2ms), the faster group won't be selected. The latency-based LB algo kicks in
      for exactly this case: when the pending count is equal among all the groups, it prefers
      the faster group.
   b. Completely getting rid of the pending-request-count-based LB would cause another issue: the relatively
      slower group would get almost zero requests.
   c. A combination of pending request count and latency selects the faster groups when all groups are busy or idle,
      while still sending some requests to a slower group when that group is too idle, which helps
      bring the slower group back into the rotation once it has recovered from the slowness.
  • Loading branch information
gaojieliu committed Dec 20, 2024
1 parent aca711b commit d82217c
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ protected Sensor registerSensor(
}
}
} else {
String metricName = sensorFullName + "." + metricNameSuffix(stat);
String metricName = getMetricFullName(sensor, stat);
if (metricsRepository.getMetric(metricName) == null) {
sensor.add(metricName, stat, config);
}
Expand All @@ -147,6 +147,10 @@ protected Sensor registerSensor(
});
}

/**
 * Builds the full metric name for a stat attached to a sensor: the sensor's name, a dot, and the
 * suffix produced by {@link #metricNameSuffix} for the given stat.
 *
 * @param sensor the sensor whose name forms the prefix
 * @param stat the stat whose suffix is appended
 * @return {@code <sensor name>.<metric name suffix>}
 */
protected String getMetricFullName(Sensor sensor, MeasurableStat stat) {
  String sensorName = sensor.name();
  String statSuffix = metricNameSuffix(stat);
  return sensorName + "." + statSuffix;
}

/**
* N.B.: {@link LongAdderRateGauge} is just an implementation detail, and we do not wish to alter metric names
* due to it, so we call it the same as {@link Rate}. Same for {@link AsyncGauge}, we don't want to alter any existing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,8 @@ public <H, P extends ResourcePath<K>, K, R> Scatter<H, P, K> scatter(
Map<Instance, KeyPartitionSet<Instance, RouterKey>> hostMap = new HashMap<>();
int helixGroupNum = getHelixGroupNum();
int assignedHelixGroupId = getAssignedHelixGroupId(venicePath);
// This is used to record the request start time for the whole Router request.
venicePath.recordOriginalRequestStartTimestamp();
currentPartition = 0;
try {
for (; currentPartition < partitionCount; currentPartition++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@ public FullHttpResponse buildResponse(
* 3. HelixGroupId is valid since Helix-assisted routing is only enabled for multi-key request.
*/
if (!venicePath.isRetryRequest() && helixGroupSelector != null && venicePath.getHelixGroupId() >= 0) {
helixGroupSelector.finishRequest(venicePath.getRequestId(), venicePath.getHelixGroupId());
helixGroupSelector.finishRequest(
venicePath.getRequestId(),
venicePath.getHelixGroupId(),
LatencyUtils.getElapsedTimeFromMsToMs(venicePath.getOriginalRequestStartTs()));
}
RequestType requestType = venicePath.getRequestType();
AggRouterHttpRequestStats stats = routerStats.getStatsByType(requestType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.router.stats.HelixGroupStats;
import com.linkedin.venice.utils.Pair;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -24,18 +25,18 @@ public class HelixGroupLeastLoadedStrategy implements HelixGroupSelectionStrateg

public static final int MAX_ALLOWED_GROUP = 100;
private final int[] counters = new int[MAX_ALLOWED_GROUP];
/**
* The group count could potentially change during the runtime since the storage node cluster can be expanded
* without bouncing Routers.
*/
private int currentGroupCount = 0;
private final TimeoutProcessor timeoutProcessor;
private final long timeoutInMS;
private final Map<Long, Pair<Integer, TimeoutProcessor.TimeoutFuture>> requestTimeoutFutureMap = new HashMap<>();
private final HelixGroupStats helixGroupStats;

public HelixGroupLeastLoadedStrategy(TimeoutProcessor timeoutProcessor, long timeoutInMS) {
public HelixGroupLeastLoadedStrategy(
TimeoutProcessor timeoutProcessor,
long timeoutInMS,
HelixGroupStats helixGroupStats) {
this.timeoutProcessor = timeoutProcessor;
this.timeoutInMS = timeoutInMS;
this.helixGroupStats = helixGroupStats;
}

@Override
Expand All @@ -44,8 +45,8 @@ public int selectGroup(long requestId, int groupCount) {
throw new VeniceException(
"The valid group num must fail into this range: [1, " + MAX_ALLOWED_GROUP + "], but received: " + groupCount);
}
this.currentGroupCount = groupCount;
long smallestCounter = Integer.MAX_VALUE;
int smallestCounter = Integer.MAX_VALUE;
double lowestAvgLatency = Double.MAX_VALUE;
int leastLoadedGroup = 0;
int startGroupId = (int) (requestId % groupCount);
/**
Expand All @@ -60,10 +61,21 @@ public int selectGroup(long requestId, int groupCount) {
}
for (int i = 0; i < groupCount; ++i) {
int currentGroup = (i + startGroupId) % groupCount;
long currentGroupCounter = counters[currentGroup];
int currentGroupCounter = counters[currentGroup];
if (currentGroupCounter < smallestCounter) {
smallestCounter = currentGroupCounter;
leastLoadedGroup = currentGroup;
lowestAvgLatency = helixGroupStats.getGroupResponseWaitingTimeAvg(currentGroup);
} else if (currentGroupCounter == smallestCounter) {
double currentGroupAvgLatency = helixGroupStats.getGroupResponseWaitingTimeAvg(currentGroup);
/**
* We deliberately don't check whether {@code currentGroupAvgLatency} is less than 0 here: when a group has
* not been used at all, its average latency is reported as -1.0, so that group wins this tie-break.
*/
if (currentGroupAvgLatency < lowestAvgLatency) {
lowestAvgLatency = currentGroupAvgLatency;
leastLoadedGroup = currentGroup;
}
}
}
final int finalLeastLoadedGroup = leastLoadedGroup;
Expand All @@ -82,6 +94,7 @@ public int selectGroup(long requestId, int groupCount) {

++counters[leastLoadedGroup];
}
helixGroupStats.recordGroupPendingRequest(leastLoadedGroup, counters[leastLoadedGroup]);

return leastLoadedGroup;
}
Expand All @@ -100,6 +113,10 @@ private void timeoutRequest(long requestId, int groupId, boolean cancelTimeoutFu
"The allowed group id must fail into this range: [0, " + (MAX_ALLOWED_GROUP - 1) + "], but received: "
+ groupId);
}
if (!cancelTimeoutFuture) {
// Timeout request
helixGroupStats.recordGroupResponseWaitingTime(groupId, timeoutInMS);
}
synchronized (this) {
Pair<Integer, TimeoutProcessor.TimeoutFuture> timeoutFuturePair = requestTimeoutFutureMap.get(requestId);
if (timeoutFuturePair == null) {
Expand Down Expand Up @@ -133,47 +150,8 @@ private void timeoutRequest(long requestId, int groupId, boolean cancelTimeoutFu
}

@Override
public void finishRequest(long requestId, int groupId) {
public void finishRequest(long requestId, int groupId, double latency) {
timeoutRequest(requestId, groupId, true);
}

@Override
public int getMaxGroupPendingRequest() {
if (currentGroupCount == 0) {
return 0;
}
int maxPendingRequest = 0;
for (int i = 0; i < currentGroupCount; ++i) {
if (counters[i] > maxPendingRequest) {
maxPendingRequest = counters[i];
}
}
return maxPendingRequest;
}

@Override
public int getMinGroupPendingRequest() {
if (currentGroupCount == 0) {
return 0;
}
int minPendingRequest = Integer.MAX_VALUE;
for (int i = 0; i < currentGroupCount; ++i) {
if (counters[i] < minPendingRequest) {
minPendingRequest = counters[i];
}
}
return minPendingRequest;
}

@Override
public int getAvgGroupPendingRequest() {
if (currentGroupCount == 0) {
return 0;
}
int totalPendingRequest = 0;
for (int i = 0; i < currentGroupCount; ++i) {
totalPendingRequest += counters[i];
}
return totalPendingRequest / currentGroupCount;
helixGroupStats.recordGroupResponseWaitingTime(groupId, latency);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,8 @@ public int selectGroup(long requestId, int groupNum) {
}

@Override
public void finishRequest(long requestId, int groupId) {
public void finishRequest(long requestId, int groupId, double latency) {
// do nothing
}

@Override
public int getMaxGroupPendingRequest() {
// Not supported
return -1;
}

@Override
public int getMinGroupPendingRequest() {
// Not supported
return -1;
}

@Override
public int getAvgGroupPendingRequest() {
// Not supported
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,6 @@ public interface HelixGroupSelectionStrategy {
* Notify the corresponding Helix Group that the request is completed, and the implementation will decide whether
* any cleanup is required or not.
*/
void finishRequest(long requestId, int groupId);
void finishRequest(long requestId, int groupId, double latency);

/**
* Get the maximum of the pending requests among all the groups
*/
int getMaxGroupPendingRequest();

/**
* Get the minimum of the pending requests among all the groups
*/
int getMinGroupPendingRequest();

/**
* Get the average of the pending requests among all the groups
*/
int getAvgGroupPendingRequest();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ public HelixGroupSelector(
HelixInstanceConfigRepository instanceConfigRepository,
HelixGroupSelectionStrategyEnum strategyEnum,
TimeoutProcessor timeoutProcessor) {
this.helixGroupStats = new HelixGroupStats(metricsRepository, this);
this.helixGroupStats = new HelixGroupStats(metricsRepository);
this.instanceConfigRepository = instanceConfigRepository;
Class<? extends HelixGroupSelectionStrategy> strategyClass = strategyEnum.getStrategyClass();
if (strategyClass.equals(HelixGroupLeastLoadedStrategy.class)) {
this.selectionStrategy = new HelixGroupLeastLoadedStrategy(timeoutProcessor, HELIX_GROUP_COUNTER_TIMEOUT_MS);
this.selectionStrategy =
new HelixGroupLeastLoadedStrategy(timeoutProcessor, HELIX_GROUP_COUNTER_TIMEOUT_MS, helixGroupStats);
} else {
try {
this.selectionStrategy = strategyClass.getDeclaredConstructor().newInstance();
Expand All @@ -57,27 +58,11 @@ public int selectGroup(long requestId, int groupNum) {
helixGroupStats.recordGroupNum(groupNum);
int assignedGroupId = selectionStrategy.selectGroup(requestId, groupNum);
helixGroupStats.recordGroupRequest(assignedGroupId);

return assignedGroupId;
}

@Override
public void finishRequest(long requestId, int groupId) {
selectionStrategy.finishRequest(requestId, groupId);
}

@Override
public int getMaxGroupPendingRequest() {
return selectionStrategy.getMaxGroupPendingRequest();
}

@Override
public int getMinGroupPendingRequest() {
return selectionStrategy.getMinGroupPendingRequest();
}

@Override
public int getAvgGroupPendingRequest() {
return selectionStrategy.getAvgGroupPendingRequest();
public void finishRequest(long requestId, int groupId, double latency) {
selectionStrategy.finishRequest(requestId, groupId, latency);
}
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,28 @@
package com.linkedin.venice.router.stats;

import com.linkedin.venice.router.api.routing.helix.HelixGroupSelectionStrategy;
import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.Metric;
import io.tehuti.metrics.MeasurableStat;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.AsyncGauge;
import io.tehuti.metrics.stats.Avg;
import io.tehuti.metrics.stats.OccurrenceRate;


public class HelixGroupStats extends AbstractVeniceStats {
private final VeniceConcurrentHashMap<Integer, Sensor> groupCounterSensorMap = new VeniceConcurrentHashMap<>();
private final HelixGroupSelectionStrategy strategy;

private final VeniceConcurrentHashMap<Integer, Sensor> pendingRequestSensorMap = new VeniceConcurrentHashMap<>();
private final VeniceConcurrentHashMap<Integer, Sensor> groupResponseWaitingTimeSensorMap =
new VeniceConcurrentHashMap<>();
private final VeniceConcurrentHashMap<Integer, Metric> groupResponseWaitingTimeAvgMap =
new VeniceConcurrentHashMap<>();
private final Sensor groupCountSensor;
private final Sensor maxGroupPendingRequest;
private final Sensor minGroupPendingRequest;
private final Sensor avgGroupPendingRequest;

public HelixGroupStats(MetricsRepository metricsRepository, HelixGroupSelectionStrategy strategy) {
public HelixGroupStats(MetricsRepository metricsRepository) {
super(metricsRepository, "HelixGroupStats");
this.strategy = strategy;

this.groupCountSensor = registerSensor("group_count", new Avg());
this.maxGroupPendingRequest = registerSensor(
new AsyncGauge((ignored, ignored2) -> strategy.getMaxGroupPendingRequest(), "max_group_pending_request"));
this.minGroupPendingRequest = registerSensor(
new AsyncGauge((ignored, ignored2) -> strategy.getMinGroupPendingRequest(), "min_group_pending_request"));
this.avgGroupPendingRequest = registerSensor(
new AsyncGauge((ignored, ignored2) -> strategy.getAvgGroupPendingRequest(), "avg_group_pending_request"));
}

public void recordGroupNum(int groupNum) {
Expand All @@ -41,4 +34,33 @@ public void recordGroupRequest(int groupId) {
.computeIfAbsent(groupId, id -> registerSensor("group_" + groupId + "_request", new OccurrenceRate()));
groupSensor.record();
}

/**
 * Records the current pending request count of a group.
 *
 * The per-group sensor ({@code group_<id>_pending_request}, backed by an {@link Avg} stat) is
 * registered on first use and cached in {@code pendingRequestSensorMap}.
 *
 * @param groupId the group the sample belongs to
 * @param pendingRequest the pending request count to record
 */
public void recordGroupPendingRequest(int groupId, int pendingRequest) {
  pendingRequestSensorMap.computeIfAbsent(groupId, id -> {
    // Only runs the first time this group id is seen.
    return registerSensor("group_" + id + "_pending_request", new Avg());
  }).record(pendingRequest);
}

/**
 * Records one response waiting time sample for a group.
 *
 * On the first sample for a group, this registers the {@code group_<id>_response_waiting_time}
 * sensor with an {@link Avg} stat and also caches the corresponding {@link Metric} in
 * {@code groupResponseWaitingTimeAvgMap}, which {@link #getGroupResponseWaitingTimeAvg} reads.
 *
 * @param groupId the group the sample belongs to
 * @param responseWaitingTime the waiting time to record
 */
public void recordGroupResponseWaitingTime(int groupId, double responseWaitingTime) {
  Sensor waitingTimeSensor = groupResponseWaitingTimeSensorMap.computeIfAbsent(groupId, id -> {
    MeasurableStat avgStat = new Avg();
    Sensor registered = registerSensor("group_" + id + "_response_waiting_time", avgStat);
    // Look the metric up by its full name so the average can later be read without a repository lookup.
    String avgMetricName = getMetricFullName(registered, avgStat);
    groupResponseWaitingTimeAvgMap.put(id, getMetricsRepository().getMetric(avgMetricName));
    return registered;
  });
  waitingTimeSensor.record(responseWaitingTime);
}

/**
 * Returns the average response waiting time currently reported for a group.
 *
 * @param groupId the group to look up
 * @return the metric's value, or {@code -1} when no metric has been cached for the group yet or
 *         the metric value is {@code NaN}
 */
public double getGroupResponseWaitingTimeAvg(int groupId) {
  Metric avgMetric = groupResponseWaitingTimeAvgMap.get(groupId);
  // A missing metric is treated the same way as a NaN reading.
  double avgLatency = (avgMetric != null) ? avgMetric.value() : Double.NaN;
  return Double.isNaN(avgLatency) ? -1 : avgLatency;
}
}
Loading

0 comments on commit d82217c

Please sign in to comment.