Adjust dual read monitoring data match logic and remove log rate limiter (#968)

* adjust dual read monitoring data match logic and log rate limiter

* take version of -1 as an exception to allow data match
bohhyang authored Jan 19, 2024
1 parent 24b1631 commit 089994f
Showing 9 changed files with 319 additions and 66 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.49.7] - 2024-01-18
- adjust dual read monitoring data match logic and log rate limiter

## [29.49.6] - 2024-01-12
- fix dualread monitoring log

@@ -5612,7 +5615,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.6...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.7...master
[29.49.7]: https://github.com/linkedin/rest.li/compare/v29.49.6...v29.49.7
[29.49.6]: https://github.com/linkedin/rest.li/compare/v29.49.5...v29.49.6
[29.49.5]: https://github.com/linkedin/rest.li/compare/v29.49.4...v29.49.5
[29.49.4]: https://github.com/linkedin/rest.li/compare/v29.49.3...v29.49.4
DualReadLoadBalancerMonitor.java
@@ -40,15 +40,14 @@
* For each service discovery property, there will be a cache for old load balancer data, and a cache
* for new load balancer data.
*
* When a new service discovery data is reported, it will check the cache of the other data source
* and see if there is a match based on the property name and version. If there is a match, it will
* use the {@link DualReadLoadBalancerMonitor#isEqual(CacheEntry, CacheEntry)} method to compare whether
* the two entries are equal.
* When a new service discovery data is reported, it will check if the cache of the other data source
* has data for the same property name. If there is, it will compare whether the two data are equal.
*/
public abstract class DualReadLoadBalancerMonitor<T>
{
private static final Logger LOG = LoggerFactory.getLogger(DualReadLoadBalancerMonitor.class);
public final static String DEFAULT_DATE_FORMAT = "YYYY/MM/dd HH:mm:ss.SSS";
public final static String VERSION_FROM_FS = "-1";
private static final long ERROR_REPORT_PERIOD = 10 * 1000; // Limit error report logging to every 10 seconds
private static final int MAX_CACHE_SIZE = 10000;
private final Cache<String, CacheEntry<T>> _oldLbPropertyCache;
@@ -69,64 +68,108 @@ public DualReadLoadBalancerMonitor(Clock clock)

public void reportData(String propertyName, T property, String propertyVersion, boolean fromNewLb)
{
// compare with existing data in the cache to add to
Cache<String, CacheEntry<T>> cacheToAdd = fromNewLb ? _newLbPropertyCache : _oldLbPropertyCache;
CacheEntry<T> existingEntry = cacheToAdd.getIfPresent(propertyName);
if (existingEntry != null)
String propertyClassName = property.getClass().getSimpleName();
boolean isUriProp = property instanceof UriProperties;

if (existingEntry != null && existingEntry._data.equals(property))
{
if (existingEntry._version.equals(propertyVersion) && existingEntry._data.equals(property))
if (existingEntry._version.equals(propertyVersion))
{
_rateLimitedLogger.debug("Reported duplicate for {} LB for property: {}, version: {}, data: {}",
fromNewLb ? "New" : "Old", propertyName, propertyVersion, property);
LOG.debug("Reported duplicated {} for {} for {} LB, version: {}, data: {}",
propertyClassName, propertyName, fromNewLb ? "New" : "Old", propertyVersion, property);
return; // skip setting duplicate data to avoid incorrectly incrementing OutOfSync metric
}
else if (existingEntry._data.equals(property))
else
{
_rateLimitedLogger.warn("Reported data that only differs in version for {} LB for property: {}. "
+ "Old version: {}, New version: {}, with the same data: {}", fromNewLb ? "New" : "Old", propertyName,
existingEntry._version, propertyVersion, existingEntry._data);
// since the version is different, we don't skip setting it to the cache
// Existing data is the same but with a different version. Some scenarios can cause this:
// 1) uris that flip their status down-then-up quickly within an update receipt interval (ZK: ~10s,
// xDS: rate limiter ~0.5s) will end up with the same uri properties that differ only in the
// version.
// 2) uri data read from FS has a version of "-1|x". When new data is read from ZK/xDS, the version will be
// different.
if (!isReadFromFS(existingEntry._version, propertyVersion))
{
warnByPropType(isUriProp,
String.format("Received same data of different versions in %s LB for %s: %s."
+ " Old version: %s, New version: %s, Data: %s",
fromNewLb ? "New" : "Old", propertyClassName, propertyName, existingEntry._version,
propertyVersion, property)
);
}
// still need to put in the cache, don't skip
}
}

// compare with data in the other cache
Cache<String, CacheEntry<T>> cacheToCompare = fromNewLb ? _oldLbPropertyCache : _newLbPropertyCache;

CacheEntry<T> entryToCompare = cacheToCompare.getIfPresent(propertyName);
boolean isVersionEqual = entryToCompare != null && Objects.equals(entryToCompare._version, propertyVersion);
boolean isDataEqual = entryToCompare != null && entryToCompare._data.equals(property);

CacheEntry<T> newEntry = new CacheEntry<>(propertyVersion, getTimestamp(), property);
String entriesLogMsg = getEntriesMessage(fromNewLb, entryToCompare, newEntry);

if (entryToCompare != null && Objects.equals(entryToCompare._version, propertyVersion))
{
if (!isEqual(entryToCompare, newEntry))
if (isDataEqual && (isVersionEqual || isReadFromFS(propertyVersion, entryToCompare._version)))
{ // data is the same AND the version is the same or was read from FS: it's a match
decrementEntryOutOfSyncCount(); // decrement the out-of-sync count for the entry received earlier
if (!isVersionEqual)
{
incrementEntryOutOfSyncCount(); // increment the out-of-sync count for the entry received later
_rateLimitedLogger.warn("Received mismatched properties from dual read. {}", entriesLogMsg);
LOG.debug("Matched {} for {} that only differ in version. {}",
propertyClassName, propertyName, entriesLogMsg);
}
else
{ // entries are in-sync, decrement the out-of-sync count for the entry received earlier
decrementEntryOutOfSyncCount();
_rateLimitedLogger.debug("Matched properties from dual read. {}", entriesLogMsg);
else {
LOG.debug("Matched {} for {}. {}", propertyClassName, propertyName, entriesLogMsg);
}
cacheToCompare.invalidate(propertyName);
}
else
{
else if (!isDataEqual && isVersionEqual)
{ // data is not the same but version is the same, a mismatch!
incrementPropertiesErrorCount();
incrementEntryOutOfSyncCount(); // increment the out-of-sync count for the entry received later
warnByPropType(isUriProp,
String.format("Received mismatched %s for %s. %s", propertyClassName, propertyName, entriesLogMsg));
cacheToCompare.invalidate(propertyName);
}
else {
if (isDataEqual)
{
warnByPropType(isUriProp,
String.format("Received same data of %s for %s but with different versions: %s",
propertyClassName, propertyName, entriesLogMsg)
);
}
cacheToAdd.put(propertyName, newEntry);
// if the versions differ, the entries for both the old and the new version each increment the out-of-sync count
incrementEntryOutOfSyncCount();
_rateLimitedLogger.debug("Added new entry to dual read cache for {} LB.",
fromNewLb ? "New" : "Old");
LOG.debug("Added new entry {} for {} for {} LB.", propertyClassName, propertyName, fromNewLb ? "New" : "Old");
}

// after the cache entry add/delete above, re-log the latest entries in the caches
entryToCompare = cacheToCompare.getIfPresent(propertyName);
newEntry = cacheToAdd.getIfPresent(propertyName);
entriesLogMsg = getEntriesMessage(fromNewLb, entryToCompare, newEntry);
_rateLimitedLogger.debug("Current entries on dual read caches: {}", entriesLogMsg);
LOG.debug("Current entries of {} for {}: {}", propertyClassName, propertyName, entriesLogMsg);
}

@VisibleForTesting
Cache<String, CacheEntry<T>> getOldLbCache()
{
return _oldLbPropertyCache;
}

@VisibleForTesting
Cache<String, CacheEntry<T>> getNewLbCache()
{
return _newLbPropertyCache;
}

abstract void incrementEntryOutOfSyncCount();

abstract void decrementEntryOutOfSyncCount();

abstract boolean isEqual(CacheEntry<T> oldLbEntry, CacheEntry<T> newLbEntry);
abstract void incrementPropertiesErrorCount();

abstract void onEvict();

Expand All @@ -140,13 +183,31 @@ private Cache<String, CacheEntry<T>> buildCache()
// receives update from one source
if (notification.getCause().equals(RemovalCause.SIZE))
{
_rateLimitedLogger.warn("Cache entry evicted since cache is full: {}", notification.getValue());
LOG.debug("Cache entry evicted since cache is full: {}", notification.getValue());
onEvict();
}
})
.build();
}

private boolean isReadFromFS(String v1, String v2)
{
// uri prop version from FS is "-1|x", where x is the number of uris, so we use startsWith here
return v1.startsWith(VERSION_FROM_FS) || v2.startsWith(VERSION_FROM_FS);
}

private void warnByPropType(boolean isUriProp, String msg)
{
if (isUriProp)
{
_rateLimitedLogger.warn(msg);
}
else
{
LOG.warn(msg);
}
}

@VisibleForTesting
String getEntriesMessage(boolean fromNewLb, CacheEntry<T> oldE, CacheEntry<T> newE)
{
@@ -201,15 +262,9 @@ void decrementEntryOutOfSyncCount() {
}

@Override
boolean isEqual(CacheEntry<ClusterProperties> oldLbEntry, CacheEntry<ClusterProperties> newLbEntry)
void incrementPropertiesErrorCount()
{
if (!oldLbEntry._data.equals(newLbEntry._data))
{
_dualReadLoadBalancerJmx.incrementClusterPropertiesErrorCount();
return false;
}

return true;
_dualReadLoadBalancerJmx.incrementClusterPropertiesErrorCount();
}

@Override
@@ -240,15 +295,9 @@ void decrementEntryOutOfSyncCount() {
}

@Override
boolean isEqual(CacheEntry<ServiceProperties> oldLbEntry, CacheEntry<ServiceProperties> newLbEntry)
void incrementPropertiesErrorCount()
{
if (!oldLbEntry._data.equals(newLbEntry._data))
{
_dualReadLoadBalancerJmx.incrementServicePropertiesErrorCount();
return false;
}

return true;
_dualReadLoadBalancerJmx.incrementServicePropertiesErrorCount();
}

@Override
@@ -279,15 +328,9 @@ void decrementEntryOutOfSyncCount() {
}

@Override
boolean isEqual(CacheEntry<UriProperties> oldLbEntry, CacheEntry<UriProperties> newLbEntry)
void incrementPropertiesErrorCount()
{
if (oldLbEntry._data.Uris().size() != newLbEntry._data.Uris().size())
{
_dualReadLoadBalancerJmx.incrementUriPropertiesErrorCount();
return false;
}

return true;
_dualReadLoadBalancerJmx.incrementUriPropertiesErrorCount();
}

@Override
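For reference, the adjusted match logic in reportData above boils down to a small decision table. Below is a minimal, self-contained sketch of it, not part of this commit; the class name DualReadMatchSketch, the Outcome enum, and the decide method are illustrative only, and the "-1" prefix check mirrors VERSION_FROM_FS.

import java.util.Objects;

public final class DualReadMatchSketch
{
  enum Outcome { MATCH, MISMATCH, OUT_OF_SYNC_FOR_NOW }

  // dataEqual compares the reported property against the entry cached for the other data source;
  // a version starting with "-1" means the data was read from the local file store (FS).
  static Outcome decide(boolean dataEqual, String reportedVersion, String cachedVersion)
  {
    boolean versionEqual = Objects.equals(reportedVersion, cachedVersion);
    boolean eitherFromFs = reportedVersion.startsWith("-1") || cachedVersion.startsWith("-1");
    if (dataEqual && (versionEqual || eitherFromFs))
    {
      return Outcome.MATCH;             // decrement the out-of-sync count and invalidate the other cache's entry
    }
    if (!dataEqual && versionEqual)
    {
      return Outcome.MISMATCH;          // increment the properties error count and the out-of-sync count
    }
    return Outcome.OUT_OF_SYNC_FOR_NOW; // cache the new entry and count it out of sync until the other source reports
  }

  public static void main(String[] args)
  {
    // Same data read from FS (version "-1|3") and then from ZK/xDS (version "42") now counts as a match.
    System.out.println(decide(true, "-1|3", "42"));  // MATCH
    // Same version but different data is a genuine mismatch.
    System.out.println(decide(false, "42", "42"));   // MISMATCH
  }
}

The first example in main is the "-1" exception called out in the commit message: data bootstrapped from the file store carries a "-1|x" version, so identical data is no longer reported as out of sync just because the versions differ.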
UriProperties.java
@@ -163,7 +163,8 @@ public Set<URI> getUriBySchemeAndPartition(String scheme, int partitionId)
public String toString()
{
return "UriProperties [_clusterName=" + _clusterName + ", _urisBySchemeAndPartition="
+ _urisBySchemeAndPartition + ", _partitions=" + _partitionDesc + ", _uriSpecificProperties=" + _uriSpecificProperties + "]";
+ _urisBySchemeAndPartition + ", _partitions=" + _partitionDesc + ", _uriSpecificProperties="
+ _uriSpecificProperties + "]";
}

@Override
@@ -218,9 +219,18 @@ else if (!_urisBySchemeAndPartition.equals(other._urisBySchemeAndPartition))
return false;
}
else if (!_uriSpecificProperties.equals(other._uriSpecificProperties))
return false;
{
// only two effectively empty uri specific properties maps are equal
return isEffectivelyEmpty(_uriSpecificProperties)
&& isEffectivelyEmpty(other._uriSpecificProperties);
}

return true;
}

private static boolean isEffectivelyEmpty(Map<URI, Map<String, Object>> m)
{
// the map is empty OR all inner maps are actually empty
return m.isEmpty() || m.values().stream().allMatch(Map::isEmpty);
}
}
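To make the relaxed equality above concrete, here is a small, self-contained sketch, not part of this commit; the class name EffectivelyEmptySketch and the example URI are made up, but the helper mirrors the new isEffectivelyEmpty check.

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class EffectivelyEmptySketch
{
  // Mirrors the new UriProperties helper: the map is empty, or every per-URI map is empty.
  static boolean isEffectivelyEmpty(Map<URI, Map<String, Object>> m)
  {
    return m.isEmpty() || m.values().stream().allMatch(Map::isEmpty);
  }

  public static void main(String[] args)
  {
    Map<URI, Map<String, Object>> a = Collections.emptyMap();
    Map<URI, Map<String, Object>> b = new HashMap<>();
    b.put(URI.create("http://host:1234/ctx"), new HashMap<>()); // key present, inner map empty

    // a.equals(b) is false, yet under the adjusted UriProperties.equals these two uri-specific
    // property maps no longer make the properties unequal, because both are effectively empty.
    System.out.println(isEffectivelyEmpty(a) && isEffectivelyEmpty(b)); // true
  }
}

As a result, a UriProperties whose uri-specific map only contains empty inner maps now compares equal to one with no uri-specific entries at all, provided the other fields match.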
(next changed file)
@@ -53,10 +53,6 @@ public UriProperties merge(String propertyName, Collection<UriProperties> proper
}
}

if (maxVersion == -1)
{
LOG.warn("Merged Uri properties for cluster {} has invalid version -1. It should be > -1.", propertyName);
}
return new UriProperties(clusterName, partitionData, uriSpecificProperties, maxVersion);
}

(next changed file)
@@ -1029,7 +1029,7 @@ public void processResult(int rc, String s, Object o, byte[] bytes, Stat stat)
long version = stat.getMzxid();
if (version <= 0)
{
LOG.warn("ZK data from {} has invalid version: {}", s, version);
LOG.warn("ZK data has invalid version: {}, from path {}", version, s);
}
T value = _serializer.fromBytes(bytes, version);
_properties.put(childPath, value);
(next changed file)
@@ -474,6 +474,10 @@ public void onChanged(XdsClient.D2URIMapUpdate update)
private void mergeAndPublishUris(String clusterName)
{
UriProperties mergedUriProperties = _uriPropertiesMerger.merge(clusterName, _currentData.values());
if (mergedUriProperties.getVersion() == -1)
{
LOG.warn("xDS UriProperties has invalid version -1. Raw uris: {}", _currentData.values());
}
_uriEventBus.publishInitialize(clusterName, mergedUriProperties);

if (_dualReadStateManager != null)