Skip to content

Commit

Permalink
check and warn for invalid partition weight (#1033)
Browse files Browse the repository at this point in the history
* check and warn for invalid partition weight

* set configurable actions on weight validation breach

* rebase master

* complete doc

* adjust param type

* fix edge case that breaches both max and decimal places

* add jmx methods for weight breach count and markup status

* adjust var names and types

* add test case of negative weight

* update jmx and mbean method names

* add a test case of valid weight

* make var final
  • Loading branch information
bohhyang authored Nov 4, 2024
1 parent 6ca54b8 commit 6664ccc
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 24 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.62.0] - 2024-10-28
- Check and take configurable action for invalid partition weight

## [29.61.0] - 2024-10-24
- Disable dark traffic dispatching during dark warmup

Expand Down Expand Up @@ -5752,7 +5755,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.61.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.62.0...master
[29.62.0]: https://github.com/linkedin/rest.li/compare/v29.61.0...v29.62.0
[29.61.0]: https://github.com/linkedin/rest.li/compare/v29.60.0...v29.61.0
[29.60.0]: https://github.com/linkedin/rest.li/compare/v29.59.0...v29.60.0
[29.59.0]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.59.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package com.linkedin.d2.balancer.servers;

import com.google.common.annotations.VisibleForTesting;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Collections;
Expand All @@ -44,6 +47,8 @@
import com.linkedin.util.ArgumentUtil;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -93,6 +98,23 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
private final AtomicLong _markDownStartAtRef = new AtomicLong(Long.MAX_VALUE);

private volatile Map<Integer, PartitionData> _partitionDataMap;
/**
* If not null, it defines two rules for d2 weight validation:
* 1. The maximum d2 weight allowed.
* 2. The maximum number of decimal places allowed, indicated by this value's scale (pad it with trailing 0s).
* For example, 100.00 means the max weight allowed is 100 and the max number of decimal places is 2.
* CAUTION: BigDecimal yields accurate scale when constructed with a string of the number, instead of a double/float.
* E.g: new BigDecimal("100.00") instead of new BigDecimal(100.00).
*/
private final BigDecimal _maxWeight;
/**
* The action to take when d2 weight breaches validation rules.
*/
private final ActionOnWeightBreach _actionOnWeightBreach;

// Number of partition weights seen by validatePartitionData that exceeded _maxWeight.
private final AtomicInteger _maxWeightBreachedCount = new AtomicInteger(0);
// Number of partition weights seen by validatePartitionData with more decimal places than _maxWeight's scale.
private final AtomicInteger _weightDecimalPlacesBreachedCount = new AtomicInteger(0);

private volatile Map<String, Object> _uriSpecificProperties;

private ServiceDiscoveryEventEmitter _eventEmitter;
Expand All @@ -102,6 +124,12 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
* it will try to bring up the server again on ZK if the connection goes down, or a new store is set
*/
private boolean _isUp;
/**
* Whether the announcer has completed sending a markup intent. NOTE THAT a mark-up intent sent does NOT mean the
* announcement status on service discovery registry is up. Service discovery registry may further process the host
* and determine its status. Check on service discovery registry for the final status.
*/
private final AtomicBoolean _isMarkUpIntentSent = new AtomicBoolean(false);

// Field to indicate if warm up was started. If it is true, it will try to end the warm up
// by marking down on ZK if the connection goes down
Expand All @@ -123,13 +151,17 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
private volatile boolean _markUpFailed;

// ScheduledExecutorService used to schedule the end of dark warm-up; supplied through the constructor.
private final ScheduledExecutorService _executorService;

// Flag indicating whether dark warm-up is enabled; supplied through the constructor.
private final boolean _isDarkWarmupEnabled;
/**
 * Whether the announcer has completed sending a dark warmup cluster markup intent.
 */
private final AtomicBoolean _isDarkWarmupMarkUpIntentSent = new AtomicBoolean(false);

// Name of the dark warm-up cluster; supplied through the constructor.
private final String _warmupClusterName;
// Similar as _znodePath and _znodeData above but for the warm up cluster.
private final AtomicReference<String> _warmupClusterZnodePathRef = new AtomicReference<>();
private final AtomicReference<String> _warmupClusterZnodeDataRef = new AtomicReference<>();
Expand All @@ -139,7 +171,18 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
private final AtomicLong _warmupClusterMarkDownStartAtRef = new AtomicLong(Long.MAX_VALUE);

// Dark warm-up time duration in seconds; supplied through the constructor.
private final int _warmupDuration;

