diff --git a/CHANGELOG.md b/CHANGELOG.md index a470ab134b..8dfb4bebaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ When updating the changelog, remember to be very clear about what behavior has c and what APIs have changed, if applicable. ## [Unreleased] + +## [29.49.4] - 2024-01-04 +- Make warm-up to respect dual read mode, and separate warmup configs for indis. + ## [29.49.3] - 2024-01-03 - Fix rate limiter for dual-read mode switch @@ -5602,7 +5606,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.3...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.49.4...master +[29.49.4]: https://github.com/linkedin/rest.li/compare/v29.49.3...v29.49.4 [29.49.3]: https://github.com/linkedin/rest.li/compare/v29.49.2...v29.49.3 [29.49.2]: https://github.com/linkedin/rest.li/compare/v29.49.1...v29.49.2 [29.49.1]: https://github.com/linkedin/rest.li/compare/v29.49.0...v29.49.1 diff --git a/d2-test-api/src/main/java/com/linkedin/d2/balancer/util/TestLoadBalancer.java b/d2-test-api/src/main/java/com/linkedin/d2/balancer/util/TestLoadBalancer.java index 5f623c5305..8b2974de23 100644 --- a/d2-test-api/src/main/java/com/linkedin/d2/balancer/util/TestLoadBalancer.java +++ b/d2-test-api/src/main/java/com/linkedin/d2/balancer/util/TestLoadBalancer.java @@ -47,15 +47,23 @@ public class TestLoadBalancer implements LoadBalancerWithFacilities, WarmUpServi private final AtomicInteger _requestCount = new AtomicInteger(); private final AtomicInteger _completedRequestCount = new AtomicInteger(); - private int _delayMs = 0; - private final int DELAY_STANDARD_DEVIATION = 10; //ms + private int _warmUpDelayMs = 0; + private int _serviceDataDelayMs = 0; + + private final int DELAY_STANDARD_DEVIATION = 5; //ms private final ScheduledExecutorService _executorService = Executors.newSingleThreadScheduledExecutor(); public TestLoadBalancer() {} - public TestLoadBalancer(int delayMs) + public TestLoadBalancer(int warmUpDelayMs) { - _delayMs = delayMs; + this(warmUpDelayMs, 0); + } + + public TestLoadBalancer(int warmUpDelayMs, int serviceDataDelayMs) + { + _warmUpDelayMs = warmUpDelayMs; + _serviceDataDelayMs = serviceDataDelayMs; } @Override @@ -67,14 +75,15 @@ public void getClient(Request request, RequestContext requestContext, Callback callback) { + double g = Math.min(1.0, Math.max(-1.0, new Random().nextGaussian())); + int actualDelay = Math.max(0, + _warmUpDelayMs + ((int) g * DELAY_STANDARD_DEVIATION)); // +/- DELAY_STANDARD_DEVIATION ms _requestCount.incrementAndGet(); _executorService.schedule(() -> { _completedRequestCount.incrementAndGet(); callback.onSuccess(None.none()); - }, Math.max(0, _delayMs - // any kind of random delay works for the test - + ((int) new Random().nextGaussian() * DELAY_STANDARD_DEVIATION)), TimeUnit.MILLISECONDS); + }, actualDelay, TimeUnit.MILLISECONDS); } @Override @@ -92,6 +101,14 @@ public void shutdown(PropertyEventShutdownCallback shutdown) @Override public void getLoadBalancedServiceProperties(String serviceName, Callback clientCallback) { + if (_serviceDataDelayMs > 0) + { + try { + Thread.sleep(_serviceDataDelayMs); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } clientCallback.onSuccess(new ServiceProperties(serviceName, "clustername", "/foo", Arrays.asList("rr"))); } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index d3183aea39..2f07968717 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -178,7 +178,9 @@ public D2Client build() _config.retryAggregatedIntervalNum, _config.warmUp, _config.warmUpTimeoutSeconds, + _config.indisWarmUpTimeoutSeconds, _config.warmUpConcurrentRequests, + _config.indisWarmUpConcurrentRequests, _config.downstreamServicesFetcher, _config.indisDownstreamServicesFetcher, _config.backupRequestsEnabled, @@ -526,21 +528,36 @@ public D2ClientBuilder setWarmUp(boolean warmUp){ return this; } - public D2ClientBuilder setWarmUpTimeoutSeconds(int warmUpTimeoutSeconds){ + public D2ClientBuilder setWarmUpTimeoutSeconds(int warmUpTimeoutSeconds) + { _config.warmUpTimeoutSeconds = warmUpTimeoutSeconds; return this; } - public D2ClientBuilder setZookeeperReadWindowMs(int zookeeperReadWindowMs){ + public D2ClientBuilder setIndisWarmUpTimeoutSeconds(int indisWarmUpTimeoutSeconds) + { + _config.indisWarmUpTimeoutSeconds = indisWarmUpTimeoutSeconds; + return this; + } + + public D2ClientBuilder setZookeeperReadWindowMs(int zookeeperReadWindowMs) + { _config.zookeeperReadWindowMs = zookeeperReadWindowMs; return this; } - public D2ClientBuilder setWarmUpConcurrentRequests(int warmUpConcurrentRequests){ + public D2ClientBuilder setWarmUpConcurrentRequests(int warmUpConcurrentRequests) + { _config.warmUpConcurrentRequests = warmUpConcurrentRequests; return this; } + public D2ClientBuilder setIndisWarmUpConcurrentRequests(int indisWarmUpConcurrentRequests) + { + _config.indisWarmUpConcurrentRequests = indisWarmUpConcurrentRequests; + return this; + } + public D2ClientBuilder setStartUpExecutorService(ScheduledExecutorService executorService) { _config.startUpExecutorService = executorService; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index ae14d92ee7..103ac7f6a3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -96,8 +96,10 @@ public class D2ClientConfig int retryAggregatedIntervalNum = RetryClient.DEFAULT_AGGREGATED_INTERVAL_NUM; public boolean warmUp = true; public int warmUpTimeoutSeconds = WarmUpLoadBalancer.DEFAULT_SEND_REQUESTS_TIMEOUT_SECONDS; + public int indisWarmUpTimeoutSeconds = WarmUpLoadBalancer.DEFAULT_SEND_REQUESTS_TIMEOUT_SECONDS; int zookeeperReadWindowMs = ZooKeeperStore.DEFAULT_READ_WINDOW_MS; public int warmUpConcurrentRequests = WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS; + public int indisWarmUpConcurrentRequests = WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS; public DownstreamServicesFetcher downstreamServicesFetcher = null; public DownstreamServicesFetcher indisDownstreamServicesFetcher = null; boolean backupRequestsEnabled = true; @@ -168,7 +170,9 @@ public D2ClientConfig() int retryAggregatedIntervalNum, boolean warmUp, int warmUpTimeoutSeconds, + int indisWarmUpTimeoutSeconds, int warmUpConcurrentRequests, + int indisWarmUpConcurrentRequests, DownstreamServicesFetcher downstreamServicesFetcher, DownstreamServicesFetcher indisDownstreamServicesFetcher, boolean backupRequestsEnabled, @@ -235,7 +239,9 @@ public D2ClientConfig() this.retryAggregatedIntervalNum = retryAggregatedIntervalNum; this.warmUp = warmUp; this.warmUpTimeoutSeconds = warmUpTimeoutSeconds; + this.indisWarmUpTimeoutSeconds = indisWarmUpTimeoutSeconds; this.warmUpConcurrentRequests = warmUpConcurrentRequests; + this.indisWarmUpConcurrentRequests = indisWarmUpConcurrentRequests; this.downstreamServicesFetcher = downstreamServicesFetcher; this.indisDownstreamServicesFetcher = indisDownstreamServicesFetcher; this.backupRequestsEnabled = backupRequestsEnabled; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java index 92e6badba8..7df9629ef6 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java @@ -117,7 +117,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) { balancer = new WarmUpLoadBalancer(balancer, lastSeenLoadBalancer, config.startUpExecutorService, config.fsBasePath, config.d2ServicePath, config.downstreamServicesFetcher, config.warmUpTimeoutSeconds, - config.warmUpConcurrentRequests, config.dualReadStateManager); + config.warmUpConcurrentRequests, config.dualReadStateManager, false); } return balancer; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java index 7b666c5bf3..2f296b49f7 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java @@ -55,7 +55,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) { balancer = new WarmUpLoadBalancer(balancer, zkfsLoadBalancer, config.startUpExecutorService, config.fsBasePath, config.d2ServicePath, config.downstreamServicesFetcher, config.warmUpTimeoutSeconds, - config.warmUpConcurrentRequests, config.dualReadStateManager); + config.warmUpConcurrentRequests, config.dualReadStateManager, false); } return balancer; } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java index 0dfbc5e22d..b704c62467 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancer.java @@ -95,28 +95,50 @@ public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFa public void start(Callback callback) { // Prefetch the global dual read mode - _dualReadStateManager.checkAndSwitchMode(null); + DualReadModeProvider.DualReadMode mode = _dualReadStateManager.getGlobalDualReadMode(); - _newLb.start(new Callback() - { + // if in new-lb-only mode, new lb needs to start successfully to call the callback. Otherwise, the old lb does. + // Use a separate executor service to start the new lb, so both lbs can start concurrently. + _newLbExecutor.execute(() -> _newLb.start(getStartUpCallback(true, + mode == DualReadModeProvider.DualReadMode.NEW_LB_ONLY ? callback : null) + )); + + _oldLb.start(getStartUpCallback(false, + mode == DualReadModeProvider.DualReadMode.NEW_LB_ONLY ? null : callback + )); + } + + private Callback getStartUpCallback(boolean isForNewLb, Callback callback) + { + return new Callback() { @Override - public void onError(Throwable e) - { - LOG.warn("Failed to start new load balancer. Fall back to read from old balancer only", e); - _isNewLbReady = false; + public void onError(Throwable e) { + LOG.warn("Failed to start {} load balancer.", isForNewLb ? "new" : "old", e); + if (isForNewLb) + { + _isNewLbReady = false; + } + + if (callback != null) + { + callback.onError(e); + } } @Override - public void onSuccess(None result) - { - LOG.info("New load balancer successfully started"); - _isNewLbReady = true; - } - }); + public void onSuccess(None result) { + LOG.info("{} load balancer successfully started", isForNewLb ? "New" : "Old"); + if (isForNewLb) + { + _isNewLbReady = true; + } - // Call back will succeed as long as the old balancer is successfully started. New load balancer failure - // won't block application start up. - _oldLb.start(callback); + if (callback != null) + { + callback.onSuccess(None.none()); + } + } + }; } @Override diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java index 860e5804b0..6fc3d62cae 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java @@ -69,6 +69,24 @@ public DualReadLoadBalancerMonitor(Clock clock) public void reportData(String propertyName, T property, String propertyVersion, boolean fromNewLb) { Cache> cacheToAdd = fromNewLb ? _newLbPropertyCache : _oldLbPropertyCache; + CacheEntry existingEntry = cacheToAdd.getIfPresent(propertyName); + if (existingEntry != null) + { + if (existingEntry._version.equals(propertyVersion) && existingEntry._data.equals(property)) + { + _rateLimitedLogger.debug("Reported duplicate for {} LB for property: {}, version: {}, data: {}", + fromNewLb ? "New" : "Old", propertyName, propertyVersion, property); + return; // skip setting duplicate data to avoid incorrectly incrementing OutOfSync metric + } + else if (existingEntry._data.equals(property)) + { + _rateLimitedLogger.warn("Reported data that only differs in version for {} LB for property: {}. " + + "Old version: {}, New version: {}, with the same data: {}", fromNewLb ? "New" : "Old", propertyName, + existingEntry._version, propertyVersion, existingEntry._data); + // since the version is different, we don't skipping setting it to the cache + } + } + Cache> cacheToCompare = fromNewLb ? _oldLbPropertyCache : _newLbPropertyCache; CacheEntry entryToCompare = cacheToCompare.getIfPresent(propertyName); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java index a1c52816e6..0c2e905fd1 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java @@ -16,6 +16,7 @@ package com.linkedin.d2.balancer.util; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.common.callback.Callback; import com.linkedin.common.util.None; import com.linkedin.d2.balancer.LoadBalancerWithFacilities; @@ -41,7 +42,11 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +57,7 @@ * * @author Francesco Capponi (fcapponi@linkedin.com) */ -public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator -{ +public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator { private static final Logger LOG = LoggerFactory.getLogger(WarmUpLoadBalancer.class); /** @@ -67,12 +71,17 @@ public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator private WarmUpService _serviceWarmupper; private final String _d2FsDirPath; private final String _d2ServicePath; - private final int _warmUpTimeoutSeconds; + private final int _warmUpTimeoutMillis; private final int _concurrentRequests; private final ScheduledExecutorService _executorService; private final DownstreamServicesFetcher _downstreamServicesFetcher; private final DualReadStateManager _dualReadStateManager; + private final boolean _isIndis; // whether warming up for Indis (false means warming up for ZK) + private final String _printName; // name of this warmup load balancer based on it's indis or not. private volatile boolean _shuttingDown = false; + private long _allStartTime; + private List _servicesToWarmUp = null; + private Supplier _timeSupplier = () -> SystemClock.instance().currentTimeMillis(); /** * Since the list might from the fetcher might not be complete (new behavior, old data, etc..), and the user might @@ -81,17 +90,26 @@ public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator */ private final Set _usedServices; - public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper, ScheduledExecutorService executorService, - String d2FsDirPath, String d2ServicePath, DownstreamServicesFetcher downstreamServicesFetcher, - int warmUpTimeoutSeconds, int concurrentRequests) - { + public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper, + ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath, + DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutSeconds, int concurrentRequests) { + this(balancer, serviceWarmupper, executorService, d2FsDirPath, d2ServicePath, downstreamServicesFetcher, + warmUpTimeoutSeconds, concurrentRequests, null, false); + } + + public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper, + ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath, + DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutSeconds, int concurrentRequests, + DualReadStateManager dualReadStateManager, boolean isIndis) { this(balancer, serviceWarmupper, executorService, d2FsDirPath, d2ServicePath, downstreamServicesFetcher, - warmUpTimeoutSeconds, concurrentRequests, null); + warmUpTimeoutSeconds * 1000, concurrentRequests, dualReadStateManager, isIndis, null); } - public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper, ScheduledExecutorService executorService, - String d2FsDirPath, String d2ServicePath, DownstreamServicesFetcher downstreamServicesFetcher, - int warmUpTimeoutSeconds, int concurrentRequests, DualReadStateManager dualReadStateManager) + @VisibleForTesting + WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper, + ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath, + DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutMillis, int concurrentRequests, + DualReadStateManager dualReadStateManager, boolean isIndis, Supplier timeSupplierForTest) { super(balancer); _serviceWarmupper = serviceWarmupper; @@ -99,110 +117,197 @@ public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService ser _d2FsDirPath = d2FsDirPath; _d2ServicePath = d2ServicePath; _downstreamServicesFetcher = downstreamServicesFetcher; - _warmUpTimeoutSeconds = warmUpTimeoutSeconds; + _warmUpTimeoutMillis = warmUpTimeoutMillis; _concurrentRequests = concurrentRequests; _outstandingRequests = new ConcurrentLinkedDeque<>(); _usedServices = new HashSet<>(); _dualReadStateManager = dualReadStateManager; + _isIndis = isIndis; + _printName = String.format("%s WarmUp", _isIndis ? "xDS" : "ZK"); + if (timeSupplierForTest != null) + { + _timeSupplier = timeSupplierForTest; + } } @Override - public void start(Callback callback) - { - LOG.info("D2 WarmUp enabled"); - _loadBalancer.start(new Callback() - { + public void start(Callback callback) { + LOG.info("{} enabled", _printName); + + Callback prepareWarmUpCallback = new Callback() { @Override - public void onError(Throwable e) - { - callback.onError(e); + public void onError(Throwable e) { + if (e instanceof TimeoutException) + { + LOG.info("{} hit timeout: {}ms. The WarmUp will continue in background", _printName, _warmUpTimeoutMillis); + callback.onSuccess(None.none()); + } + else + { + LOG.error("{} failed to fetch dual read mode, continuing warmup.", _printName, e); + } + continueWarmUp(callback); } @Override - public void onSuccess(None result) - { - // guaranteeing that we are going to use a thread that is not going to cause a deadlock - // the caller might call this method on other threads (e.g. the ZK thread) creating possible circular dependencies - // resulting in malfunctions - _executorService.execute(() -> warmUpServices(callback)); + public void onSuccess(None result) { + continueWarmUp(callback); } - }); - } + }; - /** - * When the D2 client is ready, fetch the service names and attempt to warmUp each service. If a request fails, it - * will be ignored and the warm up process will continue - */ - private void warmUpServices(Callback startUpCallback) - { - Callback timeoutCallback = new TimeoutCallback<>(_executorService, _warmUpTimeoutSeconds, TimeUnit.SECONDS, new Callback() - { + _loadBalancer.start(new Callback() { @Override - public void onError(Throwable e) - { - LOG.info("D2 WarmUp hit timeout, continuing startup. The WarmUp will continue in background", e); - startUpCallback.onSuccess(None.none()); + public void onError(Throwable e) { + callback.onError(e); } @Override - public void onSuccess(None result) - { - LOG.info("D2 WarmUp completed"); - startUpCallback.onSuccess(None.none()); + public void onSuccess(None result) { + _allStartTime = _timeSupplier.get(); + _executorService.submit(() -> prepareWarmUp(prepareWarmUpCallback)); } - }, "This message will never be used, even in case of timeout, no exception should be passed up"); + }); + } - _downstreamServicesFetcher.getServiceNames(serviceNames -> { - try - { + private void prepareWarmUp(Callback callback) + { + // not to be thread-safe, but just to be effectively final to be used in lambdas + final AtomicBoolean hasTimedOut = new AtomicBoolean(false); + + try { + _downstreamServicesFetcher.getServiceNames(serviceNames -> { // The downstreamServicesFetcher is the core group of the services that will be used during the lifecycle _usedServices.addAll(serviceNames); - LOG.info("Trying to warmup {} services: [{}]", serviceNames.size(), String.join(", ", serviceNames)); + LOG.info("{} starting to fetch dual read mode with timeout: {}ms, for {} services: [{}]", + _printName, _warmUpTimeoutMillis, serviceNames.size(), String.join(", ", serviceNames)); - if (serviceNames.size() == 0) - { - timeoutCallback.onSuccess(None.none()); - return; - } + _servicesToWarmUp = serviceNames; if (_dualReadStateManager != null) { + // warm up dual read mode for the service and its belonging cluster. This is needed BEFORE fetching the actual + // data of service/cluster/uri (in the WarmUpTask below), so that when the actual data is received, they can + // be reported to dual read monitoring under dual read mode. DualReadModeProvider dualReadModeProvider = _dualReadStateManager.getDualReadModeProvider(); - serviceNames.forEach(serviceName -> - { + _servicesToWarmUp = serviceNames.stream().filter(serviceName -> { DualReadModeProvider.DualReadMode dualReadMode = dualReadModeProvider.getDualReadMode(serviceName); _dualReadStateManager.updateService(serviceName, dualReadMode); - getLoadBalancedServiceProperties(serviceName, new Callback() + boolean res = isModeToWarmUp(dualReadMode, _isIndis); + if (!res) + { + LOG.info("{} skipping service: {} based on its dual read mode: {}", + _printName, serviceName, dualReadMode); + } + return res; + }).collect(Collectors.toList()); + + _servicesToWarmUp.forEach(serviceName -> { + // check timeout before continue + if (!hasTimedOut.get() + && _timeSupplier.get() - _allStartTime > _warmUpTimeoutMillis) { + hasTimedOut.set(true); + callback.onError(new TimeoutException()); + } + + // To warm up the cluster dual read mode, we need to fetch the service data to know its belonging cluster. + LOG.info("{} fetching service data for service: {}", _printName, serviceName); + + // NOTE: This call blocks! + getLoadBalancedServiceProperties(serviceName, new Callback() { @Override - public void onError(Throwable e) - { - LOG.warn("Failed to warm up dual read mode for service: " + serviceName, e); + public void onError(Throwable e) { + LOG.warn("{} failed to warm up dual read mode for service: {}", _printName, serviceName, e); } @Override - public void onSuccess(ServiceProperties result) - { - _dualReadStateManager.updateCluster(result.getClusterName(), dualReadMode); + public void onSuccess(ServiceProperties result) { + _dualReadStateManager.updateCluster(result.getClusterName(), + _dualReadStateManager.getServiceDualReadMode(result.getServiceName())); } }); }); + + LOG.info("{} fetched dual read mode for {} services in {}ms. {} services need to warm up.", + _printName, serviceNames.size(), _timeSupplier.get() - _allStartTime, + _servicesToWarmUp.size()); } - WarmUpTask warmUpTask = new WarmUpTask(serviceNames, timeoutCallback); - // get the min value because it makes no sense have an higher concurrency than the number of request to be made - int concurrentRequests = Math.min(serviceNames.size(), _concurrentRequests); - IntStream.range(0, concurrentRequests) + if (!hasTimedOut.get()) + { + callback.onSuccess(None.none()); + } + }); + } + catch (Exception e) + { + callback.onError(e); + } + } + + private void continueWarmUp(Callback callback) + { + if (_servicesToWarmUp.isEmpty()) + { + LOG.info("{} no services to warmup. Warmup completed", _printName); + callback.onSuccess(None.none()); + return; + } + + // guaranteeing that we are going to use a thread that is not going to cause a deadlock + // the caller might call this method on other threads (e.g. the ZK thread) creating possible circular dependencies + // resulting in malfunctions + _executorService.execute(() -> warmUpServices(callback)); + } + + /** + * When the D2 client is ready, fetch the service names and attempt to warmUp each service. If a request fails, it + * will be ignored and the warm up process will continue + */ + private void warmUpServices(Callback startUpCallback) + { + long timeoutMilli = Math.max(0, _warmUpTimeoutMillis - (_timeSupplier.get() - _allStartTime)); + LOG.info("{} starting to warm up with timeout: {}ms for {} services: [{}]", + _printName, timeoutMilli, _servicesToWarmUp.size(), String.join(", ", _servicesToWarmUp)); + + Callback timeoutCallback = new TimeoutCallback<>(_executorService, timeoutMilli, TimeUnit.MILLISECONDS, + new Callback() + { + @Override + public void onError(Throwable e) + { + LOG.info("{} hit timeout after {}ms since initial start time, continuing startup. " + + "Warmup will continue in background", + _printName, _timeSupplier.get() - _allStartTime, e); + startUpCallback.onSuccess(None.none()); + } + + @Override + public void onSuccess(None result) + { + LOG.info("{} completed", _printName); + startUpCallback.onSuccess(None.none()); + } + }, "This message will never be used, even in case of timeout, no exception should be passed up" + ); + + try + { + // the WarmUpTask fetches the cluster and uri data, since the service data is already fetched + WarmUpTask warmUpTask = new WarmUpTask(_servicesToWarmUp, timeoutCallback); + + // get the min value because it makes no sense have an higher concurrency than the number of request to be made + int concurrentRequests = Math.min(_servicesToWarmUp.size(), _concurrentRequests); + IntStream.range(0, concurrentRequests) .forEach(i -> _outstandingRequests.add(_executorService.submit(warmUpTask::execute))); - } - catch (Exception e) - { - LOG.error("D2 WarmUp Failed, continuing start up.", e); - timeoutCallback.onSuccess(None.none()); - } - }); + } + catch (Exception e) + { + LOG.error("{} failed, continuing start up.", _printName, e); + timeoutCallback.onSuccess(None.none()); + } } @Override @@ -234,7 +339,7 @@ private class WarmUpTask void execute() { - final long startTime = SystemClock.instance().currentTimeMillis(); + final long startTime = _timeSupplier.get(); final String serviceName = _serviceNamesQueue.poll(); if (serviceName == null || _shuttingDown) @@ -242,14 +347,19 @@ void execute() return; } - LOG.info("{}/{} Starting to warm up service {}", new Object[]{_requestStartedCount.incrementAndGet(), _serviceNames.size(), serviceName}); + LOG.info("{} starting to warm up service {}, started {}/{}", + _printName, serviceName, _requestStartedCount.incrementAndGet(), _serviceNames.size()); + // for services that have warmed up dual read mode above, their service data will be stored in event bus already, + // so warming up the service data will complete instantly. _serviceWarmupper.warmUpService(serviceName, new Callback() { private void executeNextTask() { if (_requestCompletedCount.incrementAndGet() == _serviceNames.size()) { + LOG.info("{} completed warming up {} services in {}ms", + _printName, _serviceNames.size(), _timeSupplier.get() - _allStartTime); _callback.onSuccess(None.none()); _outstandingRequests.clear(); return; @@ -260,22 +370,30 @@ private void executeNextTask() @Override public void onError(Throwable e) { - LOG.info(String.format("%s/%s Service %s failed to warm up, continuing with warm up", - _requestCompletedCount.get() + 1, _serviceNames.size(), serviceName), e); + LOG.info("{} failed to warm up service {}, completed {}/{}, continuing with warm up", + _printName, serviceName, _requestCompletedCount.get() + 1, _serviceNames.size(), e); executeNextTask(); } @Override public void onSuccess(None result) { - LOG.info("{}/{} Service {} warmed up in {}ms", new Object[]{_requestCompletedCount.get() + 1, _serviceNames.size(), - serviceName, SystemClock.instance().currentTimeMillis() - startTime}); + LOG.info("{} completed warming up service {} in {}ms, completed {}/{}", + _printName, serviceName, _timeSupplier.get() - startTime, + _requestCompletedCount.get() + 1, _serviceNames.size()); executeNextTask(); } }); } } + private static boolean isModeToWarmUp(DualReadModeProvider.DualReadMode mode, boolean isIndis) + { + return mode == DualReadModeProvider.DualReadMode.DUAL_READ + || mode == (isIndis ? + DualReadModeProvider.DualReadMode.NEW_LB_ONLY : DualReadModeProvider.DualReadMode.OLD_LB_ONLY); + } + @Override public void shutdown(PropertyEventThread.PropertyEventShutdownCallback shutdown) { diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index d64e867e81..8e1f61e016 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -72,8 +72,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) if (config.warmUp) { balancer = new WarmUpLoadBalancer(balancer, xdsLoadBalancer, config.indisStartUpExecutorService, config.indisFsBasePath, - config.d2ServicePath, config.indisDownstreamServicesFetcher, config.warmUpTimeoutSeconds, - config.warmUpConcurrentRequests, config.dualReadStateManager); + config.d2ServicePath, config.indisDownstreamServicesFetcher, config.indisWarmUpTimeoutSeconds, + config.indisWarmUpConcurrentRequests, config.dualReadStateManager, true); } return balancer; diff --git a/d2/src/test/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancerTest.java b/d2/src/test/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancerTest.java index c2db290301..5ce8834786 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancerTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancerTest.java @@ -20,9 +20,13 @@ import com.linkedin.common.util.None; import com.linkedin.d2.balancer.LoadBalancer; import com.linkedin.d2.balancer.ServiceUnavailableException; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher; import com.linkedin.d2.balancer.util.downstreams.FSBasedDownstreamServicesFetcher; +import com.linkedin.d2.util.TestDataHelper; import com.linkedin.r2.message.RequestContext; +import com.linkedin.test.util.retry.ThreeRetries; import java.io.File; import java.io.IOException; import java.net.URISyntaxException; @@ -35,11 +39,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static com.linkedin.d2.balancer.dualread.DualReadModeProvider.DualReadMode.*; +import static org.mockito.Mockito.*; + public class WarmUpLoadBalancerTest { @@ -63,8 +72,43 @@ public class WarmUpLoadBalancerTest } private File _tmpdir; + private DualReadModeProvider _dualReadModeProvider; + private DualReadStateManager _dualReadStateManager; + private static final int TIME_FREEZED_CALL = 5; // the first call in warmUpServices which sets timeout - @Test(timeOut = 10000) + @BeforeMethod + public void beforeTest() throws IOException + { + _tmpdir = LoadBalancerUtil.createTempDirectory("d2FileStore"); + _FSBasedDownstreamServicesFetcher = new FSBasedDownstreamServicesFetcher(_tmpdir.getAbsolutePath(), MY_SERVICES_FS); + + _dualReadModeProvider = Mockito.mock(DualReadModeProvider.class); + _dualReadStateManager = Mockito.mock(DualReadStateManager.class); + when(_dualReadStateManager.getDualReadModeProvider()).thenReturn(_dualReadModeProvider); + doNothing().when(_dualReadStateManager).updateService(any(), any()); + doNothing().when(_dualReadStateManager).updateCluster(any(), any()); + } + + private void setDualReadMode(DualReadModeProvider.DualReadMode mode) + { + when(_dualReadModeProvider.getDualReadMode(any())).thenReturn(mode); + when(_dualReadStateManager.getServiceDualReadMode(any())).thenReturn(mode); + } + + @AfterMethod + public void afterTest() throws IOException + { + if (_tmpdir != null) + { + rmrf(_tmpdir); + _tmpdir = null; + } + + _dualReadModeProvider = null; + _dualReadStateManager = null; + } + + @Test public void testMakingWarmUpRequests() throws URISyntaxException, InterruptedException, ExecutionException, TimeoutException { createDefaultServicesIniFiles(); @@ -78,7 +122,7 @@ public void testMakingWarmUpRequests() throws URISyntaxException, InterruptedExc FutureCallback callback = new FutureCallback<>(); warmUpLoadBalancer.start(callback); - callback.get(5000, TimeUnit.MILLISECONDS); + callback.get(30, TimeUnit.MILLISECONDS); // 3 services should take at most 3 * 5ms Assert.assertEquals(VALID_FILES.size(), requestCount.get()); } @@ -316,22 +360,128 @@ public void testHitTimeout() throws URISyntaxException, InterruptedException, Ex "Expected # of requests between " + expectedRequests + " +/-" + deviation + ", found:" + requestCount.get()); } + @DataProvider // to test dual read modes under which the specific type of warmup load balancer should do warmup + public Object[][] modesToWarmUpDataProvider() + { + return new Object[][] + {// @params: {dual read mode, isIndis} + {NEW_LB_ONLY, true}, + {OLD_LB_ONLY, false}, + // under dual read mode, both INDIS and ZK warmup should do warmup + {DUAL_READ, true}, + {DUAL_READ, false} + }; + } - @BeforeMethod - public void createTempdir() throws IOException + @Test(dataProvider = "modesToWarmUpDataProvider", retryAnalyzer = ThreeRetries.class) + public void testSuccessWithDualRead(DualReadModeProvider.DualReadMode mode, Boolean isIndis) + throws InterruptedException, ExecutionException, TimeoutException { - _tmpdir = LoadBalancerUtil.createTempDirectory("d2FileStore"); - _FSBasedDownstreamServicesFetcher = new FSBasedDownstreamServicesFetcher(_tmpdir.getAbsolutePath(), MY_SERVICES_FS); + int timeoutMillis = 90; + createDefaultServicesIniFiles(); + setDualReadMode(mode); + + // 3 dual read fetches take 30ms, 3 warmups take at most 3 * (5 +/- 5) ms. Total at most is 60 ms. + TestLoadBalancer balancer = new TestLoadBalancer(5, 10); + AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount(); + LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(), + _tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, timeoutMillis, + WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis, + TestDataHelper.getTimeSupplier(10, TIME_FREEZED_CALL)); + + FutureCallback callback = new FutureCallback<>(); + warmUpLb.start(callback); + + callback.get(timeoutMillis, TimeUnit.MILLISECONDS); + // all dual read (service data) fetched + verify(_dualReadStateManager, times(VALID_FILES.size())).updateCluster(any(), any()); + // all warmups completed + Assert.assertEquals(completedWarmUpCount.get(), VALID_FILES.size()); } - @AfterMethod - public void removeTempdir() throws IOException + @Test(dataProvider = "modesToWarmUpDataProvider", retryAnalyzer = ThreeRetries.class) + public void testDualReadHitTimeout(DualReadModeProvider.DualReadMode mode, Boolean isIndis) + throws InterruptedException, ExecutionException, TimeoutException { - if (_tmpdir != null) - { - rmrf(_tmpdir); - _tmpdir = null; - } + int timeoutMillis = 120; + createDefaultServicesIniFiles(); + setDualReadMode(mode); + + // 3 dual read fetches take 90ms + TestLoadBalancer balancer = new TestLoadBalancer(0, 50); + AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount(); + LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(), + _tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, timeoutMillis, + WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis, + TestDataHelper.getTimeSupplier(50, TIME_FREEZED_CALL)); + + FutureCallback callback = new FutureCallback<>(); + warmUpLb.start(callback); + + callback.get(timeoutMillis, TimeUnit.MILLISECONDS); + // verify that at most 2 service data were fetched within the timeout + verify(_dualReadStateManager, atMost(2)).updateCluster(any(), any()); + // warmups are not started + Assert.assertEquals(completedWarmUpCount.get(), 0); + } + + @Test(dataProvider = "modesToWarmUpDataProvider", retryAnalyzer = ThreeRetries.class) + public void testDualReadCompleteWarmUpHitTimeout(DualReadModeProvider.DualReadMode mode, Boolean isIndis) + throws InterruptedException, ExecutionException, TimeoutException + { + int timeoutMillis = 200; + createDefaultServicesIniFiles(); + setDualReadMode(mode); + + // 3 dual read fetches take 150ms, 3 warmups take 3 * (50 +/- 5) ms + TestLoadBalancer balancer = new TestLoadBalancer(50, 50); + AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount(); + LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(), + _tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, timeoutMillis, + WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis, + TestDataHelper.getTimeSupplier(50, TIME_FREEZED_CALL)); + + FutureCallback callback = new FutureCallback<>(); + warmUpLb.start(callback); + + callback.get(timeoutMillis, TimeUnit.MILLISECONDS); + // verify dual read (service data) are all fetched + verify(_dualReadStateManager, times(VALID_FILES.size())).updateCluster(any(), any()); + // only partial warmups completed + Assert.assertTrue(completedWarmUpCount.get() < VALID_FILES.size()); + } + + @DataProvider // to test dual read modes under which the specific type of warmup load balancer should skip warmup + public Object[][] modesToSkipDataProvider() + { + return new Object[][] + { // @params: {dual read mode, isIndis} + {NEW_LB_ONLY, false}, + {OLD_LB_ONLY, true} + }; + } + @Test(dataProvider = "modesToSkipDataProvider", retryAnalyzer = ThreeRetries.class) + public void testSkipWarmup(DualReadModeProvider.DualReadMode mode, Boolean isIndis) + throws ExecutionException, InterruptedException, TimeoutException { + int timeoutMillis = 40; + createDefaultServicesIniFiles(); + setDualReadMode(mode); + + TestLoadBalancer balancer = new TestLoadBalancer(0, 0); + AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount(); + LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(), + _tmpdir.getAbsolutePath(), MY_SERVICES_FS, _FSBasedDownstreamServicesFetcher, timeoutMillis, + WarmUpLoadBalancer.DEFAULT_CONCURRENT_REQUESTS, _dualReadStateManager, isIndis, + TestDataHelper.getTimeSupplier(0, TIME_FREEZED_CALL)); + + FutureCallback callback = new FutureCallback<>(); + warmUpLb.start(callback); + + callback.get(timeoutMillis, TimeUnit.MILLISECONDS); // skipping warmup should call back nearly immediately + // no service data fetched + verify(_dualReadStateManager, never()).updateCluster(any(), any()); + // warmups are not started + Assert.assertEquals(completedWarmUpCount.get(), 0); } // ############################# Util Section ############################# diff --git a/d2/src/test/java/com/linkedin/d2/util/TestDataHelper.java b/d2/src/test/java/com/linkedin/d2/util/TestDataHelper.java index 7be1f2ec87..cb76dac514 100644 --- a/d2/src/test/java/com/linkedin/d2/util/TestDataHelper.java +++ b/d2/src/test/java/com/linkedin/d2/util/TestDataHelper.java @@ -7,11 +7,16 @@ import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.stream.Collectors; import org.testng.Assert; import static org.testng.Assert.*; @@ -216,4 +221,23 @@ public void verifyZeroEmissionOfSDStatusUpdateReceiptEvents() { assertTrue(_receiptMarkDownClusters.isEmpty()); } } + + // A time supplier that proceed with speedMillis but could freeze on special calls specified in freezedCalls. + // This is for convenience when the code being tested has calls where the time shouldn't move forward (no + // time-consuming work is done before this call). + public static Supplier getTimeSupplier(long speedMillis, int... freezedCalls) + { + return new Supplier() { + private AtomicLong _time = new AtomicLong(0); + private Set _freezedCalls = Arrays.stream(freezedCalls).boxed().collect(Collectors.toSet()); + private AtomicInteger _callCount = new AtomicInteger(0); + + @Override + public Long get() { + return _freezedCalls.contains(_callCount.getAndIncrement()) + ? _time.get() // freeze on special calls + : _time.addAndGet(speedMillis); + } + }; + } } diff --git a/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java b/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java index 9f52f7f7a1..224ba44e6b 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java +++ b/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java @@ -89,7 +89,7 @@ public static void main(String[] args) throws Exception XdsChannelFactory xdsChannelFactory = new XdsChannelFactory(sslContext, xdsServer); XdsClient xdsClient = new XdsClientImpl(node, xdsChannelFactory.createChannel(), - Executors.newSingleThreadScheduledExecutor()); + Executors.newSingleThreadScheduledExecutor(), XdsClientImpl.DEFAULT_READY_TIMEOUT_MILLIS); DualReadStateManager dualReadStateManager = new DualReadStateManager( () -> DualReadModeProvider.DualReadMode.DUAL_READ, diff --git a/gradle.properties b/gradle.properties index 1065fc8c24..5d6343e153 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.49.3 +version=29.49.4 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true