From e2f6fa79b03a04da1c4cd2b3feb4e18f507db6e0 Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Wed, 4 Sep 2024 17:44:00 -0400 Subject: [PATCH] Respect glob collection subscriptions on reconnect (#1011) * Respect glob collection subscriptions on reconnect The RetryTask did not resubscribe to glob collections after a reconnect. Instead it goes back to map subscriptions. This change honors the glob collection config in the retry task. * Update CHANGELOG * Bump version * Fix type used during resubscribe --- CHANGELOG.md | 7 ++++++- .../com/linkedin/d2/xds/XdsClientImpl.java | 20 +++++++++++++------ gradle.properties | 2 +- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a094c64db1..2120d7f9d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,11 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.58.5] - 2024-09-04 +- Respect glob collection subscriptions on reconnect + ## [29.58.4] - 2024-09-03 +- Respect `startPublishing` call by always re-notifying watcher in XdsClientImpl ## [29.58.3] - 2024-08-12 - Disable the warmUp flaky unit test @@ -5721,7 +5725,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.4...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.5...master +[29.58.5]: https://github.com/linkedin/rest.li/compare/v29.58.4...v29.58.5 [29.58.4]: https://github.com/linkedin/rest.li/compare/v29.58.3...v29.58.4 [29.58.3]: https://github.com/linkedin/rest.li/compare/v29.58.2...v29.58.3 [29.58.2]: https://github.com/linkedin/rest.li/compare/v29.58.1...v29.58.2 diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index e58c6fc711..b5ee6b0126 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -695,11 +695,19 @@ final class RpcRetryTask implements Runnable { public void run() { startRpcStreamLocal(); for (ResourceType type : ResourceType.values()) { - Map subscriberMap = getResourceSubscriberMap(type); - Collection resources = subscriberMap.isEmpty() ? null : subscriberMap.keySet(); - if (resources != null) { - _adsStream.sendDiscoveryRequest(type, resources); + Collection resources = getResourceSubscriberMap(type).keySet(); + if (resources.isEmpty()) + { + continue; + } + if (_subscribeToUriGlobCollection && type == ResourceType.D2_URI_MAP) + { + resources = resources.stream() + .map(GlobCollectionUtils::globCollectionUrlForClusterResource) + .collect(Collectors.toSet()); + type = ResourceType.D2_URI; } + _adsStream.sendDiscoveryRequest(type, resources); } } } @@ -936,8 +944,8 @@ public void onCompleted() private void sendDiscoveryRequest(ResourceType type, Collection resources) { _log.info("Sending {} request for resources: {}", type, resources); - DiscoveryRequestData request = new DiscoveryRequestData(_node, type, resources); - _requestWriter.onNext(request.toEnvoyProto()); + DeltaDiscoveryRequest request = new DiscoveryRequestData(_node, type, resources).toEnvoyProto(); + _requestWriter.onNext(request); _log.debug("Sent DiscoveryRequest\n{}", request); } diff --git a/gradle.properties b/gradle.properties index 8782188e30..ba684a867a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.58.4 +version=29.58.5 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true