From 154e40d84aa4590800f70842ab60cb5f8e8dbf2a Mon Sep 17 00:00:00 2001 From: PapaCharlie Date: Thu, 4 Jan 2024 18:35:52 -0500 Subject: [PATCH] Respect xDS deletions --- .../java/com/linkedin/d2/xds/XdsClient.java | 6 +++ .../com/linkedin/d2/xds/XdsClientImpl.java | 11 ++++- .../d2/xds/XdsToD2PropertiesAdaptor.java | 46 +++++++++++++++++-- 3 files changed, 56 insertions(+), 7 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index 5c1d1e2c28..32a160cd3c 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -43,16 +43,22 @@ interface ResourceWatcher interface NodeResourceWatcher extends ResourceWatcher { void onChanged(NodeUpdate update); + + void onDelete(String resourceName); } interface SymlinkNodeResourceWatcher extends ResourceWatcher { void onChanged(String resourceName, NodeUpdate update); + + void onDelete(String resourceName); } interface D2URIMapResourceWatcher extends ResourceWatcher { void onChanged(D2URIMapUpdate update); + + void onDelete(String resourceName); } interface ResourceUpdate diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 59765a311e..25fe996be9 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -443,19 +443,21 @@ private static final class DiscoveryResponseData { private final ResourceType _resourceType; private final List _resources; + private final List _removedResources; private final String _nonce; - DiscoveryResponseData(ResourceType resourceType, List resources, String nonce) + DiscoveryResponseData(ResourceType resourceType, List resources, List removedResources, String nonce) { _resourceType = resourceType; _resources = resources; + _removedResources = removedResources; _nonce = nonce; } static DiscoveryResponseData fromEnvoyProto(DeltaDiscoveryResponse proto) { return new DiscoveryResponseData(ResourceType.fromTypeUrl(proto.getTypeUrl()), proto.getResourcesList(), - proto.getNonce()); + proto.getRemovedResourcesList(), proto.getNonce()); } ResourceType getResourceType() @@ -468,6 +470,11 @@ List getResourcesList() return _resources; } + List getRemovedResources() + { + return _removedResources; + } + String getNonce() { return _nonce; diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index ccf87b4078..dd57acc013 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -43,7 +43,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -225,6 +224,12 @@ public void onChanged(XdsClient.NodeUpdate update) } } + @Override + public void onDelete(String name) + { + _serviceEventBus.publishRemove(name); + } + @Override public void onError(Status error) { @@ -286,6 +291,12 @@ private void publishClusterData(String clusterName, ClusterProperties properties } } + @Override + public void onDelete(String name) + { + _clusterEventBus.publishRemove(name); + } + @Override public void onError(Status error) { @@ -325,6 +336,13 @@ public void onChanged(String resourceName, XdsClient.NodeUpdate update) listenToUris(actualNodeName); } + @Override + public void onDelete(String resourceName) + { + // TODO: Is this all we need to do here? + removeSymlink(symlinkName); + } + @Override public void onError(Status error) { @@ -339,14 +357,26 @@ public void onReconnect() }; } - private void updateSymlinkAndActualNodeMap(String symlinkName, String actualNodeName) { - synchronized (_symlinkAndActualNodeLock) { + private void updateSymlinkAndActualNodeMap(String symlinkName, String actualNodeName) + { + synchronized (_symlinkAndActualNodeLock) + { _symlinkAndActualNode.put(symlinkName, actualNodeName); } } - private String getSymlink(String actualNodeName) { - synchronized (_symlinkAndActualNodeLock) { + private String removeSymlink(String symlinkName) + { + synchronized (_symlinkAndActualNodeLock) + { + return _symlinkAndActualNode.remove(symlinkName); + } + } + + private String getSymlink(String actualNodeName) + { + synchronized (_symlinkAndActualNodeLock) + { return _symlinkAndActualNode.inverse().get(actualNodeName); } } @@ -486,6 +516,12 @@ private void mergeAndPublishUris(String clusterName) } } + @Override + public void onDelete(String resourceName) + { + _uriEventBus.publishRemove(resourceName); + } + @Override public void onError(Status error) {