Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed race conditions and bugs in xds client #941

Merged
merged 3 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.46.7] - 2023-10-10
- fix xDS client bugs and race conditions

## [29.46.6] - 2023-10-04
- simplify symlink subscription in xds flow

Expand Down Expand Up @@ -5548,7 +5551,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.6...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.7...master
[29.46.7]: https://github.com/linkedin/rest.li/compare/v29.46.6...v29.46.7
[29.46.6]: https://github.com/linkedin/rest.li/compare/v29.46.5...v29.46.6
[29.46.5]: https://github.com/linkedin/rest.li/compare/v29.45.1...v29.45.2
[29.46.4]: https://github.com/linkedin/rest.li/compare/v29.46.3...v29.46.4
Expand Down
17 changes: 16 additions & 1 deletion d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ public D2Client build()
_config.failoutConfigProviderFactory,
_config.failoutRedirectStrategy,
_config.serviceDiscoveryEventEmitter,
_config.dualReadStateManager
_config.dualReadStateManager,
_config.xdsExecutorService,
_config.xdsStreamReadyTimeout
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -641,6 +643,19 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat
return this;
}

/**
* Single-threaded executor service for xDS communication.
*/
public D2ClientBuilder setXdsExecutorService(ScheduledExecutorService xdsExecutorService) {
_config.xdsExecutorService = xdsExecutorService;
return this;
}

public D2ClientBuilder setXdsStreamReadyTimeout(long xdsStreamReadyTimeout) {
_config.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
return this;
}

private Map<String, TransportClientFactory> createDefaultTransportClientFactories()
{
final Map<String, TransportClientFactory> clientFactories = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ public class D2ClientConfig
public ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter = new LogOnlyServiceDiscoveryEventEmitter(); // default to use log-only emitter
public DualReadStateManager dualReadStateManager = null;

public ScheduledExecutorService xdsExecutorService = null;
public Long xdsStreamReadyTimeout = null;

public D2ClientConfig()
{
}
Expand Down Expand Up @@ -190,7 +193,9 @@ public D2ClientConfig()
FailoutConfigProviderFactory failoutConfigProviderFactory,
FailoutRedirectStrategy failoutRedirectStrategy,
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter,
DualReadStateManager dualReadStateManager)
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -254,5 +259,7 @@ public D2ClientConfig()
this.failoutRedirectStrategy = failoutRedirectStrategy;
this.serviceDiscoveryEventEmitter = serviceDiscoveryEventEmitter;
this.dualReadStateManager = dualReadStateManager;
this.xdsExecutorService = xdsExecutorService;
this.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
}
}
151 changes: 113 additions & 38 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.grpc.Status;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -39,6 +41,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -49,6 +52,7 @@
public class XdsClientImpl extends XdsClient
{
private static final Logger _log = LoggerFactory.getLogger(XdsClientImpl.class);
public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L;

private final Map<String, ResourceSubscriber> _d2NodeSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> _d2SymlinkNodeSubscribers = new HashMap<>();
Expand All @@ -62,9 +66,19 @@ public class XdsClientImpl extends XdsClient
private BackoffPolicy _retryBackoffPolicy;
private AdsStream _adsStream;
private boolean _shutdown;
private ScheduledFuture<?> _retryRpcStreamFuture;
private ScheduledFuture<?> _readyTimeoutFuture;
private final long _readyTimeoutMillis;

@Deprecated
public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService)
{
this(node, managedChannel, executorService, DEFAULT_READY_TIMEOUT_MILLIS);
}