/**
 * The action taken by weight validation when a partition weight exceeds the configured max weight or has
 * more decimal places than the configured max weight's scale.
 */
public enum ActionOnWeightBreach {
// Ignore and no op (the breach is still counted).
IGNORE,
// only log warnings
WARN,
// throw exceptions when the max weight is breached; a decimal-places breach is only logged as a warning
THROW,
// rectify the invalid weight (e.g: cap to the max, round to the nearest valid decimal places)
RECTIFY
}

/**
* @deprecated Use the constructor {@link #ZooKeeperAnnouncer(LoadBalancerServer)} instead.
Expand Down Expand Up @@ -195,26 +238,18 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
    boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
  // Delegate to the full constructor with weight validation disabled (no max weight, IGNORE action).
  // An explicit constructor invocation must be the first statement, and the delegated constructor already
  // initializes the fields and registers this announcer as the server's service discovery event helper,
  // so nothing may (or needs to) be assigned here.
  this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, eventEmitter, null, ActionOnWeightBreach.IGNORE);
}

/**
 * Creates an announcer without weight validation rules: delegates to the full constructor with a
 * {@code null} max weight and {@link ActionOnWeightBreach#IGNORE}.
 */
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
    boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
  this(
      server,
      initialIsUp,
      isDarkWarmupEnabled,
      warmupClusterName,
      warmupDuration,
      executorService,
      eventEmitter,
      null,
      ActionOnWeightBreach.IGNORE);
}

public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService,
ServiceDiscoveryEventEmitter eventEmitter, BigDecimal maxWeight, ActionOnWeightBreach actionOnWeightBreach)
{
_server = server;
// initialIsUp is used for delay mark up. If it's false, there won't be markup when the announcer is started.
Expand All @@ -231,6 +266,9 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
_executorService = executorService;
_eventEmitter = eventEmitter;

_maxWeight = maxWeight;
_actionOnWeightBreach = actionOnWeightBreach != null ? actionOnWeightBreach : ActionOnWeightBreach.IGNORE;

if (server instanceof ZooKeeperServer)
{
((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this);
Expand Down Expand Up @@ -354,6 +392,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
_isMarkUpIntentSent.set(true);
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, true, true, _markUpStartAtRef.get());
_markUpFailed = false;
_log.info("markUp for uri = {} on cluster {} succeeded.", _uri, _cluster);
Expand Down Expand Up @@ -413,6 +452,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
_isDarkWarmupMarkUpIntentSent.set(false);
emitSDStatusActiveUpdateIntentAndWriteEvents(_warmupClusterName, false, true, _warmupClusterMarkDownStartAtRef.get());
// Mark _isWarmingUp to false to indicate warm up has completed
_isWarmingUp = false;
Expand Down Expand Up @@ -463,6 +503,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
_isDarkWarmupMarkUpIntentSent.set(true);
emitSDStatusActiveUpdateIntentAndWriteEvents(_warmupClusterName, true, true, _warmupClusterMarkUpStartAtRef.get());
_log.info("markUp for uri {} on warm-up cluster {} succeeded", _uri, _warmupClusterName);
// Mark _isWarmingUp to true to indicate warm up is in progress
Expand Down Expand Up @@ -537,6 +578,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
_isMarkUpIntentSent.set(false);
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, false, true, _markDownStartAtRef.get());
_log.info("markDown for uri = {} succeeded.", _uri);
// Note that the pending callbacks we see at this point are
Expand Down Expand Up @@ -750,12 +792,12 @@ public void setWeight(double weight)

Map<Integer, PartitionData> partitionDataMap = new HashMap<>(1);
partitionDataMap.put(partitionId, new PartitionData(weight));
_partitionDataMap = Collections.unmodifiableMap(partitionDataMap);
setPartitionData(partitionDataMap);
}

/**
 * Validates the given partition data and, only if validation does not throw, publishes an unmodifiable
 * copy of the validated (possibly rectified) data as the announcer's partition data.
 *
 * @param partitionData partition id to partition data; the input map itself is not modified.
 * @throws IllegalArgumentException if validation rejects a weight (see {@code validatePartitionData}).
 */
public void setPartitionData(Map<Integer, PartitionData> partitionData)
{
  // Validate first: the field must never hold data that failed validation, so there is no assignment
  // of the raw input before this call.
  Map<Integer, PartitionData> validated = validatePartitionData(partitionData);
  _partitionDataMap = Collections.unmodifiableMap(new HashMap<>(validated));
}

