Skip to content

Commit

Permalink
simplify symlink subscription in xds flow
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang committed Oct 4, 2023
1 parent c5cff52 commit 7ef0bc7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 26 deletions.
31 changes: 14 additions & 17 deletions d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -57,6 +56,7 @@ public class XdsToD2PropertiesAdaptor
private static final String D2_SERVICE_NODE_PREFIX = "/d2/services/";
private static final String D2_URI_NODE_PREFIX = "/d2/uris/";
private static final char SYMLINK_NODE_IDENTIFIER = '$';
private static final char PATH_SEPARATOR = '/';

private final XdsClient _xdsClient;
private final List<XdsConnectionListener> _xdsConnectionListeners;
Expand Down Expand Up @@ -313,20 +313,17 @@ XdsClient.D2SymlinkNodeResourceWatcher getSymlinkResourceWatcher(String symlinkN
@Override
public void onChanged(String resourceName, XdsClient.D2SymlinkNodeUpdate update)
{
// update maps between symlink name and actual node name, listen to the actual node
// Update maps between symlink name and actual node name
String actualResourceName = update.getNodeData().getMasterClusterNodePath();
if (resourceName.contains(D2_CLUSTER_NODE_PREFIX))
{
String actualNodeName = removeNodePathPrefix(actualResourceName, D2_CLUSTER_NODE_PREFIX);
updateSymlinkAndActualNodeMap(symlinkName, actualNodeName);
listenToCluster(actualNodeName);
}
else
{
String actualNodeName = removeNodePathPrefix(actualResourceName, D2_URI_NODE_PREFIX);
updateSymlinkAndActualNodeMap(symlinkName, actualNodeName);
listenToUris(actualNodeName);
}
String actualNodeName = getNodeName(actualResourceName);
updateSymlinkAndActualNodeMap(symlinkName, actualNodeName);
// listen to the actual nodes
// Note: since cluster symlink and uri parent symlink always point to the same actual node name, and it's a
// redundancy and a burden for the symlink-update tool to maintain two symlinks for the same actual node name,
// we optimize here to use the cluster symlink to listen to the actual nodes for both cluster
// and uri parent.
listenToCluster(actualNodeName);
listenToUris(actualNodeName);
}

@Override
Expand Down Expand Up @@ -355,16 +352,16 @@ private String getSymlink(String actualNodeName) {
}
}

