diff --git a/CHANGELOG.md b/CHANGELOG.md index 0809448634..4e60da83ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.56.0] - 2024-05-14 +- Degrade hosts for HTTP/2 stream errors in Degrader and Relative LB. + ## [29.55.0] - 2024-05-23 - Allow HttpBridge to return RetriableRequestException for the Netty max active stream error diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index e539cd0b95..1d704aa38d 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -217,7 +217,8 @@ public D2Client build() _config.xdsChannelLoadBalancingPolicy, _config.xdsChannelLoadBalancingPolicyConfig, _config.subscribeToUriGlobCollection, - _config._xdsServerMetricsProvider + _config._xdsServerMetricsProvider, + _config.loadBalanceStreamException ); final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ? 
@@ -724,6 +725,11 @@ public D2ClientBuilder setXdsServerMetricsProvider(XdsServerMetricsProvider xdsS return this; } + public D2ClientBuilder setLoadBalanceStreamException(boolean loadBalanceStreamException) { + _config.loadBalanceStreamException = loadBalanceStreamException; + return this; + } + private Map createDefaultTransportClientFactories() { final Map clientFactories = new HashMap<>(); @@ -756,9 +762,10 @@ private Map> createDefaultLoadBalancerStr if (_config.enableRelativeLoadBalancer) { + // TODO: create StateUpdater.LoadBalanceConfig and pass it to the RelativeLoadBalancerStrategyFactory final RelativeLoadBalancerStrategyFactory relativeLoadBalancerStrategyFactory = new RelativeLoadBalancerStrategyFactory( _config._executorService, _config.healthCheckOperations, Collections.emptyList(), _config.eventEmitter, - SystemClock.instance()); + SystemClock.instance(), _config.loadBalanceStreamException); loadBalancerStrategyFactories.putIfAbsent(RelativeLoadBalancerStrategy.RELATIVE_LOAD_BALANCER_STRATEGY_NAME, relativeLoadBalancerStrategyFactory); } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index a3c289c971..798373478c 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -138,6 +138,7 @@ public class D2ClientConfig public Map xdsChannelLoadBalancingPolicyConfig = null; public boolean subscribeToUriGlobCollection = false; public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider(); + public boolean loadBalanceStreamException = false; public D2ClientConfig() { @@ -214,7 +215,8 @@ public D2ClientConfig() String xdsChannelLoadBalancingPolicy, Map xdsChannelLoadBalancingPolicyConfig, boolean subscribeToUriGlobCollection, - XdsServerMetricsProvider xdsServerMetricsProvider + XdsServerMetricsProvider xdsServerMetricsProvider, + boolean 
loadBalanceStreamException ) { this.zkHosts = zkHosts; @@ -289,5 +291,6 @@ public D2ClientConfig() this.xdsChannelLoadBalancingPolicyConfig = xdsChannelLoadBalancingPolicyConfig; this.subscribeToUriGlobCollection = subscribeToUriGlobCollection; this._xdsServerMetricsProvider = xdsServerMetricsProvider; + this.loadBalanceStreamException = loadBalanceStreamException; } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java index 7df9629ef6..76d53f93a3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java @@ -100,7 +100,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) SimpleLoadBalancerState state = new SimpleLoadBalancerState( config._executorService, uriBus, clusterBus, serviceBus, config.clientFactories, config.loadBalancerStrategyFactories, config.sslContext, config.sslParameters, config.isSSLEnabled, config.partitionAccessorRegistry, - config.sslSessionValidatorFactory, config.deterministicSubsettingMetadataProvider, config.canaryDistributionProvider); + config.sslSessionValidatorFactory, config.deterministicSubsettingMetadataProvider, config.canaryDistributionProvider, + config.loadBalanceStreamException); d2ClientJmxManager.setSimpleLoadBalancerState(state); SimpleLoadBalancer simpleLoadBalancer = new SimpleLoadBalancer(state, config.lbWaitTimeout, config.lbWaitUnit, config._executorService, diff --git a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java index 2f296b49f7..07aa4b94d7 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java +++ 
b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java @@ -98,7 +98,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D config.failoutConfigProviderFactory, config.canaryDistributionProvider, config.serviceDiscoveryEventEmitter, - config.dualReadStateManager + config.dualReadStateManager, + config.loadBalanceStreamException ); } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/clients/DegraderTrackerClientImpl.java b/d2/src/main/java/com/linkedin/d2/balancer/clients/DegraderTrackerClientImpl.java index ac6efa86fe..e54bb497ad 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/clients/DegraderTrackerClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/clients/DegraderTrackerClientImpl.java @@ -39,6 +39,7 @@ public class DegraderTrackerClientImpl extends TrackerClientImpl implements Degr private final Map _partitionStates; + @Deprecated public DegraderTrackerClientImpl(URI uri, Map partitionDataMap, TransportClient wrappedClient) { this(uri, partitionDataMap, wrappedClient, SystemClock.instance(), null, @@ -61,6 +62,13 @@ public DegraderTrackerClientImpl(URI uri, Map partitionD public DegraderTrackerClientImpl(URI uri, Map partitionDataMap, TransportClient wrappedClient, Clock clock, DegraderImpl.Config config, long interval, Pattern errorStatusPattern, boolean doNotSlowStart) + { + this(uri, partitionDataMap, wrappedClient, clock, config, interval, errorStatusPattern, doNotSlowStart, false); + } + + public DegraderTrackerClientImpl(URI uri, Map partitionDataMap, TransportClient wrappedClient, + Clock clock, DegraderImpl.Config config, long interval, Pattern errorStatusPattern, + boolean doNotSlowStart, boolean loadBalanceStreamException) { super(uri, partitionDataMap, wrappedClient, clock, interval, (status) -> errorStatusPattern.matcher(Integer.toString(status)).matches(), true, doNotSlowStart, false); @@ -79,6 +87,7 @@ public DegraderTrackerClientImpl(URI uri, Map partitionD { 
config.setInitialDropRate(DegraderImpl.DEFAULT_DO_NOT_SLOW_START_INITIAL_DROP_RATE); } + config.setLoadBalanceStreamException(loadBalanceStreamException); /* TrackerClient contains state for each partition, but they actually share the same DegraderImpl * diff --git a/d2/src/main/java/com/linkedin/d2/balancer/clients/TrackerClientFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/clients/TrackerClientFactory.java index c97048aeef..60788cc6e3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/clients/TrackerClientFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/clients/TrackerClientFactory.java @@ -54,9 +54,7 @@ public class TrackerClientFactory private static final int LOG_RATE_MS = 20000; - /** - * @see #createTrackerClient(URI, UriProperties, ServiceProperties, String, TransportClient, Clock) - */ + @Deprecated public static TrackerClient createTrackerClient(URI uri, UriProperties uriProperties, ServiceProperties serviceProperties, @@ -66,6 +64,17 @@ public static TrackerClient createTrackerClient(URI uri, return createTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, SystemClock.instance()); } + public static TrackerClient createTrackerClient(URI uri, + UriProperties uriProperties, + ServiceProperties serviceProperties, + String loadBalancerStrategyName, + TransportClient transportClient, + boolean loadBalanceStreamException) + { + return createTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, + SystemClock.instance(), loadBalanceStreamException); + } + /** * Creates a {@link TrackerClient}. * @@ -77,12 +86,25 @@ public static TrackerClient createTrackerClient(URI uri, * @param clock Clock used for internal call tracking. 
* @return TrackerClient */ + @Deprecated public static TrackerClient createTrackerClient(URI uri, UriProperties uriProperties, ServiceProperties serviceProperties, String loadBalancerStrategyName, TransportClient transportClient, Clock clock) + { + return createTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, clock, + false); + } + + public static TrackerClient createTrackerClient(URI uri, + UriProperties uriProperties, + ServiceProperties serviceProperties, + String loadBalancerStrategyName, + TransportClient transportClient, + Clock clock, + boolean loadBalanceStreamException) { TrackerClient trackerClient; @@ -104,7 +126,8 @@ public static TrackerClient createTrackerClient(URI uri, switch (loadBalancerStrategyName) { case (DegraderLoadBalancerStrategyV3.DEGRADER_STRATEGY_NAME): - trackerClient = createDegraderTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, clock, doNotSlowStart); + trackerClient = createDegraderTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, + transportClient, clock, doNotSlowStart, loadBalanceStreamException); break; case (RelativeLoadBalancerStrategy.RELATIVE_LOAD_BALANCER_STRATEGY_NAME): trackerClient = createTrackerClientImpl(uri, uriProperties, serviceProperties, loadBalancerStrategyName, @@ -124,7 +147,8 @@ private static DegraderTrackerClient createDegraderTrackerClient(URI uri, String loadBalancerStrategyName, TransportClient transportClient, Clock clock, - boolean doNotSlowStart) + boolean doNotSlowStart, + boolean loadBalanceStreamException) { DegraderImpl.Config config = null; @@ -151,7 +175,8 @@ private static DegraderTrackerClient createDegraderTrackerClient(URI uri, config, trackerClientInterval, errorStatusPattern, - doNotSlowStart); + doNotSlowStart, + loadBalanceStreamException); } private static long getInterval(String loadBalancerStrategyName, ServiceProperties serviceProperties) diff --git 
a/d2/src/main/java/com/linkedin/d2/balancer/clients/TrackerClientImpl.java b/d2/src/main/java/com/linkedin/d2/balancer/clients/TrackerClientImpl.java index 345ef7e324..47cbe75788 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/clients/TrackerClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/clients/TrackerClientImpl.java @@ -42,6 +42,7 @@ import com.linkedin.util.degrader.CallTrackerImpl; import com.linkedin.util.degrader.ErrorType; +import io.netty.handler.codec.http2.Http2Exception; import java.net.ConnectException; import java.net.URI; import java.nio.channels.ClosedChannelException; @@ -311,6 +312,10 @@ else if (originalThrowable instanceof TimeoutException) { callCompletion.endCallWithError(ErrorType.TIMEOUT_EXCEPTION); } + else if (originalThrowable instanceof Http2Exception.StreamException) + { + callCompletion.endCallWithError(ErrorType.STREAM_ERROR); + } else { callCompletion.endCallWithError(ErrorType.REMOTE_INVOCATION_EXCEPTION); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerState.java b/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerState.java index d7239e0f65..4614ce6cbd 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerState.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerState.java @@ -146,6 +146,7 @@ public class SimpleLoadBalancerState implements LoadBalancerState, ClientFactory private final SslSessionValidatorFactory _sslSessionValidatorFactory; private final SubsettingState _subsettingState; private final CanaryDistributionProvider _canaryDistributionProvider; + private final boolean _loadBalanceStreamException; /* * Concurrency considerations: @@ -314,6 +315,26 @@ public SimpleLoadBalancerState(ScheduledExecutorService executorService, SslSessionValidatorFactory sessionValidatorFactory, DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider, CanaryDistributionProvider 
canaryDistributionProvider) + { + this(executorService, uriBus, clusterBus, serviceBus, clientFactories, loadBalancerStrategyFactories, sslContext, + sslParameters, isSSLEnabled, partitionAccessorRegistry, sessionValidatorFactory, + deterministicSubsettingMetadataProvider, canaryDistributionProvider, false); + } + + public SimpleLoadBalancerState(ScheduledExecutorService executorService, + PropertyEventBus uriBus, + PropertyEventBus clusterBus, + PropertyEventBus serviceBus, + Map clientFactories, + Map> loadBalancerStrategyFactories, + SSLContext sslContext, + SSLParameters sslParameters, + boolean isSSLEnabled, + PartitionAccessorRegistry partitionAccessorRegistry, + SslSessionValidatorFactory sessionValidatorFactory, + DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider, + CanaryDistributionProvider canaryDistributionProvider, + boolean loadBalanceStreamException) { _executor = executorService; _uriProperties = new ConcurrentHashMap<>(); @@ -349,6 +370,7 @@ public SimpleLoadBalancerState(ScheduledExecutorService executorService, _subsettingState = null; } _canaryDistributionProvider = canaryDistributionProvider; + _loadBalanceStreamException = loadBalanceStreamException; } public void register(final SimpleLoadBalancerStateListener listener) @@ -952,11 +974,9 @@ private TrackerClient buildTrackerClient(URI uri, UriProperties uriProperties, S return null; } - return serviceProperties == null ? null : TrackerClientFactory.createTrackerClient(uri, - uriProperties, - serviceProperties, - loadBalancerStrategy.getName(), - transportClient); + //TODO: create TrackerClient.LoadBalanceConfig and pass it into createTrackerClient method + return serviceProperties == null ? 
null : TrackerClientFactory.createTrackerClient(uri, uriProperties, + serviceProperties, loadBalancerStrategy.getName(), transportClient, _loadBalanceStreamException); } /** diff --git a/d2/src/main/java/com/linkedin/d2/balancer/strategies/relative/RelativeLoadBalancerStrategyFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/strategies/relative/RelativeLoadBalancerStrategyFactory.java index 7a929ce292..cd019594fa 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/strategies/relative/RelativeLoadBalancerStrategyFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/strategies/relative/RelativeLoadBalancerStrategyFactory.java @@ -76,15 +76,24 @@ public class RelativeLoadBalancerStrategyFactory implements LoadBalancerStrategy private final List> _stateListenerFactories; private final EventEmitter _eventEmitter; private final Clock _clock; + private final boolean _loadBalanceStreamException; public RelativeLoadBalancerStrategyFactory(ScheduledExecutorService executorService, HealthCheckOperations healthCheckOperations, List> stateListenerFactories, EventEmitter eventEmitter, Clock clock) + { + this(executorService, healthCheckOperations, stateListenerFactories, eventEmitter, clock, false); + } + + public RelativeLoadBalancerStrategyFactory(ScheduledExecutorService executorService, HealthCheckOperations healthCheckOperations, + List> stateListenerFactories, EventEmitter eventEmitter, Clock clock, + boolean loadBalanceStreamException) { _executorService = executorService; _healthCheckOperations = healthCheckOperations; _stateListenerFactories = stateListenerFactories; _eventEmitter = (eventEmitter == null) ? 
new NoopEventEmitter() : eventEmitter; _clock = clock; + _loadBalanceStreamException = loadBalanceStreamException; } @@ -112,7 +121,8 @@ private StateUpdater getRelativeStateUpdater(D2RelativeStrategyProperties relati { listenerFactories.addAll(_stateListenerFactories); } - return new StateUpdater(relativeStrategyProperties, quarantineManager, _executorService, listenerFactories, serviceName); + return new StateUpdater(relativeStrategyProperties, quarantineManager, _executorService, listenerFactories, + serviceName, _loadBalanceStreamException); } private ClientSelector getClientSelector(D2RelativeStrategyProperties relativeStrategyProperties) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/strategies/relative/StateUpdater.java b/d2/src/main/java/com/linkedin/d2/balancer/strategies/relative/StateUpdater.java index a94c0600d0..91f55573e5 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/strategies/relative/StateUpdater.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/strategies/relative/StateUpdater.java @@ -16,6 +16,7 @@ package com.linkedin.d2.balancer.strategies.relative; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.d2.D2RelativeStrategyProperties; import com.linkedin.d2.balancer.clients.TrackerClient; import com.linkedin.d2.balancer.strategies.PartitionStateUpdateListener; @@ -65,14 +66,27 @@ public class StateUpdater private final ScheduledFuture scheduledFuture; private ConcurrentMap _partitionLoadBalancerStateMap; private int _firstPartitionId = -1; + private final boolean _loadBalanceStreamException; + @Deprecated StateUpdater(D2RelativeStrategyProperties relativeStrategyProperties, QuarantineManager quarantineManager, ScheduledExecutorService executorService, List> listenerFactories, String serviceName) { - this(relativeStrategyProperties, quarantineManager, executorService, new ConcurrentHashMap<>(), listenerFactories, serviceName); + this(relativeStrategyProperties, quarantineManager, executorService, new 
ConcurrentHashMap<>(), listenerFactories, + serviceName, false); + } + + StateUpdater(D2RelativeStrategyProperties relativeStrategyProperties, + QuarantineManager quarantineManager, + ScheduledExecutorService executorService, + List> listenerFactories, + String serviceName, boolean loadBalanceStreamException) + { + this(relativeStrategyProperties, quarantineManager, executorService, new ConcurrentHashMap<>(), listenerFactories, + serviceName, loadBalanceStreamException); } StateUpdater(D2RelativeStrategyProperties relativeStrategyProperties, @@ -81,6 +95,17 @@ public class StateUpdater ConcurrentMap partitionLoadBalancerStateMap, List> listenerFactories, String serviceName) + { + this(relativeStrategyProperties, quarantineManager, executorService, partitionLoadBalancerStateMap, + listenerFactories, serviceName, false); + } + + StateUpdater(D2RelativeStrategyProperties relativeStrategyProperties, + QuarantineManager quarantineManager, + ScheduledExecutorService executorService, + ConcurrentMap partitionLoadBalancerStateMap, + List> listenerFactories, + String serviceName, boolean loadBalanceStreamException) { _relativeStrategyProperties = relativeStrategyProperties; _quarantineManager = quarantineManager; @@ -91,8 +116,9 @@ public class StateUpdater _serviceName = serviceName; scheduledFuture = executorService.scheduleWithFixedDelay(this::updateState, EXECUTOR_INITIAL_DELAY, - _relativeStrategyProperties.getUpdateIntervalMs(), - TimeUnit.MILLISECONDS); + _relativeStrategyProperties.getUpdateIntervalMs(), + TimeUnit.MILLISECONDS); + _loadBalanceStreamException = loadBalanceStreamException; } /** @@ -434,15 +460,22 @@ private void notifyPartitionStateUpdateListener(PartitionState state) state.getListeners().forEach(listener -> listener.onUpdate(state)); } - private static double getErrorRate(Map errorTypeCounts, int callCount) + @VisibleForTesting + double getErrorRate(Map errorTypeCounts, int callCount) { Integer connectExceptionCount = 
errorTypeCounts.getOrDefault(ErrorType.CONNECT_EXCEPTION, 0); Integer closedChannelExceptionCount = errorTypeCounts.getOrDefault(ErrorType.CLOSED_CHANNEL_EXCEPTION, 0); Integer serverErrorCount = errorTypeCounts.getOrDefault(ErrorType.SERVER_ERROR, 0); Integer timeoutExceptionCount = errorTypeCounts.getOrDefault(ErrorType.TIMEOUT_EXCEPTION, 0); - return callCount == 0 - ? 0 - : (double) (connectExceptionCount + closedChannelExceptionCount + serverErrorCount + timeoutExceptionCount) / callCount; + Integer streamErrorCount = errorTypeCounts.getOrDefault(ErrorType.STREAM_ERROR, 0); + + double validExceptionCount = connectExceptionCount + closedChannelExceptionCount + serverErrorCount + + timeoutExceptionCount; + if (_loadBalanceStreamException) + { + validExceptionCount += streamErrorCount; + } + return callCount == 0 ? 0 : validExceptionCount / callCount; } private void initializePartition(Set trackerClients, int partitionId, long clusterGenerationId) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/zkfs/ZKFSTogglingLoadBalancerFactoryImpl.java b/d2/src/main/java/com/linkedin/d2/balancer/zkfs/ZKFSTogglingLoadBalancerFactoryImpl.java index b1f32dcf29..9ebe058bb9 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/zkfs/ZKFSTogglingLoadBalancerFactoryImpl.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/zkfs/ZKFSTogglingLoadBalancerFactoryImpl.java @@ -97,6 +97,7 @@ public class ZKFSTogglingLoadBalancerFactoryImpl implements ZKFSLoadBalancer.Tog private final FailoutConfigProviderFactory _failoutConfigProviderFactory; private final ServiceDiscoveryEventEmitter _serviceDiscoveryEventEmitter; private final DualReadStateManager _dualReadStateManager; + private final boolean _loadBalanceStreamException; private static final Logger _log = LoggerFactory.getLogger(ZKFSTogglingLoadBalancerFactoryImpl.class); @@ -362,6 +363,38 @@ public ZKFSTogglingLoadBalancerFactoryImpl(ComponentFactory factory, CanaryDistributionProvider canaryDistributionProvider, 
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter, DualReadStateManager dualReadStateManager) + { + this(factory, timeout, timeoutUnit, baseZKPath, fsBasePath, clientFactories, loadBalancerStrategyFactories, d2ServicePath, + sslContext, sslParameters, isSSLEnabled, clientServicesConfig, useNewEphemeralStoreWatcher, partitionAccessorRegistry, + enableSaveUriDataOnDisk, sslSessionValidatorFactory, d2ClientJmxManager, zookeeperReadWindowMs, + deterministicSubsettingMetadataProvider, failoutConfigProviderFactory, canaryDistributionProvider, + serviceDiscoveryEventEmitter, dualReadStateManager, false); + } + + public ZKFSTogglingLoadBalancerFactoryImpl(ComponentFactory factory, + long timeout, + TimeUnit timeoutUnit, + String baseZKPath, + String fsBasePath, + Map clientFactories, + Map> loadBalancerStrategyFactories, + String d2ServicePath, + SSLContext sslContext, + SSLParameters sslParameters, + boolean isSSLEnabled, + Map> clientServicesConfig, + boolean useNewEphemeralStoreWatcher, + PartitionAccessorRegistry partitionAccessorRegistry, + boolean enableSaveUriDataOnDisk, + SslSessionValidatorFactory sslSessionValidatorFactory, + D2ClientJmxManager d2ClientJmxManager, + int zookeeperReadWindowMs, + DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider, + FailoutConfigProviderFactory failoutConfigProviderFactory, + CanaryDistributionProvider canaryDistributionProvider, + ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter, + DualReadStateManager dualReadStateManager, + boolean loadBalanceStreamException) { _factory = factory; _lbTimeout = timeout; @@ -386,6 +419,7 @@ public ZKFSTogglingLoadBalancerFactoryImpl(ComponentFactory factory, _canaryDistributionProvider = canaryDistributionProvider; _serviceDiscoveryEventEmitter = serviceDiscoveryEventEmitter; _dualReadStateManager = dualReadStateManager; + _loadBalanceStreamException = loadBalanceStreamException; } @Override @@ -445,9 +479,9 @@ public TogglingLoadBalancer 
createLoadBalancer(ZKConnection zkConnection, Schedu TogglingPublisher uriToggle = _factory.createUriToggle(zkUriRegistry, fsUriStore, uriBus); SimpleLoadBalancerState state = new SimpleLoadBalancerState( - executorService, uriBus, clusterBus, serviceBus, _clientFactories, _loadBalancerStrategyFactories, - _sslContext, _sslParameters, _isSSLEnabled, _partitionAccessorRegistry, - _sslSessionValidatorFactory, _deterministicSubsettingMetadataProvider, _canaryDistributionProvider); + executorService, uriBus, clusterBus, serviceBus, _clientFactories, _loadBalancerStrategyFactories, _sslContext, + _sslParameters, _isSSLEnabled, _partitionAccessorRegistry, _sslSessionValidatorFactory, + _deterministicSubsettingMetadataProvider, _canaryDistributionProvider, _loadBalanceStreamException); _d2ClientJmxManager.setSimpleLoadBalancerState(state); SimpleLoadBalancer balancer = new SimpleLoadBalancer(state, _lbTimeout, _lbTimeoutUnit, executorService, _failoutConfigProviderFactory); diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsFsTogglingLoadBalancerFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsFsTogglingLoadBalancerFactory.java index 7485e507b2..082af88d62 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsFsTogglingLoadBalancerFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsFsTogglingLoadBalancerFactory.java @@ -80,7 +80,9 @@ public class XdsFsTogglingLoadBalancerFactory private final DeterministicSubsettingMetadataProvider _deterministicSubsettingMetadataProvider; private final CanaryDistributionProvider _canaryDistributionProvider; private final FailoutConfigProviderFactory _failoutConfigProviderFactory; + private final boolean _loadBalanceStreamException; + @Deprecated public XdsFsTogglingLoadBalancerFactory(long timeout, TimeUnit timeoutUnit, String fsBasePath, Map clientFactories, Map> loadBalancerStrategyFactories, @@ -89,6 +91,22 @@ public XdsFsTogglingLoadBalancerFactory(long timeout, TimeUnit timeoutUnit, 
Stri SslSessionValidatorFactory sslSessionValidatorFactory, D2ClientJmxManager d2ClientJmxManager, DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider, FailoutConfigProviderFactory failoutConfigProviderFactory, CanaryDistributionProvider canaryDistributionProvider) + { + this(timeout, timeoutUnit, fsBasePath, clientFactories, loadBalancerStrategyFactories, d2ServicePath, sslContext, + sslParameters, isSSLEnabled, clientServicesConfig, partitionAccessorRegistry, sslSessionValidatorFactory, + d2ClientJmxManager, deterministicSubsettingMetadataProvider, failoutConfigProviderFactory, + canaryDistributionProvider, false); + } + + public XdsFsTogglingLoadBalancerFactory(long timeout, TimeUnit timeoutUnit, String fsBasePath, + Map clientFactories, + Map> loadBalancerStrategyFactories, + String d2ServicePath, SSLContext sslContext, SSLParameters sslParameters, boolean isSSLEnabled, + Map> clientServicesConfig, PartitionAccessorRegistry partitionAccessorRegistry, + SslSessionValidatorFactory sslSessionValidatorFactory, D2ClientJmxManager d2ClientJmxManager, + DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider, + FailoutConfigProviderFactory failoutConfigProviderFactory, CanaryDistributionProvider canaryDistributionProvider, + boolean loadBalanceStreamException) { _lbTimeout = timeout; _lbTimeoutUnit = timeoutUnit; @@ -106,6 +124,7 @@ public XdsFsTogglingLoadBalancerFactory(long timeout, TimeUnit timeoutUnit, Stri _deterministicSubsettingMetadataProvider = deterministicSubsettingMetadataProvider; _failoutConfigProviderFactory = failoutConfigProviderFactory; _canaryDistributionProvider = canaryDistributionProvider; + _loadBalanceStreamException = loadBalanceStreamException; } public TogglingLoadBalancer create(ScheduledExecutorService executorService, XdsToD2PropertiesAdaptor xdsAdaptor) @@ -146,7 +165,8 @@ public TogglingLoadBalancer create(ScheduledExecutorService executorService, Xds SimpleLoadBalancerState state = 
new SimpleLoadBalancerState(executorService, uriBus, clusterBus, serviceBus, _clientFactories, _loadBalancerStrategyFactories, _sslContext, _sslParameters, _isSSLEnabled, _partitionAccessorRegistry, - _sslSessionValidatorFactory, _deterministicSubsettingMetadataProvider, _canaryDistributionProvider); + _sslSessionValidatorFactory, _deterministicSubsettingMetadataProvider, _canaryDistributionProvider, + _loadBalanceStreamException); _d2ClientJmxManager.setSimpleLoadBalancerState(state); SimpleLoadBalancer balancer = diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index 4e7f8d614b..294b5b7430 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -65,12 +65,15 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) XdsToD2PropertiesAdaptor adaptor = new XdsToD2PropertiesAdaptor(xdsClient, config.dualReadStateManager, config.serviceDiscoveryEventEmitter, config.clientServicesConfig); - XdsLoadBalancer xdsLoadBalancer = new XdsLoadBalancer(adaptor, executorService, + XdsLoadBalancer xdsLoadBalancer = new XdsLoadBalancer( + adaptor, + executorService, new XdsFsTogglingLoadBalancerFactory(config.lbWaitTimeout, config.lbWaitUnit, config.indisFsBasePath, config.clientFactories, config.loadBalancerStrategyFactories, config.d2ServicePath, config.sslContext, config.sslParameters, config.isSSLEnabled, config.clientServicesConfig, config.partitionAccessorRegistry, config.sslSessionValidatorFactory, d2ClientJmxManager, config.deterministicSubsettingMetadataProvider, - config.failoutConfigProviderFactory, config.canaryDistributionProvider)); + config.failoutConfigProviderFactory, config.canaryDistributionProvider, config.loadBalanceStreamException) + ); LoadBalancerWithFacilities balancer 
= xdsLoadBalancer; diff --git a/d2/src/test/java/com/linkedin/d2/balancer/clients/RetryClientTest.java b/d2/src/test/java/com/linkedin/d2/balancer/clients/RetryClientTest.java index aa14a2b3a4..08a8701d57 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/clients/RetryClientTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/clients/RetryClientTest.java @@ -330,7 +330,7 @@ public void testStreamRetryNoAvailableHosts() throws Exception } } - @Test(retryAnalyzer = SingleRetry.class) // Known to be flaky in CI + @Test(retryAnalyzer = ThreeRetries.class) // Known to be flaky in CI public void testRestRetryExceedsClientRetryRatio() throws Exception { SimpleLoadBalancer balancer = prepareLoadBalancer(Arrays.asList("http://test.linkedin.com/retry1", "http://test.linkedin.com/good"), diff --git a/d2/src/test/java/com/linkedin/d2/balancer/strategies/relative/StateUpdaterTest.java b/d2/src/test/java/com/linkedin/d2/balancer/strategies/relative/StateUpdaterTest.java index 7c48e03ecd..7c5c9691aa 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/strategies/relative/StateUpdaterTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/strategies/relative/StateUpdaterTest.java @@ -16,13 +16,16 @@ package com.linkedin.d2.balancer.strategies.relative; +import com.google.common.collect.ImmutableMap; import com.linkedin.d2.D2RelativeStrategyProperties; import com.linkedin.d2.balancer.clients.TrackerClient; import com.linkedin.r2.util.NamedThreadFactory; import com.linkedin.test.util.retry.ThreeRetries; +import com.linkedin.util.degrader.ErrorType; import java.net.URI; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -42,10 +45,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import static 
org.testng.Assert.fail; +import static org.testng.Assert.*; /** @@ -650,6 +650,33 @@ public void testExecutorSchedule() throws InterruptedException { executorService.shutdown(); } + @DataProvider + public Object[][] loadBalanceStreamExceptionDataProvider() + { + return new Object[][] { + { false }, + { true } + }; + } + + @Test(dataProvider = "loadBalanceStreamExceptionDataProvider") + public void testGetErrorRateWithStreamError(Boolean loadBalanceStreamException) + { + Map errorTypeCounts = ImmutableMap.of( + ErrorType.CONNECT_EXCEPTION, 1, + ErrorType.CLOSED_CHANNEL_EXCEPTION, 1, + ErrorType.SERVER_ERROR, 1, + ErrorType.TIMEOUT_EXCEPTION, 1, + ErrorType.STREAM_ERROR, 10 + ); + + StateUpdater stateUpdater = new StateUpdater(new D2RelativeStrategyProperties().setUpdateIntervalMs(5000), + _quarantineManager, _executorService, new ConcurrentHashMap<>(), Collections.emptyList(), SERVICE_NAME, + loadBalanceStreamException); + + assertEquals( stateUpdater.getErrorRate(errorTypeCounts, 20), loadBalanceStreamException ? 
0.7 : 0.2); + } + private void runIndividualConcurrentTask(ExecutorService executorService, Runnable runnable, CountDownLatch countDownLatch) { executorService.submit(() -> { diff --git a/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZooKeeperEphemeralStoreChildrenDelayedWatcherTest.java b/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZooKeeperEphemeralStoreChildrenDelayedWatcherTest.java index d282a4d3bd..de4772b198 100644 --- a/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZooKeeperEphemeralStoreChildrenDelayedWatcherTest.java +++ b/d2/src/test/java/com/linkedin/d2/discovery/stores/zk/ZooKeeperEphemeralStoreChildrenDelayedWatcherTest.java @@ -27,6 +27,7 @@ import com.linkedin.test.util.AssertionMethods; import com.linkedin.test.util.ClockedExecutor; +import com.linkedin.test.util.retry.ThreeRetries; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -235,7 +236,7 @@ public void onSuccess(None result) client.shutdown(); } - @Test(dataProvider = "dataNumOfchildrenToAddToRemoveReadWindow") + @Test(dataProvider = "dataNumOfchildrenToAddToRemoveReadWindow", retryAnalyzer = ThreeRetries.class) public void testChildNodeRemoved(int numberOfAdditionalChildren, int numberOfRemove, int zookeeperReadWindowMs) throws Exception { diff --git a/degrader/build.gradle b/degrader/build.gradle index a03aa06521..50af203a74 100644 --- a/degrader/build.gradle +++ b/degrader/build.gradle @@ -1,4 +1,7 @@ dependencies { compile project(':pegasus-common') testCompile externalDependency.testng + + testImplementation externalDependency.mockito + testImplementation externalDependency.guava } \ No newline at end of file diff --git a/degrader/src/main/java/com/linkedin/util/degrader/DegraderImpl.java b/degrader/src/main/java/com/linkedin/util/degrader/DegraderImpl.java index 08007bc554..f194dd002c 100644 --- a/degrader/src/main/java/com/linkedin/util/degrader/DegraderImpl.java +++ 
b/degrader/src/main/java/com/linkedin/util/degrader/DegraderImpl.java @@ -513,15 +513,22 @@ protected boolean isLow() * so we don't want to punish the server for exceptions that the server is not responsible for e.g. * bad user input, frameTooLongException, etc. */ - private double getErrorRateToDegrade() + double getErrorRateToDegrade() { Map errorTypeCounts = _callTrackerStats.getErrorTypeCounts(); Integer connectExceptionCount = errorTypeCounts.getOrDefault(ErrorType.CONNECT_EXCEPTION, 0); Integer closedChannelExceptionCount = errorTypeCounts.getOrDefault(ErrorType.CLOSED_CHANNEL_EXCEPTION, 0); Integer serverErrorCount = errorTypeCounts.getOrDefault(ErrorType.SERVER_ERROR, 0); Integer timeoutExceptionCount = errorTypeCounts.getOrDefault(ErrorType.TIMEOUT_EXCEPTION, 0); - return safeDivide(connectExceptionCount + closedChannelExceptionCount + serverErrorCount + timeoutExceptionCount, - _callTrackerStats.getCallCount()); + Integer streamErrorCount = errorTypeCounts.getOrDefault(ErrorType.STREAM_ERROR, 0); + + double validExceptionCount = connectExceptionCount + closedChannelExceptionCount + serverErrorCount + + timeoutExceptionCount; + if (_config.getLoadBalanceStreamException()) + { + validExceptionCount += streamErrorCount; + } + return safeDivide(validExceptionCount, _callTrackerStats.getCallCount()); } private double safeDivide(double numerator, double denominator) @@ -699,6 +706,7 @@ public static class ImmutableConfig protected Logger _logger = DEFAULT_LOGGER; protected double _logThreshold = DEFAULT_LOG_THRESHOLD; protected double _preemptiveRequestTimeoutRate = DEFAULT_PREEMPTIVE_REQUEST_TIMEOUT_RATE; + protected boolean _loadBalanceStreamException = false; public ImmutableConfig() { @@ -729,7 +737,7 @@ public ImmutableConfig(ImmutableConfig config) _slowStartThreshold = config._slowStartThreshold; _logger = config._logger; _preemptiveRequestTimeoutRate = config._preemptiveRequestTimeoutRate; - + _loadBalanceStreamException = 
config._loadBalanceStreamException; } public String getName() @@ -851,6 +859,11 @@ public double getPreemptiveRequestTimeoutRate() { return _preemptiveRequestTimeoutRate; } + + public boolean getLoadBalanceStreamException() + { + return _loadBalanceStreamException; + } } public static class Config extends ImmutableConfig @@ -984,5 +997,10 @@ public void setPreemptiveRequestTimeoutRate(double preemptiveRequestTimeoutRate) { _preemptiveRequestTimeoutRate = preemptiveRequestTimeoutRate; } + + public void setLoadBalanceStreamException(boolean loadBalanceStreamException) + { + _loadBalanceStreamException = loadBalanceStreamException; + } } } diff --git a/degrader/src/main/java/com/linkedin/util/degrader/ErrorType.java b/degrader/src/main/java/com/linkedin/util/degrader/ErrorType.java index 686dc38515..9c10f25cda 100644 --- a/degrader/src/main/java/com/linkedin/util/degrader/ErrorType.java +++ b/degrader/src/main/java/com/linkedin/util/degrader/ErrorType.java @@ -35,5 +35,10 @@ public enum ErrorType /** * represents a server side error condition */ - SERVER_ERROR + SERVER_ERROR, + + /** + * represents an http2 stream error + */ + STREAM_ERROR } diff --git a/degrader/src/test/java/com/linkedin/util/degrader/DegraderImplTest.java b/degrader/src/test/java/com/linkedin/util/degrader/DegraderImplTest.java new file mode 100644 index 0000000000..29ab61fa3b --- /dev/null +++ b/degrader/src/test/java/com/linkedin/util/degrader/DegraderImplTest.java @@ -0,0 +1,72 @@ +package com.linkedin.util.degrader; + +import com.google.common.collect.ImmutableMap; +import com.linkedin.util.clock.SystemClock; +import java.util.Map; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.slf4j.LoggerFactory; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + + +public class DegraderImplTest +{ + @DataProvider + public Object[][] 
loadBalanceStreamExceptionDataProvider() + { + return new Object[][] { + { false }, + { true } + }; + } + + @Test(dataProvider = "loadBalanceStreamExceptionDataProvider") + public void testGetErrorRateToDegrade(Boolean loadBalancerStreamException) + { + Map errorTypeCounts = ImmutableMap.of( + ErrorType.CONNECT_EXCEPTION, 1, + ErrorType.CLOSED_CHANNEL_EXCEPTION, 1, + ErrorType.SERVER_ERROR, 1, + ErrorType.TIMEOUT_EXCEPTION, 1, + ErrorType.STREAM_ERROR, 1 + ); + DegraderImpl degrader = new DegraderImplTestFixture().getDegraderImpl(loadBalancerStreamException, errorTypeCounts, + 10); + assertEquals(degrader.getErrorRateToDegrade(), loadBalancerStreamException ? 0.5 : 0.4); + } + + private static final class DegraderImplTestFixture + { + @Mock + CallTracker _callTracker; + @Mock + CallTracker.CallStats _callStats; + + DegraderImplTestFixture() + { + MockitoAnnotations.initMocks(this); + doReturn(_callStats).when(_callTracker).getCallStats(); + doNothing().when(_callTracker).addStatsRolloverEventListener(any()); + } + + DegraderImpl getDegraderImpl(boolean loadBalancerStreamException, Map errorTypeCounts, int callCount) + { + when(_callStats.getErrorTypeCounts()).thenReturn(errorTypeCounts); + when(_callStats.getCallCount()).thenReturn(callCount); + + DegraderImpl.Config config = new DegraderImpl.Config(); + config.setName("DegraderImplTest"); + config.setClock(SystemClock.instance()); + config.setCallTracker(_callTracker); + config.setMaxDropDuration(1); + config.setInitialDropRate(0.01); + config.setLogger(LoggerFactory.getLogger(DegraderImplTest.class)); + config.setLoadBalanceStreamException(loadBalancerStreamException); + return new DegraderImpl(config); + } + } +}