Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics about xds connection status and count #943

Merged
merged 4 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.46.8] - 2023-10-11
- add metrics about xds connection status and count

## [29.46.7] - 2023-10-10
- fix xDS client bugs and race conditions

Expand Down Expand Up @@ -5551,7 +5554,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.7...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.8...master
[29.46.8]: https://github.com/linkedin/rest.li/compare/v29.46.7...v29.46.8
[29.46.7]: https://github.com/linkedin/rest.li/compare/v29.46.6...v29.46.7
[29.46.6]: https://github.com/linkedin/rest.li/compare/v29.46.5...v29.46.6
[29.46.5]: https://github.com/linkedin/rest.li/compare/v29.45.1...v29.45.2
Expand Down
10 changes: 10 additions & 0 deletions d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ public void registerDualReadLoadBalancerJmx(DualReadLoadBalancerJmx dualReadLoad
_jmxManager.registerDualReadLoadBalancerJmxBean(jmxName, dualReadLoadBalancerJmx);
}

public void registerXdsClientJmx(XdsClientJmx xdsClientJmx)
{
if (_discoverySourceType != DiscoverySourceType.XDS)
{
_log.warn("Setting XdsClientJmx for Non-XDS source type: {}", _discoverySourceType);
}
final String jmxName = String.format("%s-XdsClientJmx", getGlobalPrefix(null));
_jmxManager.registerXdsClientJmxBean(jmxName, xdsClientJmx);
}

private void doRegisterLoadBalancer(SimpleLoadBalancer balancer, @Nullable DualReadModeProvider.DualReadMode mode)
{
final String jmxName = String.format("%s-LoadBalancer", getGlobalPrefix(mode));
Expand Down
6 changes: 6 additions & 0 deletions d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ public synchronized JmxManager registerDualReadLoadBalancerJmxBean(String name,
return this;
}

public synchronized JmxManager registerXdsClientJmxBean(String name, XdsClientJmxMBean xdsClientJmx)
{
checkReg(xdsClientJmx, name);
return this;
}

public synchronized JmxManager registerZooKeeperAnnouncer(String name,
ZooKeeperAnnouncer announcer)
{
Expand Down
74 changes: 74 additions & 0 deletions d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright (c) 2023 LinkedIn Corp.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.linkedin.d2.jmx;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;


public class XdsClientJmx implements XdsClientJmxMBean {

private final AtomicInteger _connectionLostCount = new AtomicInteger();
private final AtomicInteger _connectionClosedCount = new AtomicInteger();
private final AtomicInteger _reconnectionCount = new AtomicInteger();

private final AtomicBoolean _isConnected = new AtomicBoolean();

@Override
public int getConnectionLostCount()
{
return _connectionLostCount.get();
}

@Override
public int getConnectionClosedCount()
{
return _connectionClosedCount.get();
}

@Override
public int getReconnectionCount()
{
return _reconnectionCount.get();
}

@Override
public int isDisconnected()
{
return _isConnected.get() ? 0 : 1;
}

public void incrementConnectionLostCount()
{
_connectionLostCount.incrementAndGet();
}

public void incrementConnectionClosedCount()
{
_connectionClosedCount.incrementAndGet();
}

public void incrementReconnectionCount()
{
_reconnectionCount.incrementAndGet();
}

public void setIsConnected(boolean connected)
{
_isConnected.getAndSet(connected);
}
}
34 changes: 34 additions & 0 deletions d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmxMBean.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright (c) 2023 LinkedIn Corp.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.linkedin.d2.jmx;

public interface XdsClientJmxMBean {

// when the connection is lost for errors.
int getConnectionLostCount();

// when the connection is closed by xDS server.
int getConnectionClosedCount();

// when the connection is reconnected
int getReconnectionCount();

// whether client is disconnected from xDS server: 1 means disconnected; 0 means connected.
// note: users need to pay attention to disconnected rather than connected state, so setting the metric this way
// to stress the disconnected state.
int isDisconnected();
}
3 changes: 3 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linkedin.d2.xds;

import com.linkedin.d2.jmx.XdsClientJmx;
import indis.XdsD2;
import io.grpc.Status;
import java.util.Map;
Expand Down Expand Up @@ -171,4 +172,6 @@ String typeUrl()
abstract void shutdown();

abstract String getXdsServerAuthority();

abstract public XdsClientJmx getXdsClientJmx();
}
36 changes: 30 additions & 6 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Joiner;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Code;
import com.linkedin.d2.jmx.XdsClientJmx;
import indis.XdsD2;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest;
Expand Down Expand Up @@ -70,18 +71,22 @@ public class XdsClientImpl extends XdsClient
private ScheduledFuture<?> _readyTimeoutFuture;
private final long _readyTimeoutMillis;

private final XdsClientJmx _xdsClientJmx;

@Deprecated
public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService)
{
this(node, managedChannel, executorService, DEFAULT_READY_TIMEOUT_MILLIS);
}

public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService,
long readyTimeoutMillis) {
long readyTimeoutMillis)
{
_readyTimeoutMillis = readyTimeoutMillis;
_node = node;
_managedChannel = managedChannel;
_executorService = executorService;
_xdsClientJmx = new XdsClientJmx();
shivamgupta1 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down Expand Up @@ -119,6 +124,12 @@ public void startRpcStream()
});
}

@Override
public XdsClientJmx getXdsClientJmx()
{
return _xdsClientJmx;
}

// Start RPC stream. Must be called from the executor, and only if we're not backed off.
private void startRpcStreamLocal() {
if (_shutdown) {
Expand All @@ -137,6 +148,8 @@ private void startRpcStreamLocal() {
_log.warn("ADS stream not ready within {} milliseconds", _readyTimeoutMillis);
// notify subscribers about the error and wait for the stream to be ready by keeping it open.
notifyStreamError(Status.DEADLINE_EXCEEDED);
// note: no need to start a retry task explicitly since xds stream internally will keep on retrying to connect
// to one of the sub-channels (unless an error or complete callback is called).
}, _readyTimeoutMillis, TimeUnit.MILLISECONDS);
_adsStream.start();
_log.info("ADS stream started, connected to server: {}", _managedChannel.authority());
Expand Down Expand Up @@ -166,29 +179,36 @@ String getXdsServerAuthority()
* NOTE: Must be called from the executor.
* @return {@code true} if the client is in backoff
*/
private boolean isInBackoff() {
private boolean isInBackoff()
{
return _adsStream == null && _retryRpcStreamFuture != null && !_retryRpcStreamFuture.isDone();
}

/**
* Handles ready callbacks from the RPC stream. Must be called from the executor.
*/
private void readyHandler() {
private void readyHandler()
{
_log.debug("Received ready callback from the ADS stream");
if (_adsStream == null || isInBackoff()) {
if (_adsStream == null || isInBackoff())
{
_log.warn("Unexpected state, ready called on null or backed off ADS stream!");
return;
}
// confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again.
if (_adsStream.isReady()) {
if (_adsStream.isReady())
{
// if the ready timeout future is non-null, a reconnect notification hasn't been sent yet.
if (_readyTimeoutFuture != null) {
if (_readyTimeoutFuture != null)
{
// timeout task will be cancelled only if it hasn't already executed.
boolean cancelledTimeout = _readyTimeoutFuture.cancel(false);
_log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout);
_readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers.
_xdsClientJmx.incrementReconnectionCount();
notifyStreamReconnect();
}
_xdsClientJmx.setIsConnected(true);
}
}

Expand Down Expand Up @@ -642,11 +662,15 @@ private void handleResponse(DiscoveryResponseData response)

private void handleRpcError(Throwable t)
{
_xdsClientJmx.incrementConnectionLostCount();
_xdsClientJmx.setIsConnected(false);
handleRpcStreamClosed(Status.fromThrowable(t));
}

private void handleRpcCompleted()
{
_xdsClientJmx.incrementConnectionClosedCount();
_xdsClientJmx.setIsConnected(false);
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("ADS stream closed by server"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
XdsClient xdsClient = new XdsClientImpl(new Node(config.hostName),
new XdsChannelFactory(config.grpcSslContext, config.xdsServer).createChannel(), executorService,
xdsStreamReadyTimeout);
d2ClientJmxManager.registerXdsClientJmx(xdsClient.getXdsClientJmx());

XdsToD2PropertiesAdaptor adaptor = new XdsToD2PropertiesAdaptor(xdsClient, config.dualReadStateManager,
config.serviceDiscoveryEventEmitter);

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.46.7
version=29.46.8
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
Loading