public Map<Integer, PartitionData> getPartitionData()
Expand All @@ -774,6 +816,26 @@ public boolean isMarkUpFailed()
return _markUpFailed;
}

/**
 * @return the current value of the mark-up-intent flag, which is set to true when a markUp succeeds and
 *         reset to false when a markDown succeeds.
 */
public boolean isMarkUpIntentSent()
{
  final boolean markUpIntentSent = _isMarkUpIntentSent.get();
  return markUpIntentSent;
}

/**
 * @return the current value of the flag tracking whether a markup intent for the dark warm-up cluster
 *         has been sent.
 */
public boolean isDarkWarmupMarkUpIntentSent()
{
  final boolean warmupMarkUpIntentSent = _isDarkWarmupMarkUpIntentSent.get();
  return warmupMarkUpIntentSent;
}

/**
 * @return how many validated partition weights have exceeded the max weight so far.
 */
public int getMaxWeightBreachedCount()
{
  final int breaches = _maxWeightBreachedCount.get();
  return breaches;
}

/**
 * @return how many validated partition weights have had more decimal places than allowed so far.
 */
public int getWeightDecimalPlacesBreachedCount()
{
  final int breaches = _weightDecimalPlacesBreachedCount.get();
  return breaches;
}

/**
 * Replaces the service discovery event emitter used by this announcer.
 *
 * @param emitter the emitter to use from now on.
 */
public void setEventEmitter(ServiceDiscoveryEventEmitter emitter)
{
  this._eventEmitter = emitter;
}
Expand Down Expand Up @@ -833,4 +895,70 @@ private ImmutablePair<String, String> getZnodePathAndData(String cluster) {
public boolean isWarmingUp()
{
  // Plain read of the warm-up-in-progress flag.
  final boolean warmingUp = _isWarmingUp;
  return warmingUp;
}

/**
 * Validates the weight of every partition and returns a modifiable copy of the input, with weights
 * rectified when the configured action is {@link ActionOnWeightBreach#RECTIFY}.
 *
 * <p>Rules applied to each partition:
 * <ol>
 *   <li>A negative weight is always rejected, whether or not a max weight is configured.</li>
 *   <li>If a max weight is configured, a weight greater than it counts as a max-weight breach.</li>
 *   <li>If a max weight is configured, a weight with more decimal places than the max weight's scale
 *       counts as a decimal-places breach. This breach never throws, even under THROW.</li>
 * </ol>
 * Both breach counters are incremented regardless of the configured action.
 *
 * @param partitionData partition id to partition data; never modified by this method.
 * @return a new map holding the validated (and possibly rectified) partition data.
 * @throws IllegalArgumentException if any weight is negative, or if a weight exceeds the max weight and
 *         the configured action is {@link ActionOnWeightBreach#THROW}.
 */
@VisibleForTesting
Map<Integer, PartitionData> validatePartitionData(Map<Integer, PartitionData> partitionData) {
  Map<Integer, PartitionData> res = new HashMap<>(partitionData); // modifiable copy in case the input is unmodifiable
  for (Map.Entry<Integer, PartitionData> entry : res.entrySet()) {
    // BigDecimal.valueOf uses the double's canonical string form, so scale() reflects the decimal places written.
    BigDecimal weight = BigDecimal.valueOf(entry.getValue().getWeight());
    // check negative weight
    if (weight.compareTo(BigDecimal.ZERO) < 0) {
      throw new IllegalArgumentException(String.format("Weight %s in Partition %d is negative. Please correct it.",
          weight, entry.getKey()));
    }

    if (_maxWeight == null) {
      // No max/decimal rules configured. Move on to the next partition instead of leaving the loop,
      // so that the negative-weight check still covers every remaining partition.
      continue;
    }

    // check max weight
    if (weight.compareTo(_maxWeight) > 0) {
      _maxWeightBreachedCount.incrementAndGet();
      switch (_actionOnWeightBreach) {
        case WARN:
          _log.warn("", getMaxWeightBreachException(weight, entry.getKey()));
          break;
        case THROW:
          throw getMaxWeightBreachException(weight, entry.getKey());
        case RECTIFY:
          // Log before overwriting 'weight' so the message reports the offending value, not the cap.
          _log.warn("Capped weight {} in Partition {} to the max weight allowed: {}.", weight, entry.getKey(),
              _maxWeight);
          // doubleValue() keeps any fractional part of the max weight; intValue() would truncate it.
          entry.setValue(new PartitionData(_maxWeight.doubleValue()));
          // The capped weight has the max weight's scale, so the decimal-places check below is a no-op for it.
          weight = _maxWeight;
          break;
        case IGNORE:
        default:
          break;
      }
    }

    // check decimal places
    if (weight.scale() > _maxWeight.scale()) {
      _weightDecimalPlacesBreachedCount.incrementAndGet();
      switch (_actionOnWeightBreach) {
        case WARN: // both WARN and THROW only log the warning. Don't throw exception for decimal places.
        case THROW:
          _log.warn("", new IllegalArgumentException(String.format("Weight %s in Partition %d has more than %d"
              + " decimal places. It will be rounded in the future.", weight, entry.getKey(), _maxWeight.scale())));
          break;
        case RECTIFY:
          double newWeight = weight.setScale(_maxWeight.scale(), RoundingMode.HALF_UP).doubleValue();
          entry.setValue(new PartitionData(newWeight));
          _log.warn("Rounded weight {} in Partition {} to {} decimal places: {}.", weight, entry.getKey(),
              _maxWeight.scale(), newWeight);
          break;
        case IGNORE:
        default:
          break;
      }
    }
  }
  return res;
}

/**
 * Builds (without throwing) the exception describing a weight that exceeds the configured max weight.
 *
 * @param weight the offending weight.
 * @param partition the id of the partition carrying that weight.
 * @return an {@link IllegalArgumentException} whose message names the weight, the partition and the max weight.
 */
private IllegalArgumentException getMaxWeightBreachException(BigDecimal weight, int partition) {
  final String template = "[ACTION NEEDED] Weight %s in Partition %d is greater than the max weight allowed: %s."
      + " Please correct the weight. It will be force-capped to the max weight in the future.";
  final String message = String.format(template, weight, partition, _maxWeight);
  return new IllegalArgumentException(message);
}
}
23 changes: 23 additions & 0 deletions d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,27 @@ public Map<Integer, PartitionData> getPartitionData()
public boolean isMarkUpFailed()
{
  // Pure delegation to the wrapped announcer.
  final boolean markUpFailed = _announcer.isMarkUpFailed();
  return markUpFailed;
}

