Skip to content

Commit

Permalink
Respect startPublishing call by always re-notifying watcher in XdsC…
Browse files Browse the repository at this point in the history
…lientImpl (#1018)

* Respect `startPublishing` call by always re-notifying watcher in XdsClientImpl

When switching from the backup to the primary store (implemented in the
XdsClientImpl), the data in the primary store is never replayed. This is
different from the backup store behavior which respects the invocation of
startPublishing and replays the contents of the store. This means that if the
contents of the backup store are different from the contents of the primary
store, and the client switches from the backup to the primary, the client will
only see the backup values and not the primary values.

* Address comments

* Updated CHANGELOG
  • Loading branch information
PapaCharlie authored Sep 4, 2024
1 parent c67ff74 commit 8ad594d
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 54 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.58.4] - 2024-09-03

## [29.58.3] - 2024-08-12
- Disable the warmUp flaky unit test

Expand Down Expand Up @@ -5719,7 +5721,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.3...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.4...master
[29.58.4]: https://github.com/linkedin/rest.li/compare/v29.58.3...v29.58.4
[29.58.3]: https://github.com/linkedin/rest.li/compare/v29.58.2...v29.58.3
[29.58.2]: https://github.com/linkedin/rest.li/compare/v29.58.1...v29.58.2
[29.58.1]: https://github.com/linkedin/rest.li/compare/v29.58.0...v29.58.1
Expand Down
6 changes: 6 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ static ResourceType fromTypeUrl(String typeUrl)
}
}

/**
* Subscribes the given {@link ResourceWatcher} to the resource of the given name. The watcher will be notified when
* the resource is received from the backend. Repeated calls to this function with the same resource name and watcher
* will always notify the given watcher of the current data if it is already present, even if the given watcher was
* already subscribed to said resource. However, the subscription will only be added once.
*/
abstract void watchXdsResource(String resourceName, ResourceWatcher watcher);

abstract void startRpcStream();
Expand Down
6 changes: 1 addition & 5 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -554,15 +554,11 @@ public void setData(@Nullable ResourceUpdate data)

void addWatcher(ResourceWatcher watcher)
{
if (_watchers.contains(watcher))
{
_log.warn("Watcher {} already registered", watcher);
return;
}
_watchers.add(watcher);
if (_data != null)
{
watcher.onChanged(_data);
_log.debug("Notifying watcher of current data for resource {} of type {}: {}", _resource, _type, _data);
}
}

Expand Down
38 changes: 13 additions & 25 deletions d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,9 @@ public void listenToCluster(String clusterName)
}
else
{
_watchedClusterResources.computeIfAbsent(clusterName, k ->
{
XdsClient.NodeResourceWatcher watcher = getClusterResourceWatcher(clusterName);
_xdsClient.watchXdsResource(resourceName, watcher);
return watcher;
});
XdsClient.ResourceWatcher watcher =
_watchedClusterResources.computeIfAbsent(clusterName, this::getClusterResourceWatcher);
_xdsClient.watchXdsResource(resourceName, watcher);
}
}

Expand All @@ -169,36 +166,27 @@ public void listenToUris(String clusterName)
}
else
{
_watchedUriResources.computeIfAbsent(clusterName, k ->
{
XdsClient.D2URIMapResourceWatcher watcher = getUriResourceWatcher(clusterName);
_xdsClient.watchXdsResource(resourceName, watcher);
return watcher;
});
XdsClient.ResourceWatcher watcher =
_watchedUriResources.computeIfAbsent(clusterName, this::getUriResourceWatcher);
_xdsClient.watchXdsResource(resourceName, watcher);
}
}

public void listenToService(String serviceName)
{
_watchedServiceResources.computeIfAbsent(serviceName, k ->
{
XdsClient.NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName);
_xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, watcher);
return watcher;
});
XdsClient.ResourceWatcher watcher =
_watchedServiceResources.computeIfAbsent(serviceName, this::getServiceResourceWatcher);
_xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, watcher);
}

private void listenToSymlink(String name, String fullResourceName)
{
// use full resource name ("/d2/clusters/$FooClusterMater", "/d2/uris/$FooClusterMaster") as the key
// instead of just the symlink name ("$FooClusterMaster") to differentiate clusters and uris symlink resources.
_watchedSymlinkResources.computeIfAbsent(fullResourceName, k ->
{
// use symlink name "$FooClusterMaster" to create the watcher
XdsClient.NodeResourceWatcher watcher = getSymlinkResourceWatcher(fullResourceName, name);
_xdsClient.watchXdsResource(k, watcher);
return watcher;
});
XdsClient.ResourceWatcher watcher =
_watchedSymlinkResources.computeIfAbsent(fullResourceName, k -> getSymlinkResourceWatcher(k, name));
// use symlink name "$FooClusterMaster" to create the watcher
_xdsClient.watchXdsResource(fullResourceName, watcher);
}

XdsClient.NodeResourceWatcher getServiceResourceWatcher(String serviceName)
Expand Down
18 changes: 18 additions & 0 deletions d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Objects;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -442,6 +443,23 @@ public void testHandleD2URICollectionResponseWithRemoval()
Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap());
}

@Test
public void testResourceSubscriberAddWatcher()
{
ResourceSubscriber subscriber = new ResourceSubscriber(NODE, "foo", null);
XdsClient.ResourceWatcher watcher = Mockito.mock(XdsClient.ResourceWatcher.class);
subscriber.addWatcher(watcher);
verify(watcher, times(0)).onChanged(any());

D2URIMapUpdate update = new D2URIMapUpdate(Collections.emptyMap());
subscriber.setData(update);
for (int i = 0; i < 10; i++)
{
subscriber.addWatcher(watcher);
}
verify(watcher, times(10)).onChanged(eq(update));
}

