From bb7d60176d6f7fcda09fc988e8199cf426fe7190 Mon Sep 17 00:00:00 2001 From: Zac Policzer Date: Fri, 15 Nov 2024 15:07:41 -0800 Subject: [PATCH] [server][metrics] Fix sawtooth behavior on follower lag monitoring (#1305) * [server][metrics] Fix sawtooth behavior on follower lag monitoring For AA stores heartbeats from all peer colos are forwarded to the local follower replica, and the last received timestamp heartbeat is reported. The effect of this is that if one colo is lagging, this will cause the metric to jump around as unhealthy heartbeats are interleaved with healthy ones. This change addresses that by having the follower strictly report only the highest timestamp amongst heartbeats it's recieved. --- .../heartbeat/HeartbeatMonitoringService.java | 84 +++++++++++-------- .../heartbeat/HeartbeatTimeStampEntry.java | 24 ++++++ .../heartbeat/HeartbeatVersionedStats.java | 9 +- .../HeartbeatMonitoringServiceTest.java | 79 +++++++++-------- 4 files changed, 113 insertions(+), 83 deletions(-) create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatTimeStampEntry.java diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index cab71342953..7fd9f177e5e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -12,8 +12,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.tuple.MutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -51,8 +49,8 @@ public class HeartbeatMonitoringService extends AbstractVeniceService { private final String localRegionName; // store -> version -> partition -> region -> (timestamp, RTS) - private final Map>>>> followerHeartbeatTimeStamps; - private final Map>>>> leaderHeartbeatTimeStamps; + private final Map>>> followerHeartbeatTimeStamps; + private final Map>>> leaderHeartbeatTimeStamps; HeartbeatVersionedStats versionStatsReporter; public HeartbeatMonitoringService( @@ -76,7 +74,7 @@ public HeartbeatMonitoringService( } private synchronized void initializeEntry( - Map>>>> heartbeatTimestamps, + Map>>> heartbeatTimestamps, Version version, int partition, boolean isFollower) { @@ -87,20 +85,21 @@ private synchronized void initializeEntry( heartbeatTimestamps.computeIfAbsent(version.getStoreName(), storeKey -> new VeniceConcurrentHashMap<>()) .computeIfAbsent(version.getNumber(), versionKey -> new VeniceConcurrentHashMap<>()) .computeIfAbsent(partition, partitionKey -> { - Map> regionTimestamps = new VeniceConcurrentHashMap<>(); + Map regionTimestamps = new VeniceConcurrentHashMap<>(); if (version.isActiveActiveReplicationEnabled() && !isFollower) { for (String region: regionNames) { - regionTimestamps.put(region, new MutablePair<>(System.currentTimeMillis(), false)); + regionTimestamps.put(region, new HeartbeatTimeStampEntry(System.currentTimeMillis(), false, false)); } } else { - regionTimestamps.put(localRegionName, new MutablePair<>(System.currentTimeMillis(), false)); + regionTimestamps + .put(localRegionName, new HeartbeatTimeStampEntry(System.currentTimeMillis(), false, false)); } return regionTimestamps; }); } private synchronized void removeEntry( - Map>>>> heartbeatTimestamps, + Map>>> heartbeatTimestamps, Version version, int partition) { heartbeatTimestamps.computeIfPresent(version.getStoreName(), (storeKey, versionMap) -> { @@ -173,21 +172,21 @@ public Map getHeartbeatInfo( } Map getHeartbeatInfoFromMap( - Map>>>> heartbeatTimestampMap, + Map>>> heartbeatTimestampMap, String leaderState, long currentTimestamp, String versionTopicName, int partitionFilter, boolean filterLagReplica) { Map result = new VeniceConcurrentHashMap<>(); - for (Map.Entry>>>> storeName: heartbeatTimestampMap + for (Map.Entry>>> storeName: heartbeatTimestampMap .entrySet()) { - for (Map.Entry>>> version: storeName.getValue() + for (Map.Entry>> version: storeName.getValue() .entrySet()) { - for (Map.Entry>> partition: version.getValue().entrySet()) { - for (Map.Entry> region: partition.getValue().entrySet()) { + for (Map.Entry> partition: version.getValue().entrySet()) { + for (Map.Entry region: partition.getValue().entrySet()) { String topicName = Version.composeKafkaTopic(storeName.getKey(), version.getKey()); - long heartbeatTs = region.getValue().getLeft(); + long heartbeatTs = region.getValue().timestamp; long lag = currentTimestamp - heartbeatTs; if (!versionTopicName.equals(topicName)) { continue; @@ -204,7 +203,7 @@ Map getHeartbeatInfoFromMap( replicaId, region.getKey(), leaderState, - region.getValue().getRight(), + region.getValue().readyToServe, heartbeatTs, lag); result.put(replicaId + "-" + region.getKey(), replicaHeartbeatInfo); @@ -245,7 +244,7 @@ public void recordLeaderHeartbeat( String region, Long timestamp, boolean isReadyToServe) { - recordHeartbeat(store, version, partition, region, timestamp, leaderHeartbeatTimeStamps, isReadyToServe); + recordHeartbeat(store, version, partition, region, timestamp, leaderHeartbeatTimeStamps, isReadyToServe, false); } /** @@ -265,7 +264,7 @@ public void recordFollowerHeartbeat( String region, Long timestamp, boolean isReadyToServe) { - recordHeartbeat(store, version, partition, region, timestamp, followerHeartbeatTimeStamps, isReadyToServe); + recordHeartbeat(store, version, partition, region, timestamp, followerHeartbeatTimeStamps, isReadyToServe, true); } private void recordHeartbeat( @@ -274,13 +273,24 @@ private void recordHeartbeat( int partition, String region, Long timestamp, - Map>>>> heartbeatTimestamps, - boolean isReadyToServe) { + Map>>> heartbeatTimestamps, + boolean isReadyToServe, + boolean retainHighestTimeStamp) { if (region != null) { heartbeatTimestamps.computeIfPresent(store, (storeKey, perVersionMap) -> { perVersionMap.computeIfPresent(version, (versionKey, perPartitionMap) -> { perPartitionMap.computeIfPresent(partition, (partitionKey, perRegionMap) -> { - perRegionMap.put(region, new MutablePair<>(timestamp, isReadyToServe)); + // If we are retaining only the highest timestamp for a given heartbeat, if the current held heartbeat + // is of a higher value AND was an entry was consumed (not a place holder value by the process) then + // we will No-Op in favor of retaining that higher timestamp. This behavior is specific to follower + // nodes because the intent of this metric is to only show the lag of the follower relative to the leader + if (retainHighestTimeStamp && perRegionMap.get(region) != null + && perRegionMap.get(region).timestamp > timestamp && perRegionMap.get(region).consumedFromUpstream) { + // No-Op + } else { + // record the heartbeat time stamp + perRegionMap.put(region, new HeartbeatTimeStampEntry(timestamp, isReadyToServe, true)); + } return perRegionMap; }); return perPartitionMap; @@ -290,29 +300,29 @@ private void recordHeartbeat( } } - protected Map>>>> getLeaderHeartbeatTimeStamps() { + protected Map>>> getLeaderHeartbeatTimeStamps() { return leaderHeartbeatTimeStamps; } - protected Map>>>> getFollowerHeartbeatTimeStamps() { + protected Map>>> getFollowerHeartbeatTimeStamps() { return followerHeartbeatTimeStamps; } protected void recordLags( - Map>>>> heartbeatTimestamps, + Map>>> heartbeatTimestamps, ReportLagFunction lagFunction) { - for (Map.Entry>>>> storeName: heartbeatTimestamps + for (Map.Entry>>> storeName: heartbeatTimestamps .entrySet()) { - for (Map.Entry>>> version: storeName.getValue() + for (Map.Entry>> version: storeName.getValue() .entrySet()) { - for (Map.Entry>> partition: version.getValue().entrySet()) { - for (Map.Entry> region: partition.getValue().entrySet()) { + for (Map.Entry> partition: version.getValue().entrySet()) { + for (Map.Entry region: partition.getValue().entrySet()) { lagFunction.apply( storeName.getKey(), version.getKey(), region.getKey(), - region.getValue().getLeft(), - region.getValue().getRight()); + region.getValue().timestamp, + region.getValue().readyToServe); } } } @@ -331,17 +341,17 @@ protected void record() { } protected void checkAndMaybeLogHeartbeatDelayMap( - Map>>>> heartbeatTimestamps) { + Map>>> heartbeatTimestamps) { long currentTimestamp = System.currentTimeMillis(); - for (Map.Entry>>>> storeName: heartbeatTimestamps + for (Map.Entry>>> storeName: heartbeatTimestamps .entrySet()) { - for (Map.Entry>>> version: storeName.getValue() + for (Map.Entry>> version: storeName.getValue() .entrySet()) { - for (Map.Entry>> partition: version.getValue().entrySet()) { - for (Map.Entry> region: partition.getValue().entrySet()) { - long heartbeatTs = region.getValue().getLeft(); + for (Map.Entry> partition: version.getValue().entrySet()) { + for (Map.Entry region: partition.getValue().entrySet()) { + long heartbeatTs = region.getValue().timestamp; long lag = currentTimestamp - heartbeatTs; - if (lag > DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS && region.getValue().getRight()) { + if (lag > DEFAULT_STALE_HEARTBEAT_LOG_THRESHOLD_MILLIS && region.getValue().readyToServe) { String replicaId = Utils .getReplicaId(Version.composeKafkaTopic(storeName.getKey(), version.getKey()), partition.getKey()); LOGGER.warn( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatTimeStampEntry.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatTimeStampEntry.java new file mode 100644 index 00000000000..ec647fa86fd --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatTimeStampEntry.java @@ -0,0 +1,24 @@ +package com.linkedin.davinci.stats.ingestion.heartbeat; + +public class HeartbeatTimeStampEntry { + /** + * Whether this heartbeat entry is for a partition which is ready to serve + */ + public final boolean readyToServe; + + /** + * Whether this heartbeat entry was consumed from input or if the system initialized it as a default entry + */ + public final boolean consumedFromUpstream; + + /** + * The timestamp associated with this entry + */ + public final long timestamp; + + public HeartbeatTimeStampEntry(long timestamp, boolean readyToServe, boolean consumedFromUpstream) { + this.readyToServe = readyToServe; + this.consumedFromUpstream = consumedFromUpstream; + this.timestamp = timestamp; + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java index 1e303f3e8a1..7f3220c000e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java @@ -7,20 +7,19 @@ import io.tehuti.metrics.MetricsRepository; import java.util.Map; import java.util.function.Supplier; -import org.apache.commons.lang3.tuple.Pair; public class HeartbeatVersionedStats extends AbstractVeniceAggVersionedStats { - private final Map>>>> leaderMonitors; - private final Map>>>> followerMonitors; + private final Map>>> leaderMonitors; + private final Map>>> followerMonitors; public HeartbeatVersionedStats( MetricsRepository metricsRepository, ReadOnlyStoreRepository metadataRepository, Supplier statsInitiator, StatsSupplier reporterSupplier, - Map>>>> leaderMonitors, - Map>>>> followerMonitors) { + Map>>> leaderMonitors, + Map>>> followerMonitors) { super(metricsRepository, metadataRepository, statsInitiator, reporterSupplier, true); this.leaderMonitors = leaderMonitors; this.followerMonitors = followerMonitors; diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java index bc0dbc8ca08..01c902bd89b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java @@ -24,8 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.tuple.MutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -51,7 +49,7 @@ public void testGetHeartbeatInfoFromMap() { HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class); doCallRealMethod().when(heartbeatMonitoringService) .getHeartbeatInfoFromMap(anyMap(), anyString(), anyLong(), anyString(), anyInt(), anyBoolean()); - Map>>>> leaderMap = + Map>>> leaderMap = new VeniceConcurrentHashMap<>(); String store = "testStore"; int version = 1; @@ -61,7 +59,7 @@ public void testGetHeartbeatInfoFromMap() { leaderMap.put(store, new VeniceConcurrentHashMap<>()); leaderMap.get(store).put(version, new VeniceConcurrentHashMap<>()); leaderMap.get(store).get(version).put(partition, new VeniceConcurrentHashMap<>()); - leaderMap.get(store).get(version).get(partition).put(region, new MutablePair<>(timestamp, true)); + leaderMap.get(store).get(version).get(partition).put(region, new HeartbeatTimeStampEntry(timestamp, true, true)); Assert.assertEquals( heartbeatMonitoringService .getHeartbeatInfoFromMap( @@ -148,7 +146,6 @@ public void testAddLeaderLagMonitor() { // Default hybrid store config HybridStoreConfig hybridStoreConfig = new HybridStoreConfigImpl(1L, 1L, 1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_SOP); - // Version configs Version backupVersion = new VersionImpl(TEST_STORE, 1, "1"); // Non-hybrid version Version currentVersion = new VersionImpl(TEST_STORE, 2, "2"); // hybrid version, active/active @@ -201,30 +198,34 @@ public void testAddLeaderLagMonitor() { heartbeatMonitoringService.addFollowerLagMonitor(futureVersion, 1); heartbeatMonitoringService.addFollowerLagMonitor(futureVersion, 2); + // The above calls initialize entries with current time, and followers will retain the highest timestamp. + // we'll note the current time that comes AFTER the initialization and use that from which to increment the time. + long baseTimeStamp = System.currentTimeMillis(); + // Follower heartbeats // local fabric heartbeats - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 0, LOCAL_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 0, LOCAL_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, LOCAL_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, LOCAL_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, LOCAL_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 2, LOCAL_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 2, LOCAL_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 2, LOCAL_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 0, LOCAL_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 0, LOCAL_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, LOCAL_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, LOCAL_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, LOCAL_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 2, LOCAL_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 2, LOCAL_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 2, LOCAL_FABRIC, baseTimeStamp + 1001L, true); // remote fabric heartbeats - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 0, REMOTE_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, REMOTE_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 0, REMOTE_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, REMOTE_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, REMOTE_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, REMOTE_FABRIC, 1001L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 3, REMOTE_FABRIC, 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 0, REMOTE_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 0, REMOTE_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 0, REMOTE_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, REMOTE_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, REMOTE_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, REMOTE_FABRIC, baseTimeStamp + 1001L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 3, REMOTE_FABRIC, baseTimeStamp + 1001L, true); // bogus heartbeats - heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, 1002L, true); - heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 3, 0, LOCAL_FABRIC, 1002L, true); + heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 2, 0, LOCAL_FABRIC, baseTimeStamp + 1002L, true); + heartbeatMonitoringService.recordLeaderHeartbeat(TEST_STORE, 3, 0, LOCAL_FABRIC, baseTimeStamp + 1002L, true); Assert.assertEquals(heartbeatMonitoringService.getFollowerHeartbeatTimeStamps().size(), 1); // We only expect two entries as version 1 is a non-hybrid version @@ -244,17 +245,15 @@ public void testAddLeaderLagMonitor() { .get(TEST_STORE) .get(futureVersion.getNumber()) .get(1) - .get(LOCAL_FABRIC) - .getLeft(); - Assert.assertEquals((long) value, 1001L); + .get(LOCAL_FABRIC).timestamp; + Assert.assertTrue(value >= baseTimeStamp + 1001L); value = heartbeatMonitoringService.getFollowerHeartbeatTimeStamps() .get(TEST_STORE) .get(futureVersion.getNumber()) .get(1) - .get(REMOTE_FABRIC) - .getLeft(); - Assert.assertEquals((long) value, 1001L); + .get(REMOTE_FABRIC).timestamp; + Assert.assertTrue(value >= baseTimeStamp + 1001L); // Leader state transitions heartbeatMonitoringService.addLeaderLagMonitor(currentVersion, 1); @@ -294,9 +293,9 @@ public void testAddLeaderLagMonitor() { Assert.assertNull( heartbeatMonitoringService.getLeaderHeartbeatTimeStamps().get(TEST_STORE).get(backupVersion.getNumber())); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, REMOTE_FABRIC, 1003L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, REMOTE_FABRIC, 1003L, true); - heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, REMOTE_FABRIC, 1003L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 1, 1, REMOTE_FABRIC, baseTimeStamp + 1003L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 2, 1, REMOTE_FABRIC, baseTimeStamp + 1003L, true); + heartbeatMonitoringService.recordFollowerHeartbeat(TEST_STORE, 3, 1, REMOTE_FABRIC, baseTimeStamp + 1003L, true); // make sure leaders are cleared out Assert.assertNull( @@ -326,16 +325,14 @@ public void testAddLeaderLagMonitor() { .get(TEST_STORE) .get(futureVersion.getNumber()) .get(1) - .get(REMOTE_FABRIC) - .getLeft(); - Assert.assertEquals((long) value, 1003L); + .get(REMOTE_FABRIC).timestamp; + Assert.assertEquals((long) value, baseTimeStamp + 1003L); value = heartbeatMonitoringService.getFollowerHeartbeatTimeStamps() .get(TEST_STORE) .get(currentVersion.getNumber()) .get(1) - .get(REMOTE_FABRIC) - .getLeft(); - Assert.assertEquals((long) value, 1003L); + .get(REMOTE_FABRIC).timestamp; + Assert.assertEquals((long) value, baseTimeStamp + 1003L); // Drop/Error some heartbeatMonitoringService.removeLagMonitor(currentVersion, 0); @@ -344,11 +341,11 @@ public void testAddLeaderLagMonitor() { // Send heartbeats to resources we just dropped heartbeatMonitoringService - .recordFollowerHeartbeat(TEST_STORE, backupVersion.getNumber(), 2, LOCAL_FABRIC, 1005L, true); + .recordFollowerHeartbeat(TEST_STORE, backupVersion.getNumber(), 2, LOCAL_FABRIC, baseTimeStamp + 1005L, true); heartbeatMonitoringService - .recordFollowerHeartbeat(TEST_STORE, currentVersion.getNumber(), 0, LOCAL_FABRIC, 1005L, true); + .recordFollowerHeartbeat(TEST_STORE, currentVersion.getNumber(), 0, LOCAL_FABRIC, baseTimeStamp + 1005L, true); heartbeatMonitoringService - .recordFollowerHeartbeat(TEST_STORE, futureVersion.getNumber(), 1, LOCAL_FABRIC, 1005L, true); + .recordFollowerHeartbeat(TEST_STORE, futureVersion.getNumber(), 1, LOCAL_FABRIC, baseTimeStamp + 1005L, true); Assert.assertNull( heartbeatMonitoringService.getFollowerHeartbeatTimeStamps().get(TEST_STORE).get(backupVersion.getNumber()));