/**
 * Delegates to the wrapped announcer's mark-up-intent flag.
 */
@Override
public boolean isMarkUpIntentSent()
{
  final boolean intentSent = _announcer.isMarkUpIntentSent();
  return intentSent;
}

/**
 * Delegates to the wrapped announcer's dark warm-up mark-up-intent flag.
 */
@Override
public boolean isDarkWarmupMarkUpIntentSent()
{
  final boolean intentSent = _announcer.isDarkWarmupMarkUpIntentSent();
  return intentSent;
}

/**
 * Delegates to the wrapped announcer's max-weight breach counter.
 */
@Override
public int getMaxWeightBreachedCount()
{
  final int breaches = _announcer.getMaxWeightBreachedCount();
  return breaches;
}

/**
 * Delegates to the wrapped announcer's decimal-places breach counter.
 */
@Override
public int getWeightDecimalPlacesBreachedCount()
{
  final int breaches = _announcer.getWeightDecimalPlacesBreachedCount();
  return breaches;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,27 @@ void setPartitionDataUsingJson(String partitionDataJson)
void setPartitionData(Map<Integer, PartitionData> partitionData);

/**
 * @return the announcer's mark-up-failed flag (presumably true when the most recent mark-up attempt
 * failed; confirm against the announcer implementation).
 */
boolean isMarkUpFailed();

/**
 * Reports whether the announcer has completed sending a mark-up intent.
 *
 * <p>NOTE: a sent mark-up intent does NOT mean that the announcement status on the service discovery
 * registry is up. The registry may further process the host and determine its status; check the registry
 * for the final status.
 *
 * @return true if the announcer has completed sending a mark-up intent.
 */
boolean isMarkUpIntentSent();

/**
 * Reports whether the announcer has completed sending a mark-up intent for the dark warm-up cluster.
 *
 * @return true if the announcer has completed sending a dark warm-up cluster mark-up intent.
 */
boolean isDarkWarmupMarkUpIntentSent();

/**
 * @return the number of times the max weight has been breached.
 */
int getMaxWeightBreachedCount();

/**
 * @return the number of times the max number of decimal places allowed on a weight has been breached.
 */
int getWeightDecimalPlacesBreachedCount();
}
Loading

0 comments on commit 6664ccc

Please sign in to comment.