From 089994fd2b7b28b1a9ea1882996e72d369941c79 Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Fri, 19 Jan 2024 10:43:35 -0800 Subject: [PATCH] Adjust dual read monitoring data match logic and remove log rate limiter (#968) * adjust dual read monitoring data match logic and log rate limiter * take version of -1 as an exception to allow data match --- CHANGELOG.md | 6 +- .../dualread/DualReadLoadBalancerMonitor.java | 153 +++++++++------ .../d2/balancer/properties/UriProperties.java | 14 +- .../properties/UriPropertiesMerger.java | 4 - .../stores/zk/ZooKeeperEphemeralStore.java | 2 +- .../d2/xds/XdsToD2PropertiesAdaptor.java | 4 + .../DualReadLoadBalancerMonitorTest.java | 177 +++++++++++++++++- .../com/linkedin/d2/util/TestDataHelper.java | 23 +++ gradle.properties | 2 +- 9 files changed, 319 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 245401a4c0..1f2da9bea8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.49.7] - 2024-01-18 +adjust dual read monitoring data match logic and log rate limiter + ## [29.49.6] - 2024-01-12 - fix dualread monitoring log @@ -5612,7 +5615,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.6...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.7...master +[29.49.7]: https://github.com/linkedin/rest.li/compare/v29.49.6...v29.49.7 [29.49.6]: https://github.com/linkedin/rest.li/compare/v29.49.5...v29.49.6 [29.49.5]: https://github.com/linkedin/rest.li/compare/v29.49.4...v29.49.5 [29.49.4]: https://github.com/linkedin/rest.li/compare/v29.49.3...v29.49.4 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java index f0f7f84292..be101f7418 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java @@ -40,15 +40,14 @@ * For each service discovery properties, there will be a cache for old load balancer data, and a cache * for new load balancer data. * - * When a new service discovery data is reported, it will check the cache of the other data source - * and see if there is a match based on the property name and version. If there is a match, it will - * use the {@link DualReadLoadBalancerMonitor#isEqual(CacheEntry, CacheEntry)} method to compare whether - * the two entries are equal. + * When a new service discovery data is reported, it will check if the cache of the other data source + * has data for the same property name. If there is, it will compare whether the two data are equal. */ public abstract class DualReadLoadBalancerMonitor { private static final Logger LOG = LoggerFactory.getLogger(DualReadLoadBalancerMonitor.class); public final static String DEFAULT_DATE_FORMAT = "YYYY/MM/dd HH:mm:ss.SSS"; + public final static String VERSION_FROM_FS = "-1"; private static final long ERROR_REPORT_PERIOD = 10 * 1000; // Limit error report logging to every 10 seconds private static final int MAX_CACHE_SIZE = 10000; private final Cache> _oldLbPropertyCache; @@ -69,64 +68,108 @@ public DualReadLoadBalancerMonitor(Clock clock) public void reportData(String propertyName, T property, String propertyVersion, boolean fromNewLb) { + // compare with existing data in the cache to add to Cache> cacheToAdd = fromNewLb ? _newLbPropertyCache : _oldLbPropertyCache; CacheEntry existingEntry = cacheToAdd.getIfPresent(propertyName); - if (existingEntry != null) + String propertyClassName = property.getClass().getSimpleName(); + boolean isUriProp = property instanceof UriProperties; + + if (existingEntry != null && existingEntry._data.equals(property)) { - if (existingEntry._version.equals(propertyVersion) && existingEntry._data.equals(property)) + if (existingEntry._version.equals(propertyVersion)) { - _rateLimitedLogger.debug("Reported duplicate for {} LB for property: {}, version: {}, data: {}", - fromNewLb ? "New" : "Old", propertyName, propertyVersion, property); + LOG.debug("Reported duplicated {} for {} for {} LB, version: {}, data: {}", + propertyClassName, propertyName, fromNewLb ? "New" : "Old", propertyVersion, property); return; // skip setting duplicate data to avoid incorrectly incrementing OutOfSync metric } - else if (existingEntry._data.equals(property)) + else { - _rateLimitedLogger.warn("Reported data that only differs in version for {} LB for property: {}. " - + "Old version: {}, New version: {}, with the same data: {}", fromNewLb ? "New" : "Old", propertyName, - existingEntry._version, propertyVersion, existingEntry._data); - // since the version is different, we don't skipping setting it to the cache + // Existing data is the same but with a different version. Some scenarios can cause this: + // 1) uris flip their status down-then-up quickly within an update receipt interval (ZK: ~10s, + // xDS: rate limiter ~0.5s) will end up with the same uri properties that only differs in the + // version. + // 2) uris data read from FS has a version of "-1|x". When read new data from ZK/xDS, the version will be + // different. + if (!isReadFromFS(existingEntry._version, propertyVersion)) + { + warnByPropType(isUriProp, + String.format("Received same data of different versions in %s LB for %s: %s." + + " Old version: %s, New version: %s, Data: %s", + fromNewLb ? "New" : "Old", propertyClassName, propertyName, existingEntry._version, + propertyVersion, property) + ); + } + // still need to put in the cache, don't skip } } + // compare with data in the other cache Cache> cacheToCompare = fromNewLb ? _oldLbPropertyCache : _newLbPropertyCache; - CacheEntry entryToCompare = cacheToCompare.getIfPresent(propertyName); + boolean isVersionEqual = entryToCompare != null && Objects.equals(entryToCompare._version, propertyVersion); + boolean isDataEqual = entryToCompare != null && entryToCompare._data.equals(property); + CacheEntry newEntry = new CacheEntry<>(propertyVersion, getTimestamp(), property); String entriesLogMsg = getEntriesMessage(fromNewLb, entryToCompare, newEntry); - if (entryToCompare != null && Objects.equals(entryToCompare._version, propertyVersion)) - { - if (!isEqual(entryToCompare, newEntry)) + if (isDataEqual && (isVersionEqual || isReadFromFS(propertyVersion, entryToCompare._version))) + { // data is the same AND version is the same or read from FS. it's a match + decrementEntryOutOfSyncCount(); // decrement the out-of-sync count for the entry received earlier + if (!isVersionEqual) { - incrementEntryOutOfSyncCount(); // increment the out-of-sync count for the entry received later - _rateLimitedLogger.warn("Received mismatched properties from dual read. {}", entriesLogMsg); + LOG.debug("Matched {} for {} that only differ in version. {}", + propertyClassName, propertyName, entriesLogMsg); } - else - { // entries are in-sync, decrement the out-of-sync count for the entry received earlier - decrementEntryOutOfSyncCount(); - _rateLimitedLogger.debug("Matched properties from dual read. {}", entriesLogMsg); + else { + LOG.debug("Matched {} for {}. {}", propertyClassName, propertyName, entriesLogMsg); } cacheToCompare.invalidate(propertyName); } - else - { + else if (!isDataEqual && isVersionEqual) + { // data is not the same but version is the same, a mismatch! + incrementPropertiesErrorCount(); + incrementEntryOutOfSyncCount(); // increment the out-of-sync count for the entry received later + warnByPropType(isUriProp, + String.format("Received mismatched %s for %s. %s", propertyClassName, propertyName, entriesLogMsg)); + cacheToCompare.invalidate(propertyName); + } + else { + if (isDataEqual) + { + warnByPropType(isUriProp, + String.format("Received same data of %s for %s but with different versions: %s", + propertyClassName, propertyName, entriesLogMsg) + ); + } cacheToAdd.put(propertyName, newEntry); - // if version is different, entries of both the old version and the new version will increment the out-of-sync count incrementEntryOutOfSyncCount(); - _rateLimitedLogger.debug("Added new entry to dual read cache for {} LB.", - fromNewLb ? "New" : "Old"); + LOG.debug("Added new entry {} for {} for {} LB.", propertyClassName, propertyName, fromNewLb ? "New" : "Old"); } + + // after cache entry add/delete above, re-log the latest entries on caches entryToCompare = cacheToCompare.getIfPresent(propertyName); newEntry = cacheToAdd.getIfPresent(propertyName); entriesLogMsg = getEntriesMessage(fromNewLb, entryToCompare, newEntry); - _rateLimitedLogger.debug("Current entries on dual read caches: {}", entriesLogMsg); + LOG.debug("Current entries of {} for {}: {}", propertyClassName, propertyName, entriesLogMsg); + } + + @VisibleForTesting + Cache> getOldLbCache() + { + return _oldLbPropertyCache; + } + + @VisibleForTesting + Cache> getNewLbCache() + { + return _newLbPropertyCache; } abstract void incrementEntryOutOfSyncCount(); abstract void decrementEntryOutOfSyncCount(); - abstract boolean isEqual(CacheEntry oldLbEntry, CacheEntry newLbEntry); + abstract void incrementPropertiesErrorCount(); abstract void onEvict(); @@ -140,13 +183,31 @@ private Cache> buildCache() // receives update from one source if (notification.getCause().equals(RemovalCause.SIZE)) { - _rateLimitedLogger.warn("Cache entry evicted since cache is full: {}", notification.getValue()); + LOG.debug("Cache entry evicted since cache is full: {}", notification.getValue()); onEvict(); } }) .build(); } + private boolean isReadFromFS(String v1, String v2) + { + // uri prop version from FS is "-1|x", where x is the number of uris, so we use startsWith here + return v1.startsWith(VERSION_FROM_FS) || v2.startsWith(VERSION_FROM_FS); + } + + private void warnByPropType(boolean isUriProp, String msg) + { + if (isUriProp) + { + _rateLimitedLogger.warn(msg); + } + else + { + LOG.warn(msg); + } + } + @VisibleForTesting String getEntriesMessage(boolean fromNewLb, CacheEntry oldE, CacheEntry newE) { @@ -201,15 +262,9 @@ void decrementEntryOutOfSyncCount() { } @Override - boolean isEqual(CacheEntry oldLbEntry, CacheEntry newLbEntry) + void incrementPropertiesErrorCount() { - if (!oldLbEntry._data.equals(newLbEntry._data)) - { - _dualReadLoadBalancerJmx.incrementClusterPropertiesErrorCount(); - return false; - } - - return true; + _dualReadLoadBalancerJmx.incrementClusterPropertiesErrorCount(); } @Override @@ -240,15 +295,9 @@ void decrementEntryOutOfSyncCount() { } @Override - boolean isEqual(CacheEntry oldLbEntry, CacheEntry newLbEntry) + void incrementPropertiesErrorCount() { - if (!oldLbEntry._data.equals(newLbEntry._data)) - { - _dualReadLoadBalancerJmx.incrementServicePropertiesErrorCount(); - return false; - } - - return true; + _dualReadLoadBalancerJmx.incrementServicePropertiesErrorCount(); } @Override @@ -279,15 +328,9 @@ void decrementEntryOutOfSyncCount() { } @Override - boolean isEqual(CacheEntry oldLbEntry, CacheEntry newLbEntry) + void incrementPropertiesErrorCount() { - if (oldLbEntry._data.Uris().size() != newLbEntry._data.Uris().size()) - { - _dualReadLoadBalancerJmx.incrementUriPropertiesErrorCount(); - return false; - } - - return true; + _dualReadLoadBalancerJmx.incrementUriPropertiesErrorCount(); } @Override diff --git a/d2/src/main/java/com/linkedin/d2/balancer/properties/UriProperties.java b/d2/src/main/java/com/linkedin/d2/balancer/properties/UriProperties.java index 3508bee44d..5bc6a17efd 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/properties/UriProperties.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/properties/UriProperties.java @@ -163,7 +163,8 @@ public Set getUriBySchemeAndPartition(String scheme, int partitionId) public String toString() { return "UriProperties [_clusterName=" + _clusterName + ", _urisBySchemeAndPartition=" - + _urisBySchemeAndPartition + ", _partitions=" + _partitionDesc + ", _uriSpecificProperties=" + _uriSpecificProperties + "]"; + + _urisBySchemeAndPartition + ", _partitions=" + _partitionDesc + ", _uriSpecificProperties=" + + _uriSpecificProperties + "]"; } @Override @@ -218,9 +219,18 @@ else if (!_urisBySchemeAndPartition.equals(other._urisBySchemeAndPartition)) return false; } else if (!_uriSpecificProperties.equals(other._uriSpecificProperties)) - return false; + { + // only two effectively empty uri specific properties maps are equal + return isEffectivelyEmpty(_uriSpecificProperties) + && isEffectivelyEmpty(other._uriSpecificProperties); + } return true; } + private static boolean isEffectivelyEmpty(Map> m) + { + // the map is empty OR all inner maps are actually empty + return m.isEmpty() || m.values().stream().allMatch(Map::isEmpty); + } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/properties/UriPropertiesMerger.java b/d2/src/main/java/com/linkedin/d2/balancer/properties/UriPropertiesMerger.java index 9f313d72cd..bd4a80d926 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/properties/UriPropertiesMerger.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/properties/UriPropertiesMerger.java @@ -53,10 +53,6 @@ public UriProperties merge(String propertyName, Collection proper } } - if (maxVersion == -1) - { - LOG.warn("Merged Uri properties for cluster {} has invalid version -1. It should be > -1.", propertyName); - } return new UriProperties(clusterName, partitionData, uriSpecificProperties, maxVersion); } diff --git a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZooKeeperEphemeralStore.java b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZooKeeperEphemeralStore.java index ce889d9f96..5bb85dcb49 100644 --- a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZooKeeperEphemeralStore.java +++ b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/ZooKeeperEphemeralStore.java @@ -1029,7 +1029,7 @@ public void processResult(int rc, String s, Object o, byte[] bytes, Stat stat) long version = stat.getMzxid(); if (version <= 0) { - LOG.warn("ZK data from {} has invalid version: {}", s, version); + LOG.warn("ZK data has invalid version: {}, from path {}", version, s); } T value = _serializer.fromBytes(bytes, version); _properties.put(childPath, value); diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index c6498a8501..ccf87b4078 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -474,6 +474,10 @@ public void onChanged(XdsClient.D2URIMapUpdate update) private void mergeAndPublishUris(String clusterName) { UriProperties mergedUriProperties = _uriPropertiesMerger.merge(clusterName, _currentData.values()); + if (mergedUriProperties.getVersion() == -1) + { + LOG.warn("xDS UriProperties has invalid version -1. Raw uris: {}", _currentData.values()); + } _uriEventBus.publishInitialize(clusterName, mergedUriProperties); if (_dualReadStateManager != null) diff --git a/d2/src/test/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitorTest.java b/d2/src/test/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitorTest.java index 86350c165f..c6fed50145 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitorTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitorTest.java @@ -1,16 +1,190 @@ package com.linkedin.d2.balancer.dualread; +import com.google.common.cache.Cache; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.ServiceStoreProperties; import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.util.TestDataHelper; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static com.linkedin.d2.util.TestDataHelper.*; +import static org.mockito.Mockito.*; public class DualReadLoadBalancerMonitorTest { + private static class DualReadLoadBalancerMonitorTestFixure + { + @Mock + DualReadLoadBalancerJmx _mockJmx; + + DualReadLoadBalancerMonitorTestFixure() + { + MockitoAnnotations.initMocks(this); + doNothing().when(_mockJmx).incrementServicePropertiesOutOfSyncCount(); + doNothing().when(_mockJmx).decrementUriPropertiesOutOfSyncCount(); + doNothing().when(_mockJmx).incrementServicePropertiesErrorCount(); + } + + DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor getServiceMonitor() + { + return new DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor(_mockJmx, TestDataHelper.getClock()); + } + + DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor getUriMonitor() + { + return new DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor(_mockJmx, TestDataHelper.getClock()); + } + } + + @Test + public void testPut() + { + DualReadLoadBalancerMonitorTestFixure fixture = new DualReadLoadBalancerMonitorTestFixure(); + DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor monitor = fixture.getServiceMonitor(); + + // put in one entry for new lb + putInService(monitor, SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "1", true, 1); + verify(fixture._mockJmx).incrementServicePropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + + // put in duplicate entry will be skipped + monitor.reportData(SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "1", true); + verifyServiceOnCache(monitor.getNewLbCache(), SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "1"); + verifyNoMoreInteractions(fixture._mockJmx); + + // put in same data with a different version will succeed + monitor.reportData(SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "2", true); + verifyServiceOnCache(monitor.getNewLbCache(), SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "2"); + verify(fixture._mockJmx, times(2)).incrementServicePropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + + // put in an entry to old lb with the same data but a different version (not read from FS) will succeed + monitor.reportData(SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "1", false); + verifyServiceOnCache(monitor.getOldLbCache(), SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "1"); + verifyServiceOnCache(monitor.getNewLbCache(), SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "2"); + verify(fixture._mockJmx, times(3)).incrementServicePropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + } + + @Test + public void testServiceDataMatch() + { + DualReadLoadBalancerMonitorTestFixure fixture = new DualReadLoadBalancerMonitorTestFixure(); + DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor monitor = fixture.getServiceMonitor(); + + // put in one new lb entry and one old lb entry, with different data and version + putInService(monitor, SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "-1", true, 1); + putInService(monitor, SERVICE_NAME, SERVICE_STORE_PROPERTIES_2, "1", false, 1); + verify(fixture._mockJmx, times(2)).incrementServicePropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + + // Exact match (data and version all the same): put in an old lb entry that matches the new lb one + // will remove the new lb one + monitor.reportData(SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "-1", false); + Assert.assertEquals(monitor.getNewLbCache().size(), 0); + verifyServiceOnCache(monitor.getOldLbCache(), SERVICE_NAME, SERVICE_STORE_PROPERTIES_2, "1"); + verify(fixture._mockJmx).decrementServicePropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + + // put the new lb one back in + putInService(monitor, SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "-1", true, 1); + verify(fixture._mockJmx, times(3)).incrementServicePropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + + // Data match, version differs but "-1" will be matched as an exception: put in an old lb entry that matches + // the data of the new lb one will remove the new lb one + monitor.reportData(SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "2", false); + Assert.assertEquals(monitor.getNewLbCache().size(), 0); + verifyServiceOnCache(monitor.getOldLbCache(), SERVICE_NAME, SERVICE_STORE_PROPERTIES_2, "1"); + verify(fixture._mockJmx, times(2)).decrementServicePropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + } + + @Test // since uri version read from FS is "-1|x", test for this case specifically + public void testUriDataMatch() + { + DualReadLoadBalancerMonitorTestFixure fixture = new DualReadLoadBalancerMonitorTestFixure(); + DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor monitor = fixture.getUriMonitor(); + + // put in one new lb entry and one old lb entry, with different data and version + putInUri(monitor, CLUSTER_NAME, PROPERTIES_1, "1|2", true, 1); + putInUri(monitor, CLUSTER_NAME, PROPERTIES_2, "-1|2", false, 1); + verify(fixture._mockJmx, times(2)).incrementUriPropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + + // data match, version differs with version read FS + monitor.reportData(CLUSTER_NAME, PROPERTIES_2, "2|2", true); + Assert.assertEquals(monitor.getOldLbCache().size(), 0); + verifyUriOnCache(monitor.getNewLbCache(), CLUSTER_NAME, PROPERTIES_1, "1|2"); + verify(fixture._mockJmx).decrementUriPropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + } + + @Test + public void testMismatch() + { + DualReadLoadBalancerMonitorTestFixure fixture = new DualReadLoadBalancerMonitorTestFixure(); + DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor monitor = fixture.getServiceMonitor(); + + // put in one new lb entry and one old lb entry, with different data and version + putInService(monitor, SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "-1", true, 1); + putInService(monitor, SERVICE_NAME, SERVICE_STORE_PROPERTIES_2, "1", false, 1); + verify(fixture._mockJmx, times(2)).incrementServicePropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + + // Mismatch (version is the same but data differs): put in a new lb entry that mismatch the old lb one + // will remove the old lb one + monitor.reportData(SERVICE_NAME, SERVICE_STORE_PROPERTIES_3, "1", true); + Assert.assertEquals(monitor.getOldLbCache().size(), 0); + verifyServiceOnCache(monitor.getNewLbCache(), SERVICE_NAME, SERVICE_STORE_PROPERTIES_1, "-1"); + verify(fixture._mockJmx).incrementServicePropertiesErrorCount(); + verify(fixture._mockJmx, times(3)).incrementServicePropertiesOutOfSyncCount(); + verifyNoMoreInteractions(fixture._mockJmx); + } + + private void putInService(DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor monitor, + String name, ServiceStoreProperties prop, String version, boolean isFromNewLb, int expectedSizeAfter) + { + monitor.reportData(name, prop, version, isFromNewLb); + Cache> cache = + isFromNewLb ? monitor.getNewLbCache() : monitor.getOldLbCache(); + Assert.assertEquals(cache.size(), expectedSizeAfter); + verifyServiceOnCache(cache, name, prop, version); + } + + private void verifyServiceOnCache(Cache> cache, + String name, ServiceProperties prop, String v) + { + DualReadLoadBalancerMonitor.CacheEntry entry = cache.getIfPresent(name); + Assert.assertNotNull(entry); + Assert.assertEquals(entry._data, prop); + Assert.assertEquals(entry._version, v); + } + + private void putInUri(DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor monitor, + String name, UriProperties prop, String version, boolean isFromNewLb, int expectedSizeAfter) + { + monitor.reportData(name, prop, version, isFromNewLb); + Cache> cache = + isFromNewLb ? monitor.getNewLbCache() : monitor.getOldLbCache(); + Assert.assertEquals(cache.size(), expectedSizeAfter); + verifyUriOnCache(cache, name, prop, version); + } + + private void verifyUriOnCache(Cache> cache, + String name, UriProperties prop, String v) + { + DualReadLoadBalancerMonitor.CacheEntry entry = cache.getIfPresent(name); + Assert.assertNotNull(entry); + Assert.assertEquals(entry._data, prop); + Assert.assertEquals(entry._version, v); + } + @DataProvider public Object[][] getEntriesMessageDataProvider() { @@ -47,8 +221,7 @@ public void testGetEntriesMessage(Boolean isFromNewLb, String expected) { DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor monitor = - new DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor( - new DualReadLoadBalancerJmx(), TestDataHelper.getClock()); + new DualReadLoadBalancerMonitorTestFixure().getUriMonitor(); Assert.assertEquals(monitor.getEntriesMessage(isFromNewLb, oldE, newE), expected, diff --git a/d2/src/test/java/com/linkedin/d2/util/TestDataHelper.java b/d2/src/test/java/com/linkedin/d2/util/TestDataHelper.java index 472712879e..05cc6f7751 100644 --- a/d2/src/test/java/com/linkedin/d2/util/TestDataHelper.java +++ b/d2/src/test/java/com/linkedin/d2/util/TestDataHelper.java @@ -2,6 +2,9 @@ import com.google.common.collect.ImmutableMap; import com.linkedin.d2.balancer.properties.PartitionData; +import com.linkedin.d2.balancer.properties.PropertyKeys; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.ServiceStoreProperties; import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper; import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter; @@ -24,7 +27,17 @@ public class TestDataHelper { + public static final String SERVICE_NAME = "testService"; + public static final String PATH = "/testService"; + public static final List STRATEGY_LIST_1 = Collections.singletonList("relative"); + public static final List STRATEGY_LIST_2 = Collections.singletonList("degrader"); public static final String CLUSTER_NAME = "TestCluster"; + public static final ServiceProperties SERVICE_PROPERTIES_1; + public static final ServiceProperties SERVICE_PROPERTIES_2; + public static final ServiceProperties SERVICE_PROPERTIES_3; + public static final ServiceStoreProperties SERVICE_STORE_PROPERTIES_1; + public static final ServiceStoreProperties SERVICE_STORE_PROPERTIES_2; + public static final ServiceStoreProperties SERVICE_STORE_PROPERTIES_3; public static final String HOST_1 = "google.com"; public static final String HOST_2 = "linkedin.com"; public static final String HOST_3 = "youtube.com"; @@ -59,6 +72,16 @@ public class TestDataHelper { 1, new PartitionData(3)); static { + SERVICE_PROPERTIES_1 = new ServiceProperties(SERVICE_NAME, CLUSTER_NAME, PATH, STRATEGY_LIST_1); + SERVICE_STORE_PROPERTIES_1 = new ServiceStoreProperties(SERVICE_PROPERTIES_1, null, null); + + SERVICE_PROPERTIES_2 = new ServiceProperties(SERVICE_NAME, CLUSTER_NAME, PATH, STRATEGY_LIST_2); + SERVICE_STORE_PROPERTIES_2 = new ServiceStoreProperties(SERVICE_PROPERTIES_2, null, null); + + SERVICE_PROPERTIES_3 = new ServiceProperties(SERVICE_NAME, CLUSTER_NAME, PATH, STRATEGY_LIST_1, + Collections.singletonMap(PropertyKeys.RELATIVE_LATENCY_HIGH_THRESHOLD_FACTOR, 8.0)); + SERVICE_STORE_PROPERTIES_3 = new ServiceStoreProperties(SERVICE_PROPERTIES_3, null, null); + PROPERTIES_1 = new UriProperties(CLUSTER_NAME, Collections.singletonMap(URI_1, MAP_1)); PROPERTIES_2 = new UriProperties(CLUSTER_NAME, Collections.singletonMap(URI_2, MAP_2)); PROPERTIES_3 = new UriProperties(CLUSTER_NAME, Collections.singletonMap(URI_3, MAP_3)); diff --git a/gradle.properties b/gradle.properties index 37cbb67157..84c571bc27 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.49.6 +version=29.49.7 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true