private static String removeNodePathPrefix(String path, String prefix)
private static String getNodeName(String path)
{
int idx = path.indexOf(prefix);
int idx = path.lastIndexOf(PATH_SEPARATOR);
if (idx == -1)
{
return path;
}
else
{
return path.substring(idx + prefix.length());
return path.substring(idx + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testListenToService()
verify(fixture._xdsClient).watchXdsResource(eq("/d2/services/" + serviceName), eq(XdsClient.ResourceType.D2_NODE), any());

XdsClient.D2NodeResourceWatcher symlinkNodeWatcher =
(XdsClient.D2NodeResourceWatcher) fixture._watcherArgumentCaptor.getValue();
(XdsClient.D2NodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue();
symlinkNodeWatcher.onChanged(new XdsClient.D2NodeUpdate("", XdsD2.D2Node.newBuilder()
.setData(Struct.newBuilder().putAllFields(
ImmutableMap.of(
Expand Down Expand Up @@ -86,13 +86,14 @@ public void testListenToClusterSymlink() {
verify(fixture._xdsClient).watchXdsResource(eq(CLUSTER_SYMLINK_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_SYMLINK_NODE), any());

XdsClient.D2SymlinkNodeResourceWatcher symlinkNodeWatcher =
(XdsClient.D2SymlinkNodeResourceWatcher) fixture._watcherArgumentCaptor.getValue();
(XdsClient.D2SymlinkNodeResourceWatcher) fixture._symlinkWatcherArgumentCaptor.getValue();
symlinkNodeWatcher.onChanged(CLUSTER_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(PRIMARY_CLUSTER_RESOURCE_NAME));

verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE), any());
verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE_MAP), any());

XdsClient.D2NodeResourceWatcher clusterNodeWatcher =
(XdsClient.D2NodeResourceWatcher) fixture._watcherArgumentCaptor.getValue();
(XdsClient.D2NodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue();
clusterNodeWatcher.onChanged(getClusterNodeUpdate(PRIMARY_CLUSTER_NAME));

verify(fixture._clusterEventBus).publishInitialize(SYMLINK_NAME, PRIMARY_CLUSTER_PROPERTIES);
Expand All @@ -105,13 +106,17 @@ public void testListenToClusterSymlink() {
symlinkNodeWatcher.onChanged(CLUSTER_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(primaryClusterResourceName2));

verify(fixture._xdsClient).watchXdsResource(eq(primaryClusterResourceName2), eq(XdsClient.ResourceType.D2_NODE), any());
verify(fixture._xdsClient).watchXdsResource(eq(URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2),
eq(XdsClient.ResourceType.D2_NODE_MAP), any());
verifyClusterNodeUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME, primaryClusterProperties2);

// if the old primary cluster gets an update, it will be published under its original cluster name
// since the symlink points to the new primary cluster now.
clusterNodeWatcher.onChanged(getClusterNodeUpdate(PRIMARY_CLUSTER_NAME_2));

verify(fixture._clusterEventBus).publishInitialize(PRIMARY_CLUSTER_NAME, primaryClusterProperties2);
verify(fixture._clusterEventBus, times(1)) // verify symlink is published just once (from line 115)
.publishInitialize(SYMLINK_NAME, primaryClusterProperties2);
}

@Test
Expand All @@ -133,13 +138,13 @@ public void testListenToUriSymlink()
verify(fixture._xdsClient).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_SYMLINK_NODE), any());

XdsClient.D2SymlinkNodeResourceWatcher symlinkNodeWatcher =
(XdsClient.D2SymlinkNodeResourceWatcher) fixture._watcherArgumentCaptor.getValue();
(XdsClient.D2SymlinkNodeResourceWatcher) fixture._symlinkWatcherArgumentCaptor.getValue();
symlinkNodeWatcher.onChanged(URI_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME));

verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE_MAP), any());

XdsClient.D2NodeMapResourceWatcher watcher =
(XdsClient.D2NodeMapResourceWatcher) fixture._watcherArgumentCaptor.getValue();
(XdsClient.D2NodeMapResourceWatcher) fixture._uriWatcherArgumentCaptor.getValue();
watcher.onChanged(DUMMY_NODE_MAP_UPDATE);

UriProperties uriProps = getDefaultUriProperties(PRIMARY_CLUSTER_NAME);
Expand Down Expand Up @@ -186,7 +191,8 @@ private static XdsClient.D2NodeUpdate getClusterNodeUpdate(String clusterName)
private void verifyClusterNodeUpdate(XdsToD2PropertiesAdaptorFixture fixture, String clusterName, String symlinkName,
ClusterStoreProperties expectedPublishProp)
{
XdsClient.D2NodeResourceWatcher watcher = (XdsClient.D2NodeResourceWatcher) fixture._watcherArgumentCaptor.getValue();
XdsClient.D2NodeResourceWatcher watcher = (XdsClient.D2NodeResourceWatcher)
fixture._clusterWatcherArgumentCaptor.getValue();
watcher.onChanged(getClusterNodeUpdate(clusterName));
verify(fixture._clusterEventBus).publishInitialize(clusterName, expectedPublishProp);
if (symlinkName != null)
Expand All @@ -197,7 +203,8 @@ private void verifyClusterNodeUpdate(XdsToD2PropertiesAdaptorFixture fixture, St

private void verifyUriUpdate(XdsToD2PropertiesAdaptorFixture fixture, String clusterName, String symlinkName)
{
XdsClient.D2NodeMapResourceWatcher watcher = (XdsClient.D2NodeMapResourceWatcher) fixture._watcherArgumentCaptor.getValue();
XdsClient.D2NodeMapResourceWatcher watcher = (XdsClient.D2NodeMapResourceWatcher)
fixture._uriWatcherArgumentCaptor.getValue();
watcher.onChanged(DUMMY_NODE_MAP_UPDATE);
UriProperties uriProps = getDefaultUriProperties(clusterName);
verify(fixture._uriEventBus).publishInitialize(clusterName, uriProps);
Expand Down Expand Up @@ -225,14 +232,23 @@ private static class XdsToD2PropertiesAdaptorFixture
@Mock
PropertyEventBus<UriProperties> _uriEventBus;
@Captor
ArgumentCaptor<XdsClient.ResourceWatcher> _watcherArgumentCaptor;
ArgumentCaptor<XdsClient.ResourceWatcher> _symlinkWatcherArgumentCaptor;
@Captor
ArgumentCaptor<XdsClient.ResourceWatcher> _clusterWatcherArgumentCaptor;
@Captor
ArgumentCaptor<XdsClient.ResourceWatcher> _uriWatcherArgumentCaptor;

XdsToD2PropertiesAdaptor _adaptor;

XdsToD2PropertiesAdaptorFixture()
{
MockitoAnnotations.initMocks(this);
doNothing().when(_xdsClient).watchXdsResource(any(), any(), _watcherArgumentCaptor.capture());
doNothing().when(_xdsClient).watchXdsResource(any(), eq(XdsClient.ResourceType.D2_SYMLINK_NODE),
_symlinkWatcherArgumentCaptor.capture());
doNothing().when(_xdsClient).watchXdsResource(any(), eq(XdsClient.ResourceType.D2_NODE),
_clusterWatcherArgumentCaptor.capture());
doNothing().when(_xdsClient).watchXdsResource(any(), eq(XdsClient.ResourceType.D2_NODE_MAP),
_uriWatcherArgumentCaptor.capture());
doNothing().when(_clusterEventBus).publishInitialize(any(), any());
doNothing().when(_serviceEventBus).publishInitialize(any(), any());
doNothing().when(_uriEventBus).publishInitialize(any(), any());
Expand Down

0 comments on commit 7ef0bc7

Please sign in to comment.