private static class XdsClientImplFixture
{
XdsClientImpl _xdsClientImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static com.linkedin.d2.balancer.properties.PropertyKeys.*;
import static com.linkedin.d2.balancer.properties.PropertyKeys.ALLOWED_CLIENT_OVERRIDE_KEYS;
import static com.linkedin.d2.balancer.properties.PropertyKeys.HTTP_REQUEST_TIMEOUT;
import static org.mockito.Mockito.*;


Expand Down Expand Up @@ -112,10 +113,13 @@ public void testListenToService(Map<String, Object> clientOverride, Map<String,
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
String serviceName = "FooService";
fixture.getSpiedAdaptor(Collections.singletonMap(serviceName, clientOverride))
.listenToService(serviceName);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor(Collections.singletonMap(serviceName, clientOverride))
.listenToService(serviceName);
}

verify(fixture._xdsClient).watchXdsResource(eq("/d2/services/" + serviceName), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq("/d2/services/" + serviceName), anyNodeWatcher());

NodeResourceWatcher symlinkNodeWatcher = fixture._nodeWatcher;
symlinkNodeWatcher.onChanged(new XdsClient.NodeUpdate(XdsD2.Node.newBuilder()
Expand Down Expand Up @@ -150,28 +154,37 @@ public void testListenToService(Map<String, Object> clientOverride, Map<String,
public void testListenToNormalCluster()
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
fixture.getSpiedAdaptor().listenToCluster(PRIMARY_CLUSTER_NAME);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor().listenToCluster(PRIMARY_CLUSTER_NAME);
}

verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), anyNodeWatcher());
verifyClusterNodeUpdate(fixture, PRIMARY_CLUSTER_NAME, null, PRIMARY_CLUSTER_PROPERTIES);
}

@Test
public void testListenToClusterSymlink()
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
fixture.getSpiedAdaptor().listenToCluster(SYMLINK_NAME);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor().listenToCluster(SYMLINK_NAME);
}

// verify symlink is watched
verify(fixture._xdsClient).watchXdsResource(eq(CLUSTER_SYMLINK_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(CLUSTER_SYMLINK_RESOURCE_NAME), anyNodeWatcher());

// update symlink data
NodeResourceWatcher symlinkNodeWatcher = fixture._nodeWatcher;
fixture._nodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_CLUSTER_RESOURCE_NAME));
for (int i = 0; i < 10; i++)
{
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_CLUSTER_RESOURCE_NAME));
}

// verify both cluster and uri data of the actual cluster is watched
verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyNodeWatcher());

// update cluster data
NodeResourceWatcher clusterNodeWatcher = fixture._nodeWatcher;
Expand All @@ -185,10 +198,13 @@ public void testListenToClusterSymlink()
String primaryClusterResourceName2 = CLUSTER_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2;
ClusterStoreProperties primaryClusterProperties2 = new ClusterStoreProperties(PRIMARY_CLUSTER_NAME_2);

symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryClusterResourceName2));
for (int i = 0; i < 10; i++)
{
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryClusterResourceName2));
}

verify(fixture._xdsClient).watchXdsResource(eq(primaryClusterResourceName2), anyNodeWatcher());
verify(fixture._xdsClient).watchXdsResource(eq(URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2), anyMapWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(primaryClusterResourceName2), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2), anyMapWatcher());
verifyClusterNodeUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME, primaryClusterProperties2);

// if the old primary cluster gets an update, it will be published under its original cluster name
Expand All @@ -204,9 +220,12 @@ public void testListenToClusterSymlink()
public void testListenToNormalUri() throws PropertySerializationException
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
fixture.getSpiedAdaptor().listenToUris(PRIMARY_CLUSTER_NAME);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor().listenToUris(PRIMARY_CLUSTER_NAME);
}

verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher());
XdsD2.D2URI protoUri = getD2URI(PRIMARY_CLUSTER_NAME, URI_NAME, VERSION);
Map<String, XdsD2.D2URI> uriMap = new HashMap<>(Collections.singletonMap(URI_NAME, protoUri));
fixture._uriMapWatcher.onChanged(new XdsClient.D2URIMapUpdate(uriMap));
Expand Down Expand Up @@ -247,17 +266,23 @@ public void testListenToNormalUri() throws PropertySerializationException
public void testListenToUriSymlink() throws PropertySerializationException
{
XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture();
fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME);
for (int i = 0; i < 10; i++)
{
fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME);
}

// verify symlink is watched
verify(fixture._xdsClient).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), anyNodeWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), anyNodeWatcher());

// update symlink data
NodeResourceWatcher symlinkNodeWatcher = fixture._nodeWatcher;
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME));
for (int i = 0; i < 10; i++)
{
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME));
}

// verify actual cluster of the uris is watched
verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher());

// update uri data
D2URIMapResourceWatcher watcher = fixture._uriMapWatcher;
Expand All @@ -269,9 +294,12 @@ public void testListenToUriSymlink() throws PropertySerializationException

// test update symlink to a new primary cluster
String primaryUriResourceName2 = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2;
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryUriResourceName2));
for (int i = 0; i < 10; i++)
{
symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryUriResourceName2));
}

verify(fixture._xdsClient).watchXdsResource(eq(primaryUriResourceName2), anyMapWatcher());
verify(fixture._xdsClient, times(10)).watchXdsResource(eq(primaryUriResourceName2), anyMapWatcher());
verifyUriUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME);

// if the old primary cluster gets an update, it will be published under its original cluster name
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.58.3
version=29.58.4
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit 8ad594d

Please sign in to comment.