diff --git a/CHANGELOG.md b/CHANGELOG.md index 9de306a724..5bfa211c9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.62.0] - 2024-10-28 +- Check and take configurable action for invalid partition weight + ## [29.61.0] - 2024-10-24 - Disable dark traffic dispatching during dark warmup @@ -5752,7 +5755,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.61.0...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.62.0...master +[29.62.0]: https://github.com/linkedin/rest.li/compare/v29.61.0...v29.62.0 [29.61.0]: https://github.com/linkedin/rest.li/compare/v29.60.0...v29.61.0 [29.60.0]: https://github.com/linkedin/rest.li/compare/v29.59.0...v29.60.0 [29.59.0]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.59.0 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java index 04152ca744..78fbfb0b49 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java @@ -20,6 +20,9 @@ package com.linkedin.d2.balancer.servers; +import com.google.common.annotations.VisibleForTesting; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.net.URI; import java.util.ArrayDeque; import java.util.Collections; @@ -44,6 +47,8 @@ import com.linkedin.util.ArgumentUtil; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -93,6 +98,23 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper private final AtomicLong 
_markDownStartAtRef = new AtomicLong(Long.MAX_VALUE); private volatile Map _partitionDataMap; + /** + * If not null, it defines two rules for d2 weight validation: + * 1. The maximum d2 weight allowed. + * 2. The maximum number of decimal places allowed. Use 0s on decimal places to indicate it. + * For example, 100.00 means the max weight allowed is 100 and the max number of decimal places is 2. + * CAUTION: BigDecimal yields accurate scale when constructed with a string of the number, instead of a double/float. + * E.g: new BigDecimal("100.00") instead of new BigDecimal(100.00). + */ + private final BigDecimal _maxWeight; + /** + * The action to take when d2 weight breaches validation rules. + */ + private final ActionOnWeightBreach _actionOnWeightBreach; + + private final AtomicInteger _maxWeightBreachedCount = new AtomicInteger(0); + private final AtomicInteger _weightDecimalPlacesBreachedCount = new AtomicInteger(0); + private volatile Map _uriSpecificProperties; private ServiceDiscoveryEventEmitter _eventEmitter; @@ -102,6 +124,12 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper * it will try to bring up the server again on ZK if the connection goes down, or a new store is set */ private boolean _isUp; + /** + * Whether the announcer has completed sending a markup intent. NOTE THAT a mark-up intent sent does NOT mean the + * announcement status on service discovery registry is up. Service discovery registry may further process the host + * and determine its status. Check on service discovery registry for the final status. + */ + private final AtomicBoolean _isMarkUpIntentSent = new AtomicBoolean(false); // Field to indicate if warm up was started. 
If it is true, it will try to end the warm up // by marking down on ZK if the connection goes down @@ -123,13 +151,17 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper private volatile boolean _markUpFailed; // ScheduledExecutorService to schedule the end of dark warm-up, defaults to null - private ScheduledExecutorService _executorService; + private final ScheduledExecutorService _executorService; // Boolean flag to indicate if dark warm-up is enabled, defaults to false - private boolean _isDarkWarmupEnabled; + private final boolean _isDarkWarmupEnabled; + /** + * Whether the announcer has completed sending a dark warmup cluster markup intent. + */ + private final AtomicBoolean _isDarkWarmupMarkUpIntentSent = new AtomicBoolean(false); // String to store the name of the dark warm-up cluster, defaults to null - private String _warmupClusterName; + private final String _warmupClusterName; // Similar as _znodePath and _znodeData above but for the warm up cluster. private final AtomicReference _warmupClusterZnodePathRef = new AtomicReference<>(); private final AtomicReference _warmupClusterZnodeDataRef = new AtomicReference<>(); @@ -139,7 +171,18 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper private final AtomicLong _warmupClusterMarkDownStartAtRef = new AtomicLong(Long.MAX_VALUE); // Field to store the dark warm-up time duration in seconds, defaults to zero - private int _warmupDuration; + private final int _warmupDuration; + + public enum ActionOnWeightBreach { + // Ignore and no op. + IGNORE, + // only log warnings + WARN, + // throw exceptions + THROW, + // rectify the invalid weight (e.g: cap to the max, round to the nearest valid decimal places) + RECTIFY + } /** * @deprecated Use the constructor {@link #ZooKeeperAnnouncer(LoadBalancerServer)} instead. 
@@ -195,26 +238,18 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp, public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp, boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter) { - _server = server; - // initialIsUp is used for delay mark up. If it's false, there won't be markup when the announcer is started. - _isUp = initialIsUp; - _isWarmingUp = false; - _isRetryWarmup = false; - _pendingMarkDown = new ArrayDeque<>(); - _pendingMarkUp = new ArrayDeque<>(); - _pendingWarmupMarkDown = new ArrayDeque<>(); - - _isDarkWarmupEnabled = isDarkWarmupEnabled; - _warmupClusterName = warmupClusterName; - _warmupDuration = warmupDuration; - _executorService = executorService; - _eventEmitter = eventEmitter; - - server.setServiceDiscoveryEventHelper(this); + this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, eventEmitter, null, ActionOnWeightBreach.IGNORE); } public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp, boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter) + { + this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, eventEmitter, null, ActionOnWeightBreach.IGNORE); + } + + public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp, + boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, + ServiceDiscoveryEventEmitter eventEmitter, BigDecimal maxWeight, ActionOnWeightBreach actionOnWeightBreach) { _server = server; // initialIsUp is used for delay mark up. If it's false, there won't be markup when the announcer is started. 
@@ -231,6 +266,9 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp, _executorService = executorService; _eventEmitter = eventEmitter; + _maxWeight = maxWeight; + _actionOnWeightBreach = actionOnWeightBreach != null ? actionOnWeightBreach : ActionOnWeightBreach.IGNORE; + if (server instanceof ZooKeeperServer) { ((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this); @@ -354,6 +392,7 @@ public void onError(Throwable e) @Override public void onSuccess(None result) { + _isMarkUpIntentSent.set(true); emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, true, true, _markUpStartAtRef.get()); _markUpFailed = false; _log.info("markUp for uri = {} on cluster {} succeeded.", _uri, _cluster); @@ -413,6 +452,7 @@ public void onError(Throwable e) @Override public void onSuccess(None result) { + _isDarkWarmupMarkUpIntentSent.set(false); emitSDStatusActiveUpdateIntentAndWriteEvents(_warmupClusterName, false, true, _warmupClusterMarkDownStartAtRef.get()); // Mark _isWarmingUp to false to indicate warm up has completed _isWarmingUp = false; @@ -463,6 +503,7 @@ public void onError(Throwable e) @Override public void onSuccess(None result) { + _isDarkWarmupMarkUpIntentSent.set(true); emitSDStatusActiveUpdateIntentAndWriteEvents(_warmupClusterName, true, true, _warmupClusterMarkUpStartAtRef.get()); _log.info("markUp for uri {} on warm-up cluster {} succeeded", _uri, _warmupClusterName); // Mark _isWarmingUp to true to indicate warm up is in progress @@ -537,6 +578,7 @@ public void onError(Throwable e) @Override public void onSuccess(None result) { + _isMarkUpIntentSent.set(false); emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, false, true, _markDownStartAtRef.get()); _log.info("markDown for uri = {} succeeded.", _uri); // Note that the pending callbacks we see at this point are @@ -750,12 +792,12 @@ public void setWeight(double weight) Map partitionDataMap = new HashMap<>(1); partitionDataMap.put(partitionId, new PartitionData(weight)); - 
_partitionDataMap = Collections.unmodifiableMap(partitionDataMap); + setPartitionData(partitionDataMap); } public void setPartitionData(Map partitionData) { - _partitionDataMap = Collections.unmodifiableMap(new HashMap<>(partitionData)); + _partitionDataMap = Collections.unmodifiableMap(new HashMap<>(validatePartitionData(partitionData))); } public Map getPartitionData() @@ -774,6 +816,26 @@ public boolean isMarkUpFailed() return _markUpFailed; } + public boolean isMarkUpIntentSent() + { + return _isMarkUpIntentSent.get(); + } + + public boolean isDarkWarmupMarkUpIntentSent() + { + return _isDarkWarmupMarkUpIntentSent.get(); + } + + public int getMaxWeightBreachedCount() + { + return _maxWeightBreachedCount.get(); + } + + public int getWeightDecimalPlacesBreachedCount() + { + return _weightDecimalPlacesBreachedCount.get(); + } + public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) { _eventEmitter = emitter; } @@ -833,4 +895,82 @@ private ImmutablePair getZnodePathAndData(String cluster) { public boolean isWarmingUp() { return _isWarmingUp; } + + /** + * Validates the weight of every partition against the configured weight rules. + * A negative weight always throws. When a max weight is configured, a weight above it, or a weight with more + * decimal places than the max weight's scale, is counted and then handled per the configured action. + * + * @param partitionData partition id to partition data; the input map is not modified + * @return a new modifiable map with the validated (and, for RECTIFY, corrected) partition data + * @throws IllegalArgumentException if any weight is negative, or exceeds the max weight under THROW + */ + @VisibleForTesting + Map<Integer, PartitionData> validatePartitionData(Map<Integer, PartitionData> partitionData) { + Map<Integer, PartitionData> res = new HashMap<>(partitionData); // modifiable copy in case the input is unmodifiable + for (Map.Entry<Integer, PartitionData> entry : res.entrySet()) { + BigDecimal weight = BigDecimal.valueOf(entry.getValue().getWeight()); + // check negative weight + if (weight.compareTo(BigDecimal.ZERO) < 0) { + throw new IllegalArgumentException(String.format("Weight %s in Partition %d is negative. 
Please correct it.", + weight, entry.getKey())); + } + + if (_maxWeight == null) { + // no weight rules configured: skip the remaining checks, but keep validating the other partitions + continue; + } + + // check max weight + if (weight.compareTo(_maxWeight) > 0) { + _maxWeightBreachedCount.incrementAndGet(); + switch (_actionOnWeightBreach) { + case WARN: + _log.warn("", getMaxWeightBreachException(weight, entry.getKey())); + break; + case THROW: + throw getMaxWeightBreachException(weight, entry.getKey()); + case RECTIFY: + // log before reassigning weight so that the message reports the original weight + _log.warn("Capped weight {} in Partition {} to the max weight allowed: {}.", weight, entry.getKey(), + _maxWeight); + // doubleValue() preserves a fractional max weight; intValue() would truncate it + entry.setValue(new PartitionData(_maxWeight.doubleValue())); + weight = _maxWeight; + break; + case IGNORE: + default: + break; + } + } + + // check decimal places + if (weight.scale() > _maxWeight.scale()) { + _weightDecimalPlacesBreachedCount.incrementAndGet(); + switch (_actionOnWeightBreach) { + case WARN: // both WARN and THROW only log the warning. Don't throw exception for decimal places. + case THROW: + _log.warn("", new IllegalArgumentException(String.format("Weight %s in Partition %d has more than %d" + + " decimal places. It will be rounded in the future.", weight, entry.getKey(), _maxWeight.scale()))); + break; + case RECTIFY: + double newWeight = weight.setScale(_maxWeight.scale(), RoundingMode.HALF_UP).doubleValue(); + entry.setValue(new PartitionData(newWeight)); + _log.warn("Rounded weight {} in Partition {} to {} decimal places: {}.", weight, entry.getKey(), + _maxWeight.scale(), newWeight); + break; + case IGNORE: + default: + break; + } + } + } + return res; + } + + private IllegalArgumentException getMaxWeightBreachException(BigDecimal weight, int partition) { + return new IllegalArgumentException(String.format("[ACTION NEEDED] Weight %s in Partition %d is greater" + + " than the max weight allowed: %s. Please correct the weight. 
It will be force-capped to the max weight " + + "in the future.", weight, partition, _maxWeight)); + } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java index f10211e35f..0fcbe33bfd 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java @@ -183,4 +183,27 @@ public Map getPartitionData() public boolean isMarkUpFailed() { return _announcer.isMarkUpFailed(); } + + @Override + public boolean isMarkUpIntentSent() + { + return _announcer.isMarkUpIntentSent(); + } + + @Override + public boolean isDarkWarmupMarkUpIntentSent() { + return _announcer.isDarkWarmupMarkUpIntentSent(); + } + + @Override + public int getMaxWeightBreachedCount() + { + return _announcer.getMaxWeightBreachedCount(); + } + + @Override + public int getWeightDecimalPlacesBreachedCount() + { + return _announcer.getWeightDecimalPlacesBreachedCount(); + } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java index 53c85c0794..70cd1c7165 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java @@ -72,4 +72,27 @@ void setPartitionDataUsingJson(String partitionDataJson) void setPartitionData(Map partitionData); boolean isMarkUpFailed(); + + /** + * @return true if the announcer has completed sending a markup intent. NOTE THAT a mark-up intent sent does NOT mean the + * announcement status on service discovery registry is up. Service discovery registry may further process the host + * and determine its status. Check on service discovery registry for the final status. + */ + boolean isMarkUpIntentSent(); + + /** + * @return true if the announcer has completed sending a dark warmup cluster markup intent. 
+ */ + boolean isDarkWarmupMarkUpIntentSent(); + + /** + * @return the times that the max weight has been breached. + */ + int getMaxWeightBreachedCount(); + + /** + * + * @return the times that the max number of decimal places on weight has been breached. + */ + int getWeightDecimalPlacesBreachedCount(); } diff --git a/d2/src/test/java/com/linkedin/d2/balancer/servers/TestZooKeeperAnnouncer.java b/d2/src/test/java/com/linkedin/d2/balancer/servers/TestZooKeeperAnnouncer.java index 00ade0b0bc..2af6532e13 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/servers/TestZooKeeperAnnouncer.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/servers/TestZooKeeperAnnouncer.java @@ -2,13 +2,20 @@ import com.linkedin.common.callback.Callback; import com.linkedin.common.util.None; +import com.linkedin.d2.balancer.properties.PartitionData; import com.linkedin.d2.balancer.properties.PropertyKeys; +import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter; +import java.math.BigDecimal; +import java.util.Collections; +import java.util.Map; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -24,6 +31,15 @@ public class TestZooKeeperAnnouncer @Mock private Callback _callback; + private static final Map MAX_WEIGHT_BREACH_PARTITION_DATA = + Collections.singletonMap(0, new PartitionData(1000)); + private static final Map DECIMAL_PLACES_BREACH_PARTITION_DATA = + Collections.singletonMap(0, new PartitionData(5.345)); + private static final Map MAX_WEIGHT_AND_DECIMAL_PLACES_BREACH_PARTITION_DATA = + Collections.singletonMap(0, new PartitionData(10.89)); + private static final Map VALID_PARTITION_DATA = + Collections.singletonMap(0, new PartitionData(2.3)); + @BeforeMethod public void setUp() { @@ -43,4 +59,89 @@ public void testSetDoNotLoadBalance() 
verify(_server).addUriSpecificProperty(any(), any(), any(), any(), eq(PropertyKeys.DO_NOT_LOAD_BALANCE), eq(false), any()); } + + @DataProvider(name = "validatePartitionDataDataProvider") + public Object[][] getValidatePartitionDataDataProvider() + { + return new Object[][] { + { + // no weight rules + null, null, MAX_WEIGHT_BREACH_PARTITION_DATA, MAX_WEIGHT_BREACH_PARTITION_DATA, null, 0, 0 + }, + { + // negative weight throws + null, null, Collections.singletonMap(0, new PartitionData(-1.0)), null, + new IllegalArgumentException("Weight -1.0 in Partition 0 is negative. Please correct it."), 0, 0 + }, + { + // valid weight + "3.0", null, VALID_PARTITION_DATA, VALID_PARTITION_DATA, null, 0, 0 + }, + { + // no action default to IGNORE, which won't correct the value BUT will increment the counts + "10.0", null, MAX_WEIGHT_BREACH_PARTITION_DATA, MAX_WEIGHT_BREACH_PARTITION_DATA, null, 1, 0 + }, + { + // warn action won't correct the value + "10.0", ZooKeeperAnnouncer.ActionOnWeightBreach.WARN, MAX_WEIGHT_BREACH_PARTITION_DATA, + MAX_WEIGHT_BREACH_PARTITION_DATA, null, 1, 0 + }, + { + // max weight breach, correct the value + "10.0", ZooKeeperAnnouncer.ActionOnWeightBreach.RECTIFY, MAX_WEIGHT_BREACH_PARTITION_DATA, + Collections.singletonMap(0, new PartitionData(10)), null, 1, 0 + }, + { + // decimal places breach, correct the value + "10.0", ZooKeeperAnnouncer.ActionOnWeightBreach.RECTIFY, DECIMAL_PLACES_BREACH_PARTITION_DATA, + Collections.singletonMap(0, new PartitionData(5.3)), null, 0, 1 + }, + { + // max weight and decimal places breach, correct the value + "10.0", ZooKeeperAnnouncer.ActionOnWeightBreach.RECTIFY, MAX_WEIGHT_AND_DECIMAL_PLACES_BREACH_PARTITION_DATA, + Collections.singletonMap(0, new PartitionData(10)), null, 1, 0 + }, + { + // throw action throws for max weight breach + "10.0", ZooKeeperAnnouncer.ActionOnWeightBreach.THROW, MAX_WEIGHT_BREACH_PARTITION_DATA, null, + new IllegalArgumentException("[ACTION NEEDED] Weight 1000.0 in Partition 0 is 
greater than the max weight " + + "allowed: 10.0. Please correct the weight. It will be force-capped to the max weight in the future."), + 1, 0 + }, + { + // throw action does not throw for decimal places breach + "10.0", ZooKeeperAnnouncer.ActionOnWeightBreach.THROW, DECIMAL_PLACES_BREACH_PARTITION_DATA, + DECIMAL_PLACES_BREACH_PARTITION_DATA, null, 0, 1 + } + }; + } + @Test(dataProvider = "validatePartitionDataDataProvider") + public void testValidatePartitionData(String maxWeight, ZooKeeperAnnouncer.ActionOnWeightBreach action, + Map input, Map expected, Exception expectedException, + int expectedMaxWeightBreachedCount, int expectedWeightDecimalPlacesBreachedCount) + { + ZooKeeperAnnouncer announcer = new ZooKeeperAnnouncer(_server, true, false, null, 0, + null, new LogOnlyServiceDiscoveryEventEmitter(), + maxWeight == null ? null : new BigDecimal(maxWeight), action); + + if (expectedException != null) + { + try + { + announcer.validatePartitionData(input); + fail("Expected exception not thrown"); + } + catch (Exception ex) + { + assertTrue(ex instanceof IllegalArgumentException); + assertEquals(expectedException.getMessage(), ex.getMessage()); + } + } + else + { + assertEquals(expected, announcer.validatePartitionData(input)); + } + assertEquals(expectedMaxWeightBreachedCount, announcer.getMaxWeightBreachedCount()); + assertEquals(expectedWeightDecimalPlacesBreachedCount, announcer.getWeightDecimalPlacesBreachedCount()); + } } diff --git a/gradle.properties b/gradle.properties index b3a11e91d4..be86bdb5db 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.61.0 +version=29.62.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true