Skip to content

Commit

Permalink
Handle HTTP/2 stream errors in tracker client and LB state updator (#…
Browse files Browse the repository at this point in the history
…1000)

* Handle HTTP/2 stream errors in tracker client and LB state updator

* Add UTs

* update changelog

* add to degrader load balancer, put behind config, add and fix tests

* code style

* revert un-needed change in test helper

* add TODO for optimizing config management

---------

Co-authored-by: Bohan Yang <[email protected]>
  • Loading branch information
shivamgupta1 and bohhyang authored May 30, 2024
1 parent 63ba831 commit 82450f7
Show file tree
Hide file tree
Showing 21 changed files with 341 additions and 41 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.55.0] - 2024-05-14
- degrade hosts for HTTP/2 stream errors in Degrader and Relative LB.

## [29.55.0] - 2024-05-23
- Allow HttpBridge to return RetriableRequestException for the Netty max active stream error

Expand Down
11 changes: 9 additions & 2 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ public D2Client build()
_config.xdsChannelLoadBalancingPolicy,
_config.xdsChannelLoadBalancingPolicyConfig,
_config.subscribeToUriGlobCollection,
_config._xdsServerMetricsProvider
_config._xdsServerMetricsProvider,
_config.loadBalanceStreamException
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -724,6 +725,11 @@ public D2ClientBuilder setXdsServerMetricsProvider(XdsServerMetricsProvider xdsS
return this;
}

public D2ClientBuilder setLoadBalanceStreamException(boolean loadBalanceStreamException) {
_config.loadBalanceStreamException = loadBalanceStreamException;
return this;
}