public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService,
long readyTimeoutMillis) {
_readyTimeoutMillis = readyTimeoutMillis;
_node = node;
_managedChannel = managedChannel;
_executorService = executorService;
Expand All @@ -83,11 +97,13 @@ void watchXdsResource(String resourceName, ResourceType type, ResourceWatcher wa
subscriber = new ResourceSubscriber(type, resourceName);
resourceSubscriberMap.put(resourceName, subscriber);

if (_adsStream == null)
if (_adsStream == null && !isInBackoff())
{
startRpcStream();
startRpcStreamLocal();
}
if (_adsStream != null) {
_adsStream.sendDiscoveryRequest(type, Collections.singletonList(resourceName));
}
_adsStream.sendDiscoveryRequest(type, Collections.singletonList(resourceName));
}
subscriber.addWatcher(watcher);
});
Expand All @@ -96,9 +112,32 @@ void watchXdsResource(String resourceName, ResourceType type, ResourceWatcher wa
@Override
public void startRpcStream()
{
_executorService.execute(() -> {
if (!isInBackoff()) {
startRpcStreamLocal();
}
});
}

// Start RPC stream. Must be called from the executor, and only if we're not backed off.
private void startRpcStreamLocal() {
if (_shutdown) {
_log.warn("RPC stream cannot be started after shutdown!");
return;
}
// Check rpc stream is null to ensure duplicate RPC retry tasks are no-op
if (_adsStream != null) {
_log.warn("Tried to create duplicate RPC stream, ignoring!");
return;
}
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(_managedChannel);
_adsStream = new AdsStream(stub);
_readyTimeoutFuture = _executorService.schedule(() -> {
_log.warn("ADS stream not ready within {} milliseconds", _readyTimeoutMillis);
// notify subscribers about the error and wait for the stream to be ready by keeping it open.
notifyStreamError(Status.DEADLINE_EXCEEDED);
}, _readyTimeoutMillis, TimeUnit.MILLISECONDS);
_adsStream.start();
_log.info("ADS stream started, connected to server: {}", _managedChannel.authority());
}
Expand All @@ -122,6 +161,37 @@ String getXdsServerAuthority()
return _managedChannel.authority();
}

/**
* The client may be in backoff if there are RPC stream failures, and if it's waiting to establish the stream again.
* NOTE: Must be called from the executor.
* @return {@code true} if the client is in backoff
*/
private boolean isInBackoff() {
return _adsStream == null && _retryRpcStreamFuture != null && !_retryRpcStreamFuture.isDone();
}

/**
* Handles ready callbacks from the RPC stream. Must be called from the executor.
*/
private void readyHandler() {
_log.debug("Received ready callback from the ADS stream");
if (_adsStream == null || isInBackoff()) {
_log.warn("Unexpected state, ready called on null or backed off ADS stream!");
return;
}
// confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again.
if (_adsStream.isReady()) {
// if the ready timeout future is non-null, a reconnect notification hasn't been sent yet.
if (_readyTimeoutFuture != null) {
// timeout task will be cancelled only if it hasn't already executed.
boolean cancelledTimeout = _readyTimeoutFuture.cancel(false);
_log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout);
_readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers.
notifyStreamReconnect();
shivamgupta1 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

private void handleD2NodeResponse(DiscoveryResponseData data)
{
Map<String, D2NodeUpdate> updates = new HashMap<>();
Expand Down Expand Up @@ -213,7 +283,7 @@ private void handleResourceUpdate(Map<String, ? extends ResourceUpdate> updates,
}
}

private void handleStreamClosed(Status error) {
private void notifyStreamError(Status error) {
for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) {
subscriber.onError(error);
}
Expand All @@ -222,7 +292,7 @@ private void handleStreamClosed(Status error) {
}
}

private void handleStreamRestarted() {
private void notifyStreamReconnect() {
for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) {
subscriber.onReconnect();
}
Expand Down Expand Up @@ -328,10 +398,7 @@ private void onReconnect()
final class RpcRetryTask implements Runnable {
@Override
public void run() {
if (_shutdown) {
return;
}
startRpcStream();
startRpcStreamLocal();
for (ResourceType type : ResourceType.values()) {
if (type == ResourceType.UNKNOWN) {
continue;
Expand All @@ -342,7 +409,6 @@ public void run() {
_adsStream.sendDiscoveryRequest(type, resources);
}
}
handleStreamRestarted();
}
}

Expand Down Expand Up @@ -477,37 +543,45 @@ private AdsStream(@Nonnull AggregatedDiscoveryServiceGrpc.AggregatedDiscoverySer
_responseReceived = false;
}

public boolean isReady() {
return _requestWriter != null && ((ClientCallStreamObserver<?>) _requestWriter).isReady();
}

private void start()
{
StreamObserver<DeltaDiscoveryResponse> responseReader = new StreamObserver<DeltaDiscoveryResponse>()
{
@Override
public void onNext(DeltaDiscoveryResponse response)
{
_executorService.execute(() ->
{
_log.debug("Received {} response:\n{}", ResourceType.fromTypeUrl(response.getTypeUrl()), response);
DiscoveryResponseData responseData = DiscoveryResponseData.fromEnvoyProto(response);
handleResponse(responseData);
});
}

@Override
public void onError(Throwable t)
{
_executorService.execute(() -> handleRpcError(t));
}

@Override
public void onCompleted()
{
_executorService.execute(() -> handleRpcCompleted());
}
};
StreamObserver<DeltaDiscoveryResponse> responseReader =
new ClientResponseObserver<DeltaDiscoveryRequest, DeltaDiscoveryResponse>() {
@Override
public void beforeStart(ClientCallStreamObserver<DeltaDiscoveryRequest> requestStream) {
requestStream.setOnReadyHandler(() -> _executorService.execute(XdsClientImpl.this::readyHandler));
}

@Override
public void onNext(DeltaDiscoveryResponse response)
{
_executorService.execute(() ->
{
_log.debug("Received {} response:\n{}", ResourceType.fromTypeUrl(response.getTypeUrl()), response);
DiscoveryResponseData responseData = DiscoveryResponseData.fromEnvoyProto(response);
handleResponse(responseData);
});
}

@Override
public void onError(Throwable t)
{
_executorService.execute(() -> handleRpcError(t));
}

@Override
public void onCompleted()
{
_executorService.execute(() -> handleRpcCompleted());
}
};
_requestWriter = _stub.withWaitForReady().deltaAggregatedResources(responseReader);
}


/**
* Sends a client-initiated discovery request.
*/
Expand Down Expand Up @@ -576,6 +650,7 @@ private void handleRpcCompleted()
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("ADS stream closed by server"));
}

// Must be called from the executor.
private void handleRpcStreamClosed(Status error)
{
if (_closed)
Expand All @@ -585,7 +660,7 @@ private void handleRpcStreamClosed(Status error)
_log.error("ADS stream closed with status {}: {}. Cause: {}", error.getCode(), error.getDescription(),
error.getCause());
_closed = true;
handleStreamClosed(error);
notifyStreamError(error);
cleanUp();
if (_responseReceived || _retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
Expand All @@ -597,7 +672,7 @@ private void handleRpcStreamClosed(Status error)
delayNanos = _retryBackoffPolicy.nextBackoffNanos();
}
_log.info("Retry ADS stream in {} ns", delayNanos);
_executorService.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS);
_retryRpcStreamFuture = _executorService.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS);
}

