Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add announcer status delegate interface and support getting server announce mode #1035

Merged
merged 5 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.63.0] - 2024-11-06
- Add announcer status delegate interface

## [29.62.1] - 2024-11-05
- Enhancements in ByteString and its ByteIterator to reduce object allocation

Expand Down Expand Up @@ -5758,7 +5761,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.62.1...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.0...master
[29.63.0]: https://github.com/linkedin/rest.li/compare/v29.62.1...v29.63.0
[29.62.1]: https://github.com/linkedin/rest.li/compare/v29.62.0...v29.62.1
[29.62.0]: https://github.com/linkedin/rest.li/compare/v29.61.0...v29.62.0
[29.61.0]: https://github.com/linkedin/rest.li/compare/v29.60.0...v29.61.0
Expand Down
17 changes: 17 additions & 0 deletions d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,21 @@ void addUriSpecificProperty(String clusterName,
void shutdown(Callback<None> callback);

String getConnectString();

/**
* Get announce mode of the server. Some server may have different announce mode, e.g. dual write mode, force announce
* mode.
*/
AnnounceMode getAnnounceMode();

enum AnnounceMode
{
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
STATIC_OLD_SR_ONLY, // statically only announce to old service registry
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
DYNAMIC_OLD_SR_ONLY, // dynamically only announce to old service registry
DYNAMIC_DUAL_WRITE, // dynamically announce to both service registries
DYNAMIC_NEW_SR_ONLY, // dynamically only announce to new service registry
DYNAMIC_FORCE_DUAL_WRITE, // Using dynamic server yet forced to announce to both service registries
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
STATIC_NEW_SR_ONLY, // statically only announce to new service registry
STATIC_NEW_SR_ONLY_NO_WRITE_BACK // statically only announce to new service registry without writing back to old service registry
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.linkedin.d2.balancer.servers;

import java.net.URI;


public interface AnnouncerStatusDelegate
{
/**
* @return true if the markup intent has been sent.
*/
boolean isMarkUpIntentSent();

/**
* @return true if the dark warmup mark up intent has been sent.
*/
boolean isDarkWarmupMarkUpIntentSent();

/**
* @return the name of the regular cluster that the announcer manages.
*/
String getCluster();

/**
* @return the name of the warmup cluster that the announcer manages.
*/
String getWarmupCluster();

/**
* @return the uri that the announcer manages.
*/
URI getURI();
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
* @author Francesco Capponi ([email protected])
*/

public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper, AnnouncerStatusDelegate
{
public static final boolean DEFAULT_DARK_WARMUP_ENABLED = false;
public static final int DEFAULT_DARK_WARMUP_DURATION = 0;
Expand Down Expand Up @@ -705,6 +705,13 @@ public void onSuccess(None result)
};
}

@Override
public String getWarmupCluster()
{
return _warmupClusterName;
}

@Override
public String getCluster()
{
return _cluster;
Expand All @@ -720,6 +727,12 @@ public String getUri()
return _uri.toString();
}

@Override
public URI getURI()
{
return _uri;
}

public void setUri(String uri)
{
_uri = URI.create(uri);
Expand Down Expand Up @@ -816,11 +829,13 @@ public boolean isMarkUpFailed()
return _markUpFailed;
}

@Override
public boolean isMarkUpIntentSent()
{
return _isMarkUpIntentSent.get();
}

@Override
public boolean isDarkWarmupMarkUpIntentSent()
{
return _isDarkWarmupMarkUpIntentSent.get();
Expand All @@ -836,6 +851,11 @@ public int getWeightDecimalPlacesBreachedCount()
return _weightDecimalPlacesBreachedCount.get();
}

public LoadBalancerServer.AnnounceMode getServerAnnounceMode()
{
return _server.getAnnounceMode();
}

public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) {
_eventEmitter = emitter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package com.linkedin.d2.balancer.servers;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -74,6 +76,9 @@ public class ZooKeeperConnectionManager extends ConnectionManager

private volatile ZooKeeperEphemeralStore<UriProperties> _store;

// Additional watchers that want to watch the connection status
private final Set<ZooKeeperConnectionWatcher> _zooKeeperConnectionWatchers = ConcurrentHashMap.newKeySet();

public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection,
String zkBasePath,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
Expand Down Expand Up @@ -292,6 +297,8 @@ public void notifyEvent(ZKPersistentConnection.Event event)
{
server.retry(Callbacks.empty());
}

_zooKeeperConnectionWatchers.forEach(ZooKeeperConnectionWatcher::onConnected);
}
break;
}
Expand All @@ -302,6 +309,11 @@ public void notifyEvent(ZKPersistentConnection.Event event)
}
}

public interface ZooKeeperConnectionWatcher
{
void onConnected();
}

bohhyang marked this conversation as resolved.
Show resolved Hide resolved
/**
* Store should only be started if two conditions are satisfied
* 1. store is ready. store is ready when connection is established
Expand Down Expand Up @@ -401,4 +413,9 @@ public String getZooKeeperBasePath()
{
return _zkBasePath;
}

public void addConnectionWatcher(ZooKeeperConnectionWatcher watcher)
{
_zooKeeperConnectionWatchers.add(watcher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public String getConnectString() {
return _store.getConnectString();
}

@Override
public AnnounceMode getAnnounceMode() {
return AnnounceMode.STATIC_OLD_SR_ONLY;
brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void start(Callback<None> callback)
{
Expand Down Expand Up @@ -367,7 +372,7 @@ protected UriProperties constructUriPropertiesForNode(final String clusterName,
return new UriProperties(clusterName, partitionDesc, uriToUriSpecificProperties);
}

private void storeGet(final String clusterName, final Callback<UriProperties> callback)
protected void storeGet(final String clusterName, final Callback<UriProperties> callback)
{
if (_store == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,9 @@ public int getWeightDecimalPlacesBreachedCount()
{
return _announcer.getWeightDecimalPlacesBreachedCount();
}

@Override
public int getServerAnnounceMode() {
return _announcer.getServerAnnounceMode().ordinal();
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.linkedin.d2.jmx;

import com.linkedin.d2.balancer.LoadBalancerServer;
import com.linkedin.d2.balancer.properties.PartitionData;
import com.linkedin.d2.discovery.stores.PropertyStoreException;

Expand Down Expand Up @@ -95,4 +96,9 @@ void setPartitionDataUsingJson(String partitionDataJson)
* @return the times that the max number of decimal places on weight has been breached.
*/
int getWeightDecimalPlacesBreachedCount();

/**
* @return the server announce mode corresponding to {@link LoadBalancerServer#getAnnounceMode()}
*/
int getServerAnnounceMode();
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.62.1
version=29.63.0
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
Loading