From 6ef56f2d2d427e8d45f756644abf45300ae49e50 Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Mon, 18 Nov 2024 12:46:19 -0800 Subject: [PATCH 1/5] Add announcer status delegate interface and support getting server announce mode --- CHANGELOG.md | 6 +++- .../d2/balancer/LoadBalancerServer.java | 17 ++++++++++ .../servers/AnnouncerStatusDelegate.java | 32 +++++++++++++++++++ .../balancer/servers/ZooKeeperAnnouncer.java | 22 ++++++++++++- .../servers/ZooKeeperConnectionManager.java | 17 ++++++++++ .../d2/balancer/servers/ZooKeeperServer.java | 7 +++- .../d2/jmx/ZooKeeperAnnouncerJmx.java | 5 +++ .../d2/jmx/ZooKeeperAnnouncerJmxMXBean.java | 6 ++++ gradle.properties | 2 +- 9 files changed, 110 insertions(+), 4 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/balancer/servers/AnnouncerStatusDelegate.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 50e8ccc2a4..81c5000922 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.63.0] - 2024-11-06 +- Add announcer status delegate interface + ## [29.62.1] - 2024-11-05 - Enhancements in ByteString and its ByteIterator to reduce object allocation @@ -5758,7 +5761,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.62.1...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.0...master +[29.63.0]: https://github.com/linkedin/rest.li/compare/v29.62.1...v29.63.0 [29.62.1]: https://github.com/linkedin/rest.li/compare/v29.62.0...v29.62.1 [29.62.0]: https://github.com/linkedin/rest.li/compare/v29.61.0...v29.62.0 [29.61.0]: https://github.com/linkedin/rest.li/compare/v29.60.0...v29.61.0 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java index 978b80fc3e..2853c92e34 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java @@ -71,4 +71,21 @@ void addUriSpecificProperty(String clusterName, void shutdown(Callback callback); String getConnectString(); + + /** + * Get announce mode of the server. Some server may have different announce mode, e.g. dual write mode, force announce + * mode. + */ + AnnounceMode getAnnounceMode(); + + enum AnnounceMode + { + STATIC_OLD_SR_ONLY, // statically only announce to old service registry + DYNAMIC_OLD_SR_ONLY, // dynamically only announce to old service registry + DYNAMIC_DUAL_WRITE, // dynamically announce to both service registries + DYNAMIC_NEW_SR_ONLY, // dynamically only announce to new service registry + DYNAMIC_FORCE_DUAL_WRITE, // Using dynamic server yet forced to announce to both service registries + STATIC_NEW_SR_ONLY, // statically only announce to new service registry + STATIC_NEW_SR_ONLY_NO_WRITE_BACK // statically only announce to new service registry without writing back to old service registry + } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/AnnouncerStatusDelegate.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/AnnouncerStatusDelegate.java new file mode 100644 index 0000000000..0178b75742 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/AnnouncerStatusDelegate.java @@ -0,0 +1,32 @@ +package com.linkedin.d2.balancer.servers; + +import java.net.URI; + + +public interface AnnouncerStatusDelegate +{ + /** + * @return true if the markup intent has been sent. + */ + boolean isMarkUpIntentSent(); + + /** + * @return true if the dark warmup mark up intent has been sent. + */ + boolean isDarkWarmupMarkUpIntentSent(); + + /** + * @return the name of the regular cluster that the announcer manages. + */ + String getCluster(); + + /** + * @return the name of the warmup cluster that the announcer manages. + */ + String getWarmupCluster(); + + /** + * @return the uri that the announcer manages. + */ + URI getURI(); +} diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java index 78fbfb0b49..bc494c259e 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java @@ -66,7 +66,7 @@ * @author Francesco Capponi (fcapponi@linkedin.com) */ -public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper +public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper, AnnouncerStatusDelegate { public static final boolean DEFAULT_DARK_WARMUP_ENABLED = false; public static final int DEFAULT_DARK_WARMUP_DURATION = 0; @@ -705,6 +705,13 @@ public void onSuccess(None result) }; } + @Override + public String getWarmupCluster() + { + return _warmupClusterName; + } + + @Override public String getCluster() { return _cluster; @@ -720,6 +727,12 @@ public String getUri() return _uri.toString(); } + @Override + public URI getURI() + { + return _uri; + } + public void setUri(String uri) { _uri = URI.create(uri); @@ -816,11 +829,13 @@ public boolean isMarkUpFailed() return _markUpFailed; } + @Override public boolean isMarkUpIntentSent() { return _isMarkUpIntentSent.get(); } + @Override public boolean isDarkWarmupMarkUpIntentSent() { return _isDarkWarmupMarkUpIntentSent.get(); @@ -836,6 +851,11 @@ public int getWeightDecimalPlacesBreachedCount() return _weightDecimalPlacesBreachedCount.get(); } + public LoadBalancerServer.AnnounceMode getServerAnnounceMode() + { + return _server.getAnnounceMode(); + } + public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) { _eventEmitter = emitter; } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java index 59db8f72d1..026b814a79 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java @@ -21,6 +21,8 @@ package com.linkedin.d2.balancer.servers; import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -74,6 +76,9 @@ public class ZooKeeperConnectionManager extends ConnectionManager private volatile ZooKeeperEphemeralStore _store; + // Additional watchers that want to watch the connection status + private final Set _zooKeeperConnectionWatchers = ConcurrentHashMap.newKeySet(); + public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection, String zkBasePath, ZKStoreFactory> factory, @@ -292,6 +297,8 @@ public void notifyEvent(ZKPersistentConnection.Event event) { server.retry(Callbacks.empty()); } + + _zooKeeperConnectionWatchers.forEach(ZooKeeperConnectionWatcher::onConnected); } break; } @@ -302,6 +309,11 @@ public void notifyEvent(ZKPersistentConnection.Event event) } } + public interface ZooKeeperConnectionWatcher + { + void onConnected(); + } + /** * Store should only be started if two conditions are satisfied * 1. store is ready. store is ready when connection is established @@ -401,4 +413,9 @@ public String getZooKeeperBasePath() { return _zkBasePath; } + + public void addConnectionWatcher(ZooKeeperConnectionWatcher watcher) + { + _zooKeeperConnectionWatchers.add(watcher); + } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperServer.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperServer.java index abfab3506b..225d67ee7d 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperServer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperServer.java @@ -60,6 +60,11 @@ public String getConnectString() { return _store.getConnectString(); } + @Override + public AnnounceMode getAnnounceMode() { + return AnnounceMode.STATIC_OLD_SR_ONLY; + } + @Override public void start(Callback callback) { @@ -367,7 +372,7 @@ protected UriProperties constructUriPropertiesForNode(final String clusterName, return new UriProperties(clusterName, partitionDesc, uriToUriSpecificProperties); } - private void storeGet(final String clusterName, final Callback callback) + protected void storeGet(final String clusterName, final Callback callback) { if (_store == null) { diff --git a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java index 0fcbe33bfd..83efb1aa02 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java @@ -206,4 +206,9 @@ public int getWeightDecimalPlacesBreachedCount() { return _announcer.getWeightDecimalPlacesBreachedCount(); } + + @Override + public int getServerAnnounceMode() { + return _announcer.getServerAnnounceMode().ordinal(); + } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java index 70cd1c7165..dade7f51d5 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java @@ -20,6 +20,7 @@ package com.linkedin.d2.jmx; +import com.linkedin.d2.balancer.LoadBalancerServer; import com.linkedin.d2.balancer.properties.PartitionData; import com.linkedin.d2.discovery.stores.PropertyStoreException; @@ -95,4 +96,9 @@ void setPartitionDataUsingJson(String partitionDataJson) * @return the times that the max number of decimal places on weight has been breached. */ int getWeightDecimalPlacesBreachedCount(); + + /** + * @return the server announce mode corresponding to {@link LoadBalancerServer#getAnnounceMode()} + */ + int getServerAnnounceMode(); } diff --git a/gradle.properties b/gradle.properties index 4372902d71..99272913b7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.62.1 +version=29.63.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true From a0630560e373dd5af67e6796c862af1ba0df4d27 Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Thu, 21 Nov 2024 13:36:02 -0800 Subject: [PATCH 2/5] adjust log msg and comment --- .../linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java | 9 +++++---- .../linkedin/d2/balancer/servers/ZooKeeperServer.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java index bc494c259e..617bc53f68 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java @@ -862,10 +862,11 @@ public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) { @Override public void emitSDStatusActiveUpdateIntentAndWriteEvents(String cluster, boolean isMarkUp, boolean succeeded, long startAt) { - // since SD event is sent in IndisAnnouncer for INDIS-write-only, inside ZookeeperAnnouncer, any calls to - // "emitSDStatusActiveUpdateIntentAndWriteEvents" should only happen when _server is an instance of - // ZooKeeperServer (which means it only emits the event when it's doing zk-only or dual write). - if (!(_server instanceof ZooKeeperServer)) + // In this class, SD event should be sent only when the announcing mode is to old service registry or dual write, + // so we can directly return when _server is NOT an instance of ZooKeeperServer or the announcement mode is dynamic + // new SR only. + if (!(_server instanceof ZooKeeperServer) + || _server.getAnnounceMode() == LoadBalancerServer.AnnounceMode.DYNAMIC_NEW_SR_ONLY) { return; } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperServer.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperServer.java index 225d67ee7d..a00331b1f3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperServer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperServer.java @@ -194,7 +194,7 @@ else if (!uris.Uris().contains(uri)) } else { - warn(_log, _store, " marked down for cluster ", clusterName, "with uri: ", uri); + warn(_log, _store, " marked down for cluster ", clusterName, " with uri: ", uri); Map> partitionData = new HashMap<>(2); partitionData.put(uri, Collections.emptyMap()); _store.removePartial(clusterName, new UriProperties(clusterName, partitionData), callback); From 05a19317ec3fceca68ddb27b02a48c488fd96f7f Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Thu, 21 Nov 2024 13:41:47 -0800 Subject: [PATCH 3/5] adjust announce mode enum order --- .../java/com/linkedin/d2/balancer/LoadBalancerServer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java index 2853c92e34..6982eef555 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java @@ -81,11 +81,11 @@ void addUriSpecificProperty(String clusterName, enum AnnounceMode { STATIC_OLD_SR_ONLY, // statically only announce to old service registry + STATIC_NEW_SR_ONLY, // statically only announce to new service registry + STATIC_NEW_SR_ONLY_NO_WRITE_BACK, // statically only announce to new service registry without writing back to old service registry DYNAMIC_OLD_SR_ONLY, // dynamically only announce to old service registry DYNAMIC_DUAL_WRITE, // dynamically announce to both service registries DYNAMIC_NEW_SR_ONLY, // dynamically only announce to new service registry - DYNAMIC_FORCE_DUAL_WRITE, // Using dynamic server yet forced to announce to both service registries - STATIC_NEW_SR_ONLY, // statically only announce to new service registry - STATIC_NEW_SR_ONLY_NO_WRITE_BACK // statically only announce to new service registry without writing back to old service registry + DYNAMIC_FORCE_DUAL_WRITE // Using dynamic server yet forced to announce to both service registries } } From 8c2133c3ba9d58ee3ef5316ae913f8483f6f7f48 Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Thu, 21 Nov 2024 13:54:07 -0800 Subject: [PATCH 4/5] comment on the announcement mode enum instead of reordering it --- .../com/linkedin/d2/balancer/LoadBalancerServer.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java index 6982eef555..c329c9b98a 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java @@ -74,18 +74,19 @@ void addUriSpecificProperty(String clusterName, /** * Get announce mode of the server. Some server may have different announce mode, e.g. dual write mode, force announce - * mode. + * mode. NOTE the order of the enum shows the migration progress. The ordinal is used in JMX --- each number higher + * means one more step completed in the migration --- which can ease devs to know the status. */ AnnounceMode getAnnounceMode(); enum AnnounceMode { STATIC_OLD_SR_ONLY, // statically only announce to old service registry - STATIC_NEW_SR_ONLY, // statically only announce to new service registry - STATIC_NEW_SR_ONLY_NO_WRITE_BACK, // statically only announce to new service registry without writing back to old service registry DYNAMIC_OLD_SR_ONLY, // dynamically only announce to old service registry DYNAMIC_DUAL_WRITE, // dynamically announce to both service registries DYNAMIC_NEW_SR_ONLY, // dynamically only announce to new service registry - DYNAMIC_FORCE_DUAL_WRITE // Using dynamic server yet forced to announce to both service registries + DYNAMIC_FORCE_DUAL_WRITE, // Using dynamic server yet forced to announce to both service registries + STATIC_NEW_SR_ONLY, // statically only announce to new service registry + STATIC_NEW_SR_ONLY_NO_WRITE_BACK // statically only announce to new service registry without writing back to old service registry } } From fcba9b62ac1c0fd89eb3afafa33b4c7d5818c962 Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Thu, 21 Nov 2024 13:57:27 -0800 Subject: [PATCH 5/5] adjust comment --- .../java/com/linkedin/d2/balancer/LoadBalancerServer.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java index c329c9b98a..95c5079b7d 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java @@ -74,11 +74,15 @@ void addUriSpecificProperty(String clusterName, /** * Get announce mode of the server. Some server may have different announce mode, e.g. dual write mode, force announce - * mode. NOTE the order of the enum shows the migration progress. The ordinal is used in JMX --- each number higher - * means one more step completed in the migration --- which can ease devs to know the status. + * mode. */ AnnounceMode getAnnounceMode(); + /** + * NOTE the order in this enum shows the migration progress from an old service registry to a new one. + * The ordinal is used in JMX --- each number higher means one more step completed in the migration --- which can + * ease devs to know the status. + */ enum AnnounceMode { STATIC_OLD_SR_ONLY, // statically only announce to old service registry