private void close(Exception error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class XdsToD2PropertiesAdaptor
private static final String D2_URI_NODE_PREFIX = "/d2/uris/";
private static final char SYMLINK_NODE_IDENTIFIER = '$';
private static final char PATH_SEPARATOR = '/';
private static final String NON_EXISTENT_CLUSTER = "NonExistentCluster";

private final XdsClient _xdsClient;
private final List<XdsConnectionListener> _xdsConnectionListeners;
Expand All @@ -82,7 +83,7 @@ public class XdsToD2PropertiesAdaptor
private final Object _symlinkAndActualNodeLock = new Object();
private final ServiceDiscoveryEventEmitter _eventEmitter;

private boolean _isAvailable;
private Boolean _isAvailable;
private PropertyEventBus<UriProperties> _uriEventBus;
private PropertyEventBus<ServiceProperties> _serviceEventBus;
private PropertyEventBus<ClusterProperties> _clusterEventBus;
Expand All @@ -97,7 +98,8 @@ public XdsToD2PropertiesAdaptor(XdsClient xdsClient, DualReadStateManager dualRe
_clusterPropertiesJsonSerializer = new ClusterPropertiesJsonSerializer();
_uriPropertiesJsonSerializer = new UriPropertiesJsonSerializer();
_uriPropertiesMerger = new UriPropertiesMerger();
_isAvailable = false;
// set to null so that the first notification on connection establishment success/failure is always sent
_isAvailable = null;
_watchedClusterResources = new ConcurrentHashMap<>();
_watchedSymlinkResources = new ConcurrentHashMap<>();
_watchedServiceResources = new ConcurrentHashMap<>();
Expand All @@ -108,7 +110,10 @@ public XdsToD2PropertiesAdaptor(XdsClient xdsClient, DualReadStateManager dualRe
public void start()
{
_xdsClient.startRpcStream();
notifyAvailabilityChanges(true);
// Watch any resource to get notified of xds connection updates, including initial connection establishment.
// TODO: Note, this is a workaround since the xDS client implementation currently integrates connection
// error/success notifications along with the resource updates. This can be improved in a future refactor.
listenToCluster(NON_EXISTENT_CLUSTER);
}

public void shutdown()
Expand Down Expand Up @@ -361,7 +366,7 @@ private void notifyAvailabilityChanges(boolean isAvailable)
{
synchronized (_xdsConnectionListeners)
{
if (_isAvailable != isAvailable)
if (_isAvailable == null || _isAvailable != isAvailable)
{
_isAvailable = isAvailable;

Expand Down
Loading
Loading