private Map<String, TransportClientFactory> createDefaultTransportClientFactories()
{
final Map<String, TransportClientFactory> clientFactories = new HashMap<>();
Expand Down Expand Up @@ -756,9 +762,10 @@ private Map<String, LoadBalancerStrategyFactory<?>> createDefaultLoadBalancerStr

if (_config.enableRelativeLoadBalancer)
{
// TODO: create StateUpdater.LoadBalanceConfig and pass it to the RelativeLoadBalancerStrategyFactory
final RelativeLoadBalancerStrategyFactory relativeLoadBalancerStrategyFactory = new RelativeLoadBalancerStrategyFactory(
_config._executorService, _config.healthCheckOperations, Collections.emptyList(), _config.eventEmitter,
SystemClock.instance());
SystemClock.instance(), _config.loadBalanceStreamException);
loadBalancerStrategyFactories.putIfAbsent(RelativeLoadBalancerStrategy.RELATIVE_LOAD_BALANCER_STRATEGY_NAME,
relativeLoadBalancerStrategyFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class D2ClientConfig
public Map<String, ?> xdsChannelLoadBalancingPolicyConfig = null;
public boolean subscribeToUriGlobCollection = false;
public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider();
public boolean loadBalanceStreamException = false;

public D2ClientConfig()
{
Expand Down Expand Up @@ -214,7 +215,8 @@ public D2ClientConfig()
String xdsChannelLoadBalancingPolicy,
Map<String, ?> xdsChannelLoadBalancingPolicyConfig,
boolean subscribeToUriGlobCollection,
XdsServerMetricsProvider xdsServerMetricsProvider
XdsServerMetricsProvider xdsServerMetricsProvider,
boolean loadBalanceStreamException
)
{
this.zkHosts = zkHosts;
Expand Down Expand Up @@ -289,5 +291,6 @@ public D2ClientConfig()
this.xdsChannelLoadBalancingPolicyConfig = xdsChannelLoadBalancingPolicyConfig;
this.subscribeToUriGlobCollection = subscribeToUriGlobCollection;
this._xdsServerMetricsProvider = xdsServerMetricsProvider;
this.loadBalanceStreamException = loadBalanceStreamException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
SimpleLoadBalancerState state = new SimpleLoadBalancerState(
config._executorService, uriBus, clusterBus, serviceBus, config.clientFactories, config.loadBalancerStrategyFactories,
config.sslContext, config.sslParameters, config.isSSLEnabled, config.partitionAccessorRegistry,
config.sslSessionValidatorFactory, config.deterministicSubsettingMetadataProvider, config.canaryDistributionProvider);
config.sslSessionValidatorFactory, config.deterministicSubsettingMetadataProvider, config.canaryDistributionProvider,
config.loadBalanceStreamException);
d2ClientJmxManager.setSimpleLoadBalancerState(state);

SimpleLoadBalancer simpleLoadBalancer = new SimpleLoadBalancer(state, config.lbWaitTimeout, config.lbWaitUnit, config._executorService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D
config.failoutConfigProviderFactory,
config.canaryDistributionProvider,
config.serviceDiscoveryEventEmitter,
config.dualReadStateManager
config.dualReadStateManager,
config.loadBalanceStreamException
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class DegraderTrackerClientImpl extends TrackerClientImpl implements Degr

private final Map<Integer, PartitionState> _partitionStates;

@Deprecated
public DegraderTrackerClientImpl(URI uri, Map<Integer, PartitionData> partitionDataMap, TransportClient wrappedClient)
{
this(uri, partitionDataMap, wrappedClient, SystemClock.instance(), null,
Expand All @@ -61,6 +62,13 @@ public DegraderTrackerClientImpl(URI uri, Map<Integer, PartitionData> partitionD
public DegraderTrackerClientImpl(URI uri, Map<Integer, PartitionData> partitionDataMap, TransportClient wrappedClient,
Clock clock, DegraderImpl.Config config, long interval, Pattern errorStatusPattern,
boolean doNotSlowStart)
{
this(uri, partitionDataMap, wrappedClient, clock, config, interval, errorStatusPattern, doNotSlowStart, false);
}

public DegraderTrackerClientImpl(URI uri, Map<Integer, PartitionData> partitionDataMap, TransportClient wrappedClient,
Clock clock, DegraderImpl.Config config, long interval, Pattern errorStatusPattern,
boolean doNotSlowStart, boolean loadBalanceStreamException)
{
super(uri, partitionDataMap, wrappedClient, clock, interval,
(status) -> errorStatusPattern.matcher(Integer.toString(status)).matches(), true, doNotSlowStart, false);
Expand All @@ -79,6 +87,7 @@ public DegraderTrackerClientImpl(URI uri, Map<Integer, PartitionData> partitionD
{
config.setInitialDropRate(DegraderImpl.DEFAULT_DO_NOT_SLOW_START_INITIAL_DROP_RATE);
}
config.setLoadBalanceStreamException(loadBalanceStreamException);

/* TrackerClient contains state for each partition, but they actually share the same DegraderImpl
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ public class TrackerClientFactory

private static final int LOG_RATE_MS = 20000;

/**
* @see #createTrackerClient(URI, UriProperties, ServiceProperties, String, TransportClient, Clock)
*/
@Deprecated
public static TrackerClient createTrackerClient(URI uri,
UriProperties uriProperties,
ServiceProperties serviceProperties,
Expand All @@ -66,6 +64,17 @@ public static TrackerClient createTrackerClient(URI uri,
return createTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, SystemClock.instance());
}

public static TrackerClient createTrackerClient(URI uri,
UriProperties uriProperties,
ServiceProperties serviceProperties,
String loadBalancerStrategyName,
TransportClient transportClient,
boolean loadBalanceStreamException)
{
return createTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient,
SystemClock.instance(), loadBalanceStreamException);
}

/**
* Creates a {@link TrackerClient}.
*
Expand All @@ -77,12 +86,25 @@ public static TrackerClient createTrackerClient(URI uri,
* @param clock Clock used for internal call tracking.
* @return TrackerClient
*/
@Deprecated
public static TrackerClient createTrackerClient(URI uri,
UriProperties uriProperties,
ServiceProperties serviceProperties,
String loadBalancerStrategyName,
TransportClient transportClient,
Clock clock)
{
return createTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, clock,
false);
}

public static TrackerClient createTrackerClient(URI uri,
UriProperties uriProperties,
ServiceProperties serviceProperties,
String loadBalancerStrategyName,
TransportClient transportClient,
Clock clock,
boolean loadBalanceStreamException)
{
TrackerClient trackerClient;

Expand All @@ -104,7 +126,8 @@ public static TrackerClient createTrackerClient(URI uri,
switch (loadBalancerStrategyName)
{
case (DegraderLoadBalancerStrategyV3.DEGRADER_STRATEGY_NAME):
trackerClient = createDegraderTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, clock, doNotSlowStart);
trackerClient = createDegraderTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName,
transportClient, clock, doNotSlowStart, loadBalanceStreamException);
break;
case (RelativeLoadBalancerStrategy.RELATIVE_LOAD_BALANCER_STRATEGY_NAME):
trackerClient = createTrackerClientImpl(uri, uriProperties, serviceProperties, loadBalancerStrategyName,
Expand All @@ -124,7 +147,8 @@ private static DegraderTrackerClient createDegraderTrackerClient(URI uri,
String loadBalancerStrategyName,
TransportClient transportClient,
Clock clock,
boolean doNotSlowStart)
boolean doNotSlowStart,
boolean loadBalanceStreamException)
{
DegraderImpl.Config config = null;

Expand All @@ -151,7 +175,8 @@ private static DegraderTrackerClient createDegraderTrackerClient(URI uri,
config,
trackerClientInterval,
errorStatusPattern,
doNotSlowStart);
doNotSlowStart,
loadBalanceStreamException);
}

private static long getInterval(String loadBalancerStrategyName, ServiceProperties serviceProperties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.linkedin.util.degrader.CallTrackerImpl;
import com.linkedin.util.degrader.ErrorType;

import io.netty.handler.codec.http2.Http2Exception;
import java.net.ConnectException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
Expand Down Expand Up @@ -311,6 +312,10 @@ else if (originalThrowable instanceof TimeoutException)
{
callCompletion.endCallWithError(ErrorType.TIMEOUT_EXCEPTION);
}
else if (originalThrowable instanceof Http2Exception.StreamException)
{
callCompletion.endCallWithError(ErrorType.STREAM_ERROR);
}
else
{
callCompletion.endCallWithError(ErrorType.REMOTE_INVOCATION_EXCEPTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public class SimpleLoadBalancerState implements LoadBalancerState, ClientFactory
private final SslSessionValidatorFactory _sslSessionValidatorFactory;
private final SubsettingState _subsettingState;
private final CanaryDistributionProvider _canaryDistributionProvider;
private final boolean _loadBalanceStreamException;

/*
* Concurrency considerations:
Expand Down Expand Up @@ -314,6 +315,26 @@ public SimpleLoadBalancerState(ScheduledExecutorService executorService,
SslSessionValidatorFactory sessionValidatorFactory,
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider,
CanaryDistributionProvider canaryDistributionProvider)
{
this(executorService, uriBus, clusterBus, serviceBus, clientFactories, loadBalancerStrategyFactories, sslContext,
sslParameters, isSSLEnabled, partitionAccessorRegistry, sessionValidatorFactory,
deterministicSubsettingMetadataProvider, canaryDistributionProvider, false);
}

public SimpleLoadBalancerState(ScheduledExecutorService executorService,
PropertyEventBus<UriProperties> uriBus,
PropertyEventBus<ClusterProperties> clusterBus,
PropertyEventBus<ServiceProperties> serviceBus,
Map<String, TransportClientFactory> clientFactories,
Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> loadBalancerStrategyFactories,
SSLContext sslContext,
SSLParameters sslParameters,
boolean isSSLEnabled,
PartitionAccessorRegistry partitionAccessorRegistry,
SslSessionValidatorFactory sessionValidatorFactory,
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider,
CanaryDistributionProvider canaryDistributionProvider,
boolean loadBalanceStreamException)
{
_executor = executorService;
_uriProperties = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -349,6 +370,7 @@ public SimpleLoadBalancerState(ScheduledExecutorService executorService,
_subsettingState = null;
}
_canaryDistributionProvider = canaryDistributionProvider;
_loadBalanceStreamException = loadBalanceStreamException;
}

public void register(final SimpleLoadBalancerStateListener listener)
Expand Down Expand Up @@ -952,11 +974,9 @@ private TrackerClient buildTrackerClient(URI uri, UriProperties uriProperties, S
return null;
}

return serviceProperties == null ? null : TrackerClientFactory.createTrackerClient(uri,
uriProperties,
serviceProperties,
loadBalancerStrategy.getName(),
transportClient);
//TODO: create TrackerClient.LoadBalanceConfig and pass it into createTrackerClient method
return serviceProperties == null ? null : TrackerClientFactory.createTrackerClient(uri, uriProperties,
serviceProperties, loadBalancerStrategy.getName(), transportClient, _loadBalanceStreamException);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,24 @@ public class RelativeLoadBalancerStrategyFactory implements LoadBalancerStrategy
private final List<PartitionStateUpdateListener.Factory<PartitionState>> _stateListenerFactories;
private final EventEmitter _eventEmitter;
private final Clock _clock;
private final boolean _loadBalanceStreamException;

public RelativeLoadBalancerStrategyFactory(ScheduledExecutorService executorService, HealthCheckOperations healthCheckOperations,
List<PartitionStateUpdateListener.Factory<PartitionState>> stateListenerFactories, EventEmitter eventEmitter, Clock clock)
{
this(executorService, healthCheckOperations, stateListenerFactories, eventEmitter, clock, false);
}

public RelativeLoadBalancerStrategyFactory(ScheduledExecutorService executorService, HealthCheckOperations healthCheckOperations,
List<PartitionStateUpdateListener.Factory<PartitionState>> stateListenerFactories, EventEmitter eventEmitter, Clock clock,
boolean loadBalanceStreamException)
{
_executorService = executorService;
_healthCheckOperations = healthCheckOperations;
_stateListenerFactories = stateListenerFactories;
_eventEmitter = (eventEmitter == null) ? new NoopEventEmitter() : eventEmitter;
_clock = clock;
_loadBalanceStreamException = loadBalanceStreamException;
}


Expand Down Expand Up @@ -112,7 +121,8 @@ private StateUpdater getRelativeStateUpdater(D2RelativeStrategyProperties relati
{
listenerFactories.addAll(_stateListenerFactories);
}
return new StateUpdater(relativeStrategyProperties, quarantineManager, _executorService, listenerFactories, serviceName);
return new StateUpdater(relativeStrategyProperties, quarantineManager, _executorService, listenerFactories,
serviceName, _loadBalanceStreamException);
}

private ClientSelector getClientSelector(D2RelativeStrategyProperties relativeStrategyProperties)
Expand Down
Loading

0 comments on commit 82450f7

Please sign in to comment.