diff --git a/CHANGELOG.md b/CHANGELOG.md index 42f44879a1..448257624e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.46.8] - 2023-10-11 +- add metrics about xds connection status and count + ## [29.46.7] - 2023-10-10 - fix xDS client bugs and race conditions @@ -5551,7 +5554,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.7...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.8...master +[29.46.8]: https://github.com/linkedin/rest.li/compare/v29.46.7...v29.46.8 [29.46.7]: https://github.com/linkedin/rest.li/compare/v29.46.6...v29.46.7 [29.46.6]: https://github.com/linkedin/rest.li/compare/v29.46.5...v29.46.6 [29.46.5]: https://github.com/linkedin/rest.li/compare/v29.45.1...v29.45.2 diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index a7f02cc14d..7795d8c80f 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -299,6 +299,16 @@ public void registerDualReadLoadBalancerJmx(DualReadLoadBalancerJmx dualReadLoad _jmxManager.registerDualReadLoadBalancerJmxBean(jmxName, dualReadLoadBalancerJmx); } + public void registerXdsClientJmx(XdsClientJmx xdsClientJmx) + { + if (_discoverySourceType != DiscoverySourceType.XDS) + { + _log.warn("Setting XdsClientJmx for Non-XDS source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-XdsClientJmx", getGlobalPrefix(null)); + _jmxManager.registerXdsClientJmxBean(jmxName, xdsClientJmx); + } + private void doRegisterLoadBalancer(SimpleLoadBalancer balancer, @Nullable DualReadModeProvider.DualReadMode mode) { final String jmxName = String.format("%s-LoadBalancer", getGlobalPrefix(mode)); diff --git a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java index d6ea394b99..c4b9eef977 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java @@ -179,6 +179,12 @@ public synchronized JmxManager registerDualReadLoadBalancerJmxBean(String name, return this; } + public synchronized JmxManager registerXdsClientJmxBean(String name, XdsClientJmxMBean xdsClientJmx) + { + checkReg(xdsClientJmx, name); + return this; + } + public synchronized JmxManager registerZooKeeperAnnouncer(String name, ZooKeeperAnnouncer announcer) { diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java new file mode 100644 index 0000000000..f8572e52ac --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java @@ -0,0 +1,74 @@ +/* + Copyright (c) 2023 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.jmx; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + + +public class XdsClientJmx implements XdsClientJmxMBean { + + private final AtomicInteger _connectionLostCount = new AtomicInteger(); + private final AtomicInteger _connectionClosedCount = new AtomicInteger(); + private final AtomicInteger _reconnectionCount = new AtomicInteger(); + + private final AtomicBoolean _isConnected = new AtomicBoolean(); + + @Override + public int getConnectionLostCount() + { + return _connectionLostCount.get(); + } + + @Override + public int getConnectionClosedCount() + { + return _connectionClosedCount.get(); + } + + @Override + public int getReconnectionCount() + { + return _reconnectionCount.get(); + } + + @Override + public int isDisconnected() + { + return _isConnected.get() ? 0 : 1; + } + + public void incrementConnectionLostCount() + { + _connectionLostCount.incrementAndGet(); + } + + public void incrementConnectionClosedCount() + { + _connectionClosedCount.incrementAndGet(); + } + + public void incrementReconnectionCount() + { + _reconnectionCount.incrementAndGet(); + } + + public void setIsConnected(boolean connected) + { + _isConnected.getAndSet(connected); + } +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmxMBean.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmxMBean.java new file mode 100644 index 0000000000..a6f906eba7 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmxMBean.java @@ -0,0 +1,34 @@ +/* + Copyright (c) 2023 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.jmx; + +public interface XdsClientJmxMBean { + + // when the connection is lost for errors. + int getConnectionLostCount(); + + // when the connection is closed by xDS server. + int getConnectionClosedCount(); + + // when the connection is reconnected + int getReconnectionCount(); + + // whether client is disconnected from xDS server: 1 means disconnected; 0 means connected. + // note: users need to pay attention to disconnected rather than connected state, so setting the metric this way + // to stress the disconnected state. + int isDisconnected(); +} diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index 71ddde5ceb..a9b48de6c5 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -16,6 +16,7 @@ package com.linkedin.d2.xds; +import com.linkedin.d2.jmx.XdsClientJmx; import indis.XdsD2; import io.grpc.Status; import java.util.Map; @@ -171,4 +172,6 @@ String typeUrl() abstract void shutdown(); abstract String getXdsServerAuthority(); + + abstract public XdsClientJmx getXdsClientJmx(); } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 43ef338e92..2b1a5b7b15 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -19,6 +19,7 @@ import com.google.common.base.Joiner; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Code; +import com.linkedin.d2.jmx.XdsClientJmx; import indis.XdsD2; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; @@ -70,6 +71,8 @@ public class XdsClientImpl extends XdsClient private ScheduledFuture _readyTimeoutFuture; private final long _readyTimeoutMillis; + private final XdsClientJmx _xdsClientJmx; + @Deprecated public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService) { @@ -77,11 +80,13 @@ public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutor } public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService, - long readyTimeoutMillis) { + long readyTimeoutMillis) + { _readyTimeoutMillis = readyTimeoutMillis; _node = node; _managedChannel = managedChannel; _executorService = executorService; + _xdsClientJmx = new XdsClientJmx(); } @Override @@ -119,6 +124,12 @@ public void startRpcStream() }); } + @Override + public XdsClientJmx getXdsClientJmx() + { + return _xdsClientJmx; + } + // Start RPC stream. Must be called from the executor, and only if we're not backed off. private void startRpcStreamLocal() { if (_shutdown) { @@ -137,6 +148,8 @@ private void startRpcStreamLocal() { _log.warn("ADS stream not ready within {} milliseconds", _readyTimeoutMillis); // notify subscribers about the error and wait for the stream to be ready by keeping it open. notifyStreamError(Status.DEADLINE_EXCEEDED); + // note: no need to start a retry task explicitly since xds stream internally will keep on retrying to connect + // to one of the sub-channels (unless an error or complete callback is called). }, _readyTimeoutMillis, TimeUnit.MILLISECONDS); _adsStream.start(); _log.info("ADS stream started, connected to server: {}", _managedChannel.authority()); @@ -166,29 +179,36 @@ String getXdsServerAuthority() * NOTE: Must be called from the executor. * @return {@code true} if the client is in backoff */ - private boolean isInBackoff() { + private boolean isInBackoff() + { return _adsStream == null && _retryRpcStreamFuture != null && !_retryRpcStreamFuture.isDone(); } /** * Handles ready callbacks from the RPC stream. Must be called from the executor. */ - private void readyHandler() { + private void readyHandler() + { _log.debug("Received ready callback from the ADS stream"); - if (_adsStream == null || isInBackoff()) { + if (_adsStream == null || isInBackoff()) + { _log.warn("Unexpected state, ready called on null or backed off ADS stream!"); return; } // confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again. - if (_adsStream.isReady()) { + if (_adsStream.isReady()) + { // if the ready timeout future is non-null, a reconnect notification hasn't been sent yet. - if (_readyTimeoutFuture != null) { + if (_readyTimeoutFuture != null) + { // timeout task will be cancelled only if it hasn't already executed. boolean cancelledTimeout = _readyTimeoutFuture.cancel(false); _log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout); _readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers. + _xdsClientJmx.incrementReconnectionCount(); notifyStreamReconnect(); } + _xdsClientJmx.setIsConnected(true); } } @@ -642,11 +662,15 @@ private void handleResponse(DiscoveryResponseData response) private void handleRpcError(Throwable t) { + _xdsClientJmx.incrementConnectionLostCount(); + _xdsClientJmx.setIsConnected(false); handleRpcStreamClosed(Status.fromThrowable(t)); } private void handleRpcCompleted() { + _xdsClientJmx.incrementConnectionClosedCount(); + _xdsClientJmx.setIsConnected(false); handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("ADS stream closed by server")); } diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index 3c724ef91a..a05fc1ef2a 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -55,6 +55,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) XdsClient xdsClient = new XdsClientImpl(new Node(config.hostName), new XdsChannelFactory(config.grpcSslContext, config.xdsServer).createChannel(), executorService, xdsStreamReadyTimeout); + d2ClientJmxManager.registerXdsClientJmx(xdsClient.getXdsClientJmx()); + XdsToD2PropertiesAdaptor adaptor = new XdsToD2PropertiesAdaptor(xdsClient, config.dualReadStateManager, config.serviceDiscoveryEventEmitter); diff --git a/gradle.properties b/gradle.properties index 78dc3350f1..836a948536 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.46.7 +version=29.46.8 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true