From 8f77653ab402b29a346c6d27d3fa1127cda16b3f Mon Sep 17 00:00:00 2001 From: Felix GV Date: Sat, 21 Dec 2024 04:56:30 -0500 Subject: [PATCH] [router] RouterExceptionAndTrackingUtils allocation reduction (#1413) Removed all Optionals from RouterExceptionAndTrackingUtils. Miscellaneous: - Tweaked flaky unit test RouterRequestThrottlingTest. - Deleted a disabled test in TestZKStore. - Tweaked the main README page. --- docs/README.md | 5 +- docs/assets/icons/bluesky-icon.svg | 4 + docs/assets/icons/github-icon.svg | 19 ++- docs/assets/style/venice_full_lion_logo.svg | 27 ++-- .../com/linkedin/venice/HttpConstants.java | 5 + .../com/linkedin/venice/meta/TestZKStore.java | 28 ----- .../utils/MockVeniceRouterWrapper.java | 1 + .../com/linkedin/venice/utils/TestUtils.java | 19 --- .../api/RouterExceptionAndTrackingUtils.java | 118 ++++++++---------- .../venice/router/api/VeniceDelegateMode.java | 73 ++++++----- .../venice/router/api/VeniceDispatcher.java | 17 ++- .../venice/router/api/VenicePathParser.java | 46 +++---- .../router/api/VeniceResponseAggregator.java | 45 +++---- .../api/VeniceResponseDecompressor.java | 13 +- .../router/api/path/VeniceComputePath.java | 9 +- .../router/api/path/VeniceMultiGetPath.java | 5 +- .../router/api/path/VeniceMultiKeyPath.java | 17 ++- .../router/api/path/VeniceSingleGetPath.java | 17 ++- .../throttle/RouterRequestThrottlingTest.java | 73 ++++++++--- 19 files changed, 266 insertions(+), 275 deletions(-) create mode 100644 docs/assets/icons/bluesky-icon.svg diff --git a/docs/README.md b/docs/README.md index f3ef9346005..dc4670855c5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -137,8 +137,9 @@ Feel free to engage with the community using our: - [](./dev_guide/how_to/how_to.md) [Contributor's guide](./dev_guide/how_to/how_to.md) Follow us to hear more about the progress of the Venice project and community: -- [](https://blog.venicedb.org) [Official blog](https://blog.venicedb.org) +- [](https://blog.venicedb.org) [Blog](https://blog.venicedb.org) +- [](https://bsky.app/profile/venicedb.org) [Bluesky handle](https://bsky.app/profile/venicedb.org) - [](https://www.linkedin.com/company/venicedb) [LinkedIn page](https://www.linkedin.com/company/venicedb) -- [](https://x.com/VeniceDataBase) [X page](https://x.com/VeniceDataBase) +- [](https://x.com/VeniceDataBase) [X handle](https://x.com/VeniceDataBase) - [](https://youtube.com/@venicedb) [YouTube channel](https://youtube.com/@venicedb) diff --git a/docs/assets/icons/bluesky-icon.svg b/docs/assets/icons/bluesky-icon.svg new file mode 100644 index 00000000000..c71e2018a61 --- /dev/null +++ b/docs/assets/icons/bluesky-icon.svg @@ -0,0 +1,4 @@ + + + + diff --git a/docs/assets/icons/github-icon.svg b/docs/assets/icons/github-icon.svg index a3fcb98dcbc..c368e311841 100644 --- a/docs/assets/icons/github-icon.svg +++ b/docs/assets/icons/github-icon.svg @@ -1 +1,18 @@ - \ No newline at end of file + + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/assets/style/venice_full_lion_logo.svg b/docs/assets/style/venice_full_lion_logo.svg index 60d66f1a083..5d5e7a07b5a 100644 --- a/docs/assets/style/venice_full_lion_logo.svg +++ b/docs/assets/style/venice_full_lion_logo.svg @@ -6,7 +6,7 @@ id="svg2" width="1333.3333" height="1333.3333" - viewBox="310 160 800 800" + viewBox="160 160 960 800" sodipodi:docname="VENICE black logo RGB.ai" xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape" xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd" @@ -40,6 +40,23 @@ d="M 0,0 H 1000 V 1000 H 0 Z" style="fill:#fefefe;fill-opacity:0;fill-rule:nonzero;stroke:none" id="path10" /> + + + + + + + + + + - - - diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/HttpConstants.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/HttpConstants.java index a91a65bcbe6..9c533393489 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/HttpConstants.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/HttpConstants.java @@ -23,6 +23,11 @@ public class HttpConstants { public static final String VENICE_RETRY = "X-VENICE-RETRY"; public static final String VENICE_COMPRESSION_STRATEGY = "X-VENICE-COMPRESSION-STRATEGY"; + /** + * N.B.: There seems to be an assumption that this header's value will contain just a single compression scheme (GZIP) + * and that it's not correct to pass a list of supported compression strategies (as one might expect). So we + * need to be careful if we ever intend to evolve the way this header is used to accommodate multiple schemes. + */ public static final String VENICE_SUPPORTED_COMPRESSION_STRATEGY = "X-VENICE-SUPPORTED-COMPRESSION-STRATEGY"; public static final String VENICE_STREAMING = "X-VENICE-STREAMING"; diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java index 75d1dd03d16..5d4283d1ee7 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java @@ -395,34 +395,6 @@ public void invalidStoreNameThrows() { 1); } - /** - * We're not relying on push ID uniqueness. We can reenable this test if we start relying on (and enforcing) push ID uniqueness - */ - @Test(groups = { "flaky" }) - public void cannotAddDifferentVersionsWithSamePushId() { - String storeName = "storeName"; - Store store = new ZKStore( - storeName, - "owner", - System.currentTimeMillis(), - PersistenceType.IN_MEMORY, - RoutingStrategy.CONSISTENT_HASH, - ReadStrategy.ANY_OF_ONLINE, - OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION, - 1); - String duplicatePushJobId = "pushId"; - Version versionOne = new VersionImpl(storeName, 1, duplicatePushJobId); - store.addVersion(versionOne); - Version versionTwo = new VersionImpl(storeName, 2, duplicatePushJobId); - try { - store.addVersion(versionTwo); - Assert.fail("Store must not allow adding a new version with same pushId"); - } catch (Exception e) { - // expected - LOGGER.info("Expected exception: {}", e.getLocalizedMessage()); - } - } - @Test public void testStoreLevelAcl() { Store store = new ZKStore( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/MockVeniceRouterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/MockVeniceRouterWrapper.java index ded7b4e31ac..0fe0e873158 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/MockVeniceRouterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/MockVeniceRouterWrapper.java @@ -84,6 +84,7 @@ static StatefulServiceProvider generateService( doReturn(CompressionStrategy.NO_OP).when(mockStore).getCompressionStrategy(); HelixReadOnlyStoreRepository mockMetadataRepository = Mockito.mock(HelixReadOnlyStoreRepository.class); doReturn(mockStore).when(mockMetadataRepository).getStore(Mockito.anyString()); + doReturn(mockStore).when(mockMetadataRepository).getStoreOrThrow(Mockito.anyString()); Version mockVersion = Mockito.mock(Version.class); doReturn(mockVersion).when(mockStore).getVersion(Mockito.anyInt()); diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java index 48e3d3576c3..35d1a047279 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java @@ -11,7 +11,6 @@ import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import com.github.luben.zstd.Zstd; @@ -46,7 +45,6 @@ import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.helix.VeniceOfflinePushMonitorAccessor; import com.linkedin.venice.kafka.protocol.state.PartitionState; -import com.linkedin.venice.meta.DataReplicationPolicy; import com.linkedin.venice.meta.IngestionMode; import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.OfflinePushStrategy; @@ -656,23 +654,6 @@ public static void verifyDCConfigNativeAndActiveRepl( }); } - public static void verifyHybridStoreDataReplicationPolicy( - String storeName, - DataReplicationPolicy dataReplicationPolicy, - ControllerClient... controllerClients) { - TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { - for (ControllerClient controllerClient: controllerClients) { - StoreResponse storeResponse = assertCommand(controllerClient.getStore(storeName)); - assertNotNull(storeResponse.getStore(), "Store should not be null"); - assertNotNull(storeResponse.getStore().getHybridStoreConfig(), "Hybrid store config should not be null"); - assertEquals( - storeResponse.getStore().getHybridStoreConfig().getDataReplicationPolicy(), - dataReplicationPolicy, - "The data replication policy does not match."); - } - }); - } - public static StoreIngestionTaskFactory.Builder getStoreIngestionTaskBuilder(String storeName) { VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class); doReturn(false).when(mockVeniceServerConfig).isHybridQuotaEnabled(); diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/RouterExceptionAndTrackingUtils.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/RouterExceptionAndTrackingUtils.java index 5c79e02630b..e29e46b8994 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/RouterExceptionAndTrackingUtils.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/RouterExceptionAndTrackingUtils.java @@ -12,7 +12,8 @@ import com.linkedin.venice.router.stats.RouterStats; import com.linkedin.venice.utils.RedundantExceptionFilter; import io.netty.handler.codec.http.HttpResponseStatus; -import java.util.Optional; +import java.util.Objects; +import java.util.function.BiConsumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -23,14 +24,27 @@ * * TODO: If later on DDS router could support a better way to register a handler to handle the exceptional cases, * we should update the logic here. - * - * TODO: Remove all {@link Optional} from this class. */ - public class RouterExceptionAndTrackingUtils { public enum FailureType { - REGULAR, SMART_RETRY_ABORTED_BY_SLOW_ROUTE, SMART_RETRY_ABORTED_BY_DELAY_CONSTRAINT, - SMART_RETRY_ABORTED_BY_MAX_RETRY_ROUTE_LIMIT, RESOURCE_NOT_FOUND, RETRY_ABORTED_BY_NO_AVAILABLE_REPLICA + REGULAR, RESOURCE_NOT_FOUND, + SMART_RETRY_ABORTED_BY_SLOW_ROUTE(AggRouterHttpRequestStats::recordSlowRouteAbortedRetryRequest), + SMART_RETRY_ABORTED_BY_DELAY_CONSTRAINT(AggRouterHttpRequestStats::recordDelayConstraintAbortedRetryRequest), + SMART_RETRY_ABORTED_BY_MAX_RETRY_ROUTE_LIMIT(AggRouterHttpRequestStats::recordRetryRouteLimitAbortedRetryRequest), + RETRY_ABORTED_BY_NO_AVAILABLE_REPLICA(AggRouterHttpRequestStats::recordNoAvailableReplicaAbortedRetryRequest); + + final boolean reportUnhealthy; + final BiConsumer statsRecorder; + + FailureType() { + this.reportUnhealthy = true; + this.statsRecorder = (stats, store) -> {}; // No op + } + + FailureType(BiConsumer statsRecorder) { + this.reportUnhealthy = false; + this.statsRecorder = Objects.requireNonNull(statsRecorder); + } } private static RouterStats ROUTER_STATS; @@ -45,8 +59,8 @@ public static void setRouterStats(RouterStats routerS } public static RouterException newRouterExceptionAndTracking( - Optional storeName, - Optional requestType, + String storeName, + RequestType requestType, HttpResponseStatus responseStatus, String msg, FailureType failureType) { @@ -63,17 +77,16 @@ public static RouterException newRouterExceptionAndTracking( /** We do not fill in the stacktrace at all for "expected exceptions" (quota, etc) */ false) : new RouterException(HttpResponseStatus.class, responseStatus, responseStatus.code(), msg, false); - String name = storeName.isPresent() ? storeName.get() : ""; - if (!EXCEPTION_FILTER.isRedundantException(name, String.valueOf(e.code()))) { + if (!EXCEPTION_FILTER.isRedundantException(storeName, String.valueOf(e.code()))) { if (responseStatus == BAD_REQUEST) { - String error = "Received bad request for store: " + name; + String error = "Received bad request for store: " + storeName; if (!EXCEPTION_FILTER.isRedundantException(error)) { LOGGER.warn(error, e); } } else if (failureType == FailureType.RESOURCE_NOT_FOUND) { - LOGGER.error("Could not find resources for store: {} ", name, e); + LOGGER.error("Could not find resources for store: {} ", storeName, e); } else { - LOGGER.warn("Got an exception for store: {} ", name, e); + LOGGER.warn("Got an exception for store: {} ", storeName, e); } } return e; @@ -91,16 +104,16 @@ private static boolean isExpected(HttpResponseStatus responseStatus, FailureType } public static RouterException newRouterExceptionAndTracking( - Optional storeName, - Optional requestType, + String storeName, + RequestType requestType, HttpResponseStatus responseStatus, String msg) { return newRouterExceptionAndTracking(storeName, requestType, responseStatus, msg, FailureType.REGULAR); } public static RouterException newRouterExceptionAndTrackingResourceNotFound( - Optional storeName, - Optional requestType, + String storeName, + RequestType requestType, HttpResponseStatus responseStatus, String msg) { return newRouterExceptionAndTracking(storeName, requestType, responseStatus, msg, FailureType.RESOURCE_NOT_FOUND); @@ -108,28 +121,27 @@ public static RouterException newRouterExceptionAndTrackingResourceNotFound( @Deprecated public static VeniceException newVeniceExceptionAndTracking( - Optional storeName, - Optional requestType, + String storeName, + RequestType requestType, HttpResponseStatus responseStatus, String msg, FailureType failureType) { metricTracking(storeName, requestType, responseStatus, failureType); - String name = storeName.isPresent() ? storeName.get() : ""; VeniceException e = isExpected(responseStatus, failureType) // Do not dump stack-trace for Quota exceed exception as it might blow up memory on high load ? new VeniceException(msg, false) : new VeniceException(msg); - if (!EXCEPTION_FILTER.isRedundantException(name, e)) { - LOGGER.warn("Got an exception for store: {}", name, e); + if (!EXCEPTION_FILTER.isRedundantException(storeName, e)) { + LOGGER.warn("Got an exception for store: {}", storeName, e); } return e; } @Deprecated public static VeniceException newVeniceExceptionAndTracking( - Optional storeName, - Optional requestType, + String storeName, + RequestType requestType, HttpResponseStatus responseStatus, String msg) { return newVeniceExceptionAndTracking(storeName, requestType, responseStatus, msg, FailureType.REGULAR); @@ -141,8 +153,8 @@ public static void recordUnavailableReplicaStreamingRequest(String storeName, Re } private static void metricTracking( - Optional storeName, - Optional requestType, + String storeName, + RequestType requestType, HttpResponseStatus responseStatus, FailureType failureType) { if (ROUTER_STATS == null) { @@ -150,14 +162,14 @@ private static void metricTracking( throw new VeniceException("'ROUTER_STATS' hasn't been setup yet, so there must be some bug causing this."); } AggRouterHttpRequestStats stats = - ROUTER_STATS.getStatsByType(requestType.isPresent() ? requestType.get() : RequestType.SINGLE_GET); + ROUTER_STATS.getStatsByType(requestType == null ? RequestType.SINGLE_GET : requestType); // If we don't know the actual store name, this error will only be aggregated in server level, but not // in store level if (responseStatus.equals(BAD_REQUEST) || responseStatus.equals(REQUEST_ENTITY_TOO_LARGE)) { - stats.recordBadRequest(storeName.orElse(null), responseStatus); + stats.recordBadRequest(storeName, responseStatus); } else if (responseStatus.equals(TOO_MANY_REQUESTS)) { - if (storeName.isPresent()) { - if (requestType.isPresent()) { + if (storeName != null) { + if (requestType != null) { /** * Once we stop throwing quota exceptions from within the {@link VeniceDelegateMode} then we can * process everything through {@link VeniceResponseAggregator} and remove the metric tracking @@ -165,46 +177,26 @@ private static void metricTracking( * * TODO: Remove this metric after the above work is done... */ - stats.recordThrottledRequest(storeName.get(), responseStatus); + stats.recordThrottledRequest(storeName, responseStatus); } } else { // not possible to have empty store name in this scenario throw new VeniceException("Received a TOO_MANY_REQUESTS error without store name present"); } } else { - /** - * It is on purpose that here doesn't record retry request abort as unhealthy request. - */ - switch (failureType) { - case SMART_RETRY_ABORTED_BY_SLOW_ROUTE: - if (storeName.isPresent()) { - stats.recordSlowRouteAbortedRetryRequest(storeName.get()); - } - return; - case SMART_RETRY_ABORTED_BY_DELAY_CONSTRAINT: - if (storeName.isPresent()) { - stats.recordDelayConstraintAbortedRetryRequest(storeName.get()); - } - return; - case SMART_RETRY_ABORTED_BY_MAX_RETRY_ROUTE_LIMIT: - if (storeName.isPresent()) { - stats.recordRetryRouteLimitAbortedRetryRequest(storeName.get()); - } - return; - case RETRY_ABORTED_BY_NO_AVAILABLE_REPLICA: - if (storeName.isPresent()) { - stats.recordNoAvailableReplicaAbortedRetryRequest(storeName.get()); - } - return; + if (storeName != null) { + failureType.statsRecorder.accept(stats, storeName); } - - stats.recordUnhealthyRequest(storeName.orElse(null), responseStatus); - - if (responseStatus.equals(SERVICE_UNAVAILABLE)) { - if (storeName.isPresent()) { - stats.recordUnavailableRequest(storeName.get()); - } else { - throw new VeniceException("Received a SERVICE_UNAVAILABLE error without store name present"); + if (failureType.reportUnhealthy) { + /** It is on purpose that we do not record unhealthy request for certain types of aborted retry scenarios. */ + stats.recordUnhealthyRequest(storeName, responseStatus); + + if (responseStatus.equals(SERVICE_UNAVAILABLE)) { + if (storeName != null) { + stats.recordUnavailableRequest(storeName); + } else { + throw new VeniceException("Received a SERVICE_UNAVAILABLE error without store name present"); + } } } } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java index d543cc95fc2..5eea6fdd45a 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDelegateMode.java @@ -30,7 +30,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import javax.annotation.Nonnull; @@ -125,8 +124,8 @@ public void initReadRequestThrottler(RouterThrottler requestThrottler) { public void initHelixGroupSelector(HelixGroupSelector helixGroupSelector) { if (this.helixGroupSelector != null) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, INTERNAL_SERVER_ERROR, "HelixGroupSelector has already been initialized before, and no further update expected!"); } @@ -145,24 +144,24 @@ public , K, R> Scatter scatter( @Nonnull R roles) throws RouterException { if (readRequestThrottler == null) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, INTERNAL_SERVER_ERROR, "Read request throttler has not been setup yet"); } if (multiKeyRoutingStrategy.equals(VeniceMultiKeyRoutingStrategy.HELIX_ASSISTED_ROUTING) && helixGroupSelector == null) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, INTERNAL_SERVER_ERROR, "HelixGroupSelector has not been setup yet"); } P path = scatter.getPath(); if (!(path instanceof VenicePath)) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, INTERNAL_SERVER_ERROR, "VenicePath is expected, but received " + path.getClass()); } @@ -179,8 +178,8 @@ public , K, R> Scatter scatter( // Check whether retry request is too late or not if (venicePath.isRetryRequestTooLate()) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(venicePath.getRequestType()), + storeName, + venicePath.getRequestType(), SERVICE_UNAVAILABLE, "The retry request aborted because of delay constraint of smart long-tail retry", RouterExceptionAndTrackingUtils.FailureType.SMART_RETRY_ABORTED_BY_DELAY_CONSTRAINT); @@ -198,8 +197,8 @@ public , K, R> Scatter scatter( break; default: throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(venicePath.getRequestType()), + storeName, + venicePath.getRequestType(), INTERNAL_SERVER_ERROR, "Unknown request type: " + venicePath.getRequestType()); } @@ -228,8 +227,8 @@ public , K, R> Scatter scatter( String errMsg = resourceName + ", partition " + partition + " is not available to serve " + isRetry + "request of type: " + venicePath.getRequestType(); throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(venicePath.getRequestType()), + storeName, + venicePath.getRequestType(), SERVICE_UNAVAILABLE, errMsg, failureType); @@ -240,8 +239,8 @@ public , K, R> Scatter scatter( int hostCount = part.getHosts().size(); if (hostCount == 0) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(venicePath.getRequestType()), + storeName, + venicePath.getRequestType(), SERVICE_UNAVAILABLE, "Could not find ready-to-serve replica for request: " + part); } @@ -256,8 +255,8 @@ public , K, R> Scatter scatter( } if (!(host instanceof Instance)) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(venicePath.getRequestType()), + storeName, + venicePath.getRequestType(), INTERNAL_SERVER_ERROR, "Ready-to-serve host must be an 'Instance'"); } @@ -279,8 +278,8 @@ public , K, R> Scatter scatter( * with the corresponding response status directly. */ throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(venicePath.getRequestType()), + storeName, + venicePath.getRequestType(), TOO_MANY_REQUESTS, "Quota exceeded for '" + storeName + "' while serving a " + venicePath.getRequestType() + " request! msg: " + e.getMessage()); @@ -297,8 +296,8 @@ public , K, R> Scatter scatter( || !venicePath.isLongTailRetryWithinBudget(onlineRequestNum)) { routerStats.getStatsByType(venicePath.getRequestType()).recordDisallowedRetryRequest(storeName); throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(venicePath.getRequestType()), + storeName, + venicePath.getRequestType(), SERVICE_UNAVAILABLE, "The retry request aborted because there are too many retries for current request", RouterExceptionAndTrackingUtils.FailureType.SMART_RETRY_ABORTED_BY_MAX_RETRY_ROUTE_LIMIT); @@ -328,15 +327,15 @@ private H selectLeastLoadedHost(List hosts, VenicePath path) throws Route if (minHost == null) { if (path.isRetryRequest()) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(path.getStoreName()), - Optional.of(path.getRequestType()), + path.getStoreName(), + path.getRequestType(), SERVICE_UNAVAILABLE, "Retry request aborted because of slow route for request path: " + path.getResourceName(), RouterExceptionAndTrackingUtils.FailureType.SMART_RETRY_ABORTED_BY_SLOW_ROUTE); } else { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(path.getStoreName()), - Optional.of(path.getRequestType()), + path.getStoreName(), + path.getRequestType(), SERVICE_UNAVAILABLE, "Could not find ready-to-serve replica for request path: " + path.getResourceName()); } @@ -376,8 +375,8 @@ public , K, R> Scatter scatter( veniceHostHealthMonitor = (HostHealthMonitor) hostHealthMonitor; } catch (ClassCastException e) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, INTERNAL_SERVER_ERROR, "VenicePath, VeniceHostFinder and HostHealthMonitor are expected, " + "but received: " + path.getClass() + " and " + hostFinder.getClass() + ", " + hostHealthMonitor.getClass()); @@ -479,8 +478,8 @@ public , K, R> Scatter scatter( veniceHostHealthMonitor = (HostHealthMonitor) hostHealthMonitor; } catch (ClassCastException e) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, INTERNAL_SERVER_ERROR, "Scatter, VenicePath, VeniceHostFinder and " + "HostHealthMonitor are expected, but received: " + scatter.getClass() + ", " @@ -658,8 +657,8 @@ protected void selectHostForPartition( for (H host: partitionReplicas) { if (!(host instanceof Instance)) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(venicePath.getStoreName()), - Optional.of(venicePath.getRequestType()), + venicePath.getStoreName(), + venicePath.getRequestType(), INTERNAL_SERVER_ERROR, "The chosen host is not an 'Instance'"); } @@ -685,15 +684,15 @@ protected void selectHostForPartition( if (selectedHost == null) { if (venicePath.isRetryRequest()) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(venicePath.getStoreName()), - Optional.of(venicePath.getRequestType()), + venicePath.getStoreName(), + venicePath.getRequestType(), SERVICE_UNAVAILABLE, "Retry request aborted! Could not find any healthy replica.", RouterExceptionAndTrackingUtils.FailureType.SMART_RETRY_ABORTED_BY_SLOW_ROUTE); } else { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(venicePath.getStoreName()), - Optional.of(venicePath.getRequestType()), + venicePath.getStoreName(), + venicePath.getRequestType(), SERVICE_UNAVAILABLE, "Could not find any healthy replica."); } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDispatcher.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDispatcher.java index cf75f003e41..087f8170b42 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDispatcher.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceDispatcher.java @@ -44,7 +44,6 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -133,8 +132,8 @@ public void dispatch( if (part.getHosts().size() != 1) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(requestType), + storeName, + requestType, INTERNAL_SERVER_ERROR, "There should be only one chosen replica for the request: " + part); } @@ -190,8 +189,8 @@ protected CompletableFuture sendRequest( AggRouterHttpRequestStats stats = routerStats.getStatsByType(requestType); stats.recordRequestThrottledByRouterCapacity(storeName); throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(requestType), + storeName, + requestType, SERVICE_UNAVAILABLE, "Maximum number of pending request threshold reached! Current pending request count: " + pendingRequestThrottler.getCurrentPendingRequestCount()); @@ -216,8 +215,8 @@ protected CompletableFuture sendRequest( return responseFuture; } else { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(requestType), + storeName, + requestType, SERVICE_UNAVAILABLE, "Too many pending request to storage node : " + hostName); } @@ -302,8 +301,8 @@ protected VeniceFullHttpResponse buildResponse(VenicePath path, PortableHttpResp break; default: throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, INTERNAL_SERVER_ERROR, "Unknown request type: " + path.getRequestType()); } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VenicePathParser.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VenicePathParser.java index 5f1e81b68f0..fa7c8adbba5 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VenicePathParser.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VenicePathParser.java @@ -43,7 +43,6 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -147,14 +146,14 @@ public VenicePathParser( this.retryManagerScheduler = retryManagerScheduler; this.routerSingleKeyRetryManagers = new VeniceConcurrentHashMap<>(); this.routerMultiKeyRetryManagers = new VeniceConcurrentHashMap<>(); - }; + } @Override public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws RouterException { if (!(request instanceof BasicFullHttpRequest)) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, BAD_GATEWAY, "parseResourceUri should receive a BasicFullHttpRequest"); } @@ -164,18 +163,15 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout RouterResourceType resourceType = pathHelper.getResourceType(); if (resourceType != RouterResourceType.TYPE_STORAGE && resourceType != RouterResourceType.TYPE_COMPUTE) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, BAD_REQUEST, "Requested resource type: " + resourceType + " is not a valid type"); } String storeName = pathHelper.getResourceName(); if (StringUtils.isEmpty(storeName)) { - throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), - BAD_REQUEST, - "Request URI must have storeName. Uri is: " + uri); + throw RouterExceptionAndTrackingUtils + .newRouterExceptionAndTracking(null, null, BAD_REQUEST, "Request URI must have storeName. Uri is: " + uri); } VenicePath path = null; @@ -251,8 +247,8 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout } else { if (!request.headers().contains(HttpConstants.VENICE_CLIENT_COMPUTE)) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.of(computePath.getRequestType()), + storeName, + computePath.getRequestType(), METHOD_NOT_ALLOWED, "Read compute is not enabled for the store. Please contact Venice team to enable the feature."); } @@ -263,18 +259,15 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout } } else { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(storeName), - Optional.empty(), + storeName, + null, BAD_REQUEST, "The passed in request must be either a GET or " + "be a POST with a resource type of " + TYPE_STORAGE + " or " + TYPE_COMPUTE + ", but instead it was: " + request.toString()); } } else { - throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), - BAD_REQUEST, - "Method: " + method + " is not allowed"); + throw RouterExceptionAndTrackingUtils + .newRouterExceptionAndTracking(null, null, BAD_REQUEST, "Method: " + method + " is not allowed"); } RequestType requestType = path.getRequestType(); if (StreamingUtils.isStreamingEnabled(request)) { @@ -333,16 +326,15 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout aggRouterHttpRequestStats.recordRequest(storeName); aggRouterHttpRequestStats.recordRequestSize(storeName, path.getRequestSize()); } catch (VeniceException e) { - Optional requestTypeOptional = - (path == null) ? Optional.empty() : Optional.of(path.getRequestType()); + RequestType requestType = path == null ? null : path.getRequestType(); HttpResponseStatus responseStatus = BAD_REQUEST; if (e instanceof VeniceStoreIsMigratedException) { - requestTypeOptional = Optional.empty(); + requestType = null; responseStatus = MOVED_PERMANENTLY; } if (e instanceof VeniceKeyCountLimitException) { VeniceKeyCountLimitException keyCountLimitException = (VeniceKeyCountLimitException) e; - requestTypeOptional = Optional.of(keyCountLimitException.getRequestType()); + requestType = keyCountLimitException.getRequestType(); responseStatus = REQUEST_ENTITY_TOO_LARGE; routerStats.getStatsByType(keyCountLimitException.getRequestType()) .recordBadRequestKeyCount( @@ -353,7 +345,7 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout * Tracking the bad requests in {@link RouterExceptionAndTrackingUtils} by logging and metrics. */ throw RouterExceptionAndTrackingUtils - .newRouterExceptionAndTracking(Optional.of(storeName), requestTypeOptional, responseStatus, e.getMessage()); + .newRouterExceptionAndTracking(storeName, requestType, responseStatus, e.getMessage()); } finally { // Always record request usage in the single get stats, so we could compare it with the quota easily. // Right now we use key num as request usage, in the future we might consider the Capacity unit. @@ -366,8 +358,8 @@ public VenicePath parseResourceUri(String uri, HTTP_REQUEST request) throws Rout @Override public VenicePath parseResourceUri(String uri) throws RouterException { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, BAD_REQUEST, "parseResourceUri without param: request should not be invoked"); } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseAggregator.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseAggregator.java index 338d4b63b49..110469ab30c 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseAggregator.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseAggregator.java @@ -120,8 +120,8 @@ public VeniceResponseAggregator withComputeTardyThreshold(long timeout, TimeUnit public void initHelixGroupSelector(HelixGroupSelector helixGroupSelector) { if (this.helixGroupSelector != null) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, INTERNAL_SERVER_ERROR, "HelixGroupSelector has already been initialized before, and no further update expected!"); } @@ -136,14 +136,11 @@ public FullHttpResponse buildResponse( @Nonnull List gatheredResponses) { if (gatheredResponses.isEmpty()) { throw RouterExceptionAndTrackingUtils - .newVeniceExceptionAndTracking(Optional.empty(), Optional.empty(), BAD_GATEWAY, "Received empty response!"); + .newVeniceExceptionAndTracking(null, null, BAD_GATEWAY, "Received empty response!"); } if (metrics == null) { - throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.empty(), - Optional.empty(), - INTERNAL_SERVER_ERROR, - "'metrics' should not be null"); + throw RouterExceptionAndTrackingUtils + .newVeniceExceptionAndTracking(null, null, INTERNAL_SERVER_ERROR, "'metrics' should not be null"); } VenicePath venicePath = metrics.getPath(); if (venicePath == null) { @@ -169,7 +166,7 @@ public FullHttpResponse buildResponse( } } catch (URISyntaxException e) { throw RouterExceptionAndTrackingUtils - .newVeniceExceptionAndTracking(Optional.empty(), Optional.empty(), BAD_REQUEST, "Failed to parse uri"); + .newVeniceExceptionAndTracking(null, null, BAD_REQUEST, "Failed to parse uri"); } return response; } @@ -211,11 +208,8 @@ public FullHttpResponse buildResponse( finalResponse = processComputeResponses(gatheredResponses, storeName, optionalHeaders); break; default: - throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.empty(), - Optional.empty(), - INTERNAL_SERVER_ERROR, - "Unknown request type: " + requestType); + throw RouterExceptionAndTrackingUtils + .newVeniceExceptionAndTracking(null, null, INTERNAL_SERVER_ERROR, "Unknown request type: " + requestType); } } stats.recordFanoutRequestCount(storeName, gatheredResponses.size()); @@ -349,11 +343,8 @@ private CompressionStrategy validateAndExtractCompressionStrategy( compressionStrategy.getValue(), responseCompression.getValue(), response.headers().toString()); - throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(storeName), - Optional.of(RequestType.MULTI_GET), - BAD_GATEWAY, - errorMsg); + throw RouterExceptionAndTrackingUtils + .newVeniceExceptionAndTracking(storeName, RequestType.MULTI_GET, BAD_GATEWAY, errorMsg); } } @@ -377,15 +368,15 @@ protected FullHttpResponse processComputeResponses( String currentValue = response.headers().get(headerName); if (currentValue == null) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(storeName), - Optional.of(RequestType.COMPUTE), + storeName, + RequestType.COMPUTE, BAD_GATEWAY, "Header: " + headerName + " is expected in compute sub-response"); } if (!headerValue.equals(currentValue)) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(storeName), - Optional.of(RequestType.COMPUTE), + storeName, + RequestType.COMPUTE, BAD_GATEWAY, "Incompatible header received for " + headerName + ", values: " + headerValue + ", " + currentValue); } @@ -448,15 +439,15 @@ protected FullHttpResponse processMultiGetResponses( String currentValue = response.headers().get(headerName); if (currentValue == null) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(storeName), - Optional.of(RequestType.MULTI_GET), + storeName, + RequestType.MULTI_GET, BAD_GATEWAY, "Header: " + headerName + " is expected in multi-get sub-response"); } if (!headerValue.equals(currentValue)) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(storeName), - Optional.of(RequestType.MULTI_GET), + storeName, + RequestType.MULTI_GET, BAD_GATEWAY, "Incompatible header received for " + headerName + ", values: " + headerValue + ", " + currentValue); } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseDecompressor.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseDecompressor.java index a62a8469e73..874eb324abd 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseDecompressor.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/VeniceResponseDecompressor.java @@ -26,7 +26,6 @@ import io.netty.handler.codec.http.HttpRequest; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Optional; import org.apache.avro.io.OptimizedBinaryDecoderFactory; @@ -178,8 +177,8 @@ private ByteBuffer decompressRecord( compressor = compressorFactory.getVersionSpecificCompressor(kafkaTopic); if (compressor == null) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(storeName), - Optional.of(requestType), + this.storeName, + requestType, SERVICE_UNAVAILABLE, "Compressor not available for resource " + kafkaTopic + ". Dictionary not downloaded."); } @@ -192,7 +191,7 @@ private ByteBuffer decompressRecord( String errorMsg = String .format("Failed to decompress data. Store: %s; Version: %d, error: %s", storeName, version, e.getMessage()); throw RouterExceptionAndTrackingUtils - .newVeniceExceptionAndTracking(Optional.of(storeName), Optional.of(requestType), BAD_GATEWAY, errorMsg); + .newVeniceExceptionAndTracking(this.storeName, requestType, BAD_GATEWAY, errorMsg); } } @@ -211,8 +210,8 @@ private ByteBuf decompressMultiGetRecords( compressor = compressorFactory.getVersionSpecificCompressor(kafkaTopic); if (compressor == null) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(storeName), - Optional.of(requestType), + this.storeName, + requestType, SERVICE_UNAVAILABLE, "Compressor not available for resource " + kafkaTopic + ". Dictionary not downloaded."); } @@ -226,7 +225,7 @@ private ByteBuf decompressMultiGetRecords( String errorMsg = String .format("Failed to decompress data. Store: %s; Version: %d, error: %s", storeName, version, e.getMessage()); throw RouterExceptionAndTrackingUtils - .newVeniceExceptionAndTracking(Optional.of(storeName), Optional.of(requestType), BAD_GATEWAY, errorMsg); + .newVeniceExceptionAndTracking(this.storeName, requestType, BAD_GATEWAY, errorMsg); } return Unpooled.wrappedBuffer(recordSerializer.serializeObjects(records)); diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceComputePath.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceComputePath.java index 7c9f22be3c2..53e64940312 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceComputePath.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceComputePath.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.BiConsumer; import javax.annotation.Nonnull; import org.apache.avro.Schema; @@ -110,8 +109,8 @@ public VeniceComputePath( int computeRequestVersion = Integer.parseInt(this.computeRequestVersionHeader); if (computeRequestVersion <= 0 || computeRequestVersion > LATEST_SCHEMA_VERSION_FOR_COMPUTE_REQUEST) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(getStoreName()), - Optional.of(getRequestType()), + getStoreName(), + getRequestType(), BAD_REQUEST, "Compute API version " + computeRequestVersion + " is invalid. Latest version is " + LATEST_SCHEMA_VERSION_FOR_COMPUTE_REQUEST); @@ -133,8 +132,8 @@ public VeniceComputePath( computeRequestLengthInBytes = requestContent.length - decoder.inputStream().available(); } catch (IOException e) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(getStoreName()), - Optional.of(getRequestType()), + getStoreName(), + getRequestType(), BAD_REQUEST, "Exception while getting available number of bytes in request content"); } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceMultiGetPath.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceMultiGetPath.java index 172102adcdc..a0d64c697ac 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceMultiGetPath.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceMultiGetPath.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.Optional; import javax.annotation.Nonnull; import org.apache.avro.io.OptimizedBinaryDecoderFactory; @@ -66,8 +65,8 @@ public VeniceMultiGetPath( int apiVersion = Integer.parseInt(request.headers().get(HttpConstants.VENICE_API_VERSION)); if (apiVersion != EXPECTED_PROTOCOL.getProtocolVersion()) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(getStoreName()), - Optional.of(getRequestType()), + getStoreName(), + getRequestType(), BAD_REQUEST, "Expected api version: " + EXPECTED_PROTOCOL.getProtocolVersion() + ", but received: " + apiVersion); } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceMultiKeyPath.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceMultiKeyPath.java index 04581f32838..9ea8e4f1fe6 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceMultiKeyPath.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceMultiKeyPath.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.apache.http.client.methods.HttpPost; @@ -102,8 +101,8 @@ public void initialize( partitioner = partitionFinder.findPartitioner(getStoreName(), getVersionNumber()); } catch (VeniceNoHelixResourceException e) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTrackingResourceNotFound( - Optional.of(getStoreName()), - Optional.of(RequestType.COMPUTE), + getStoreName(), + RequestType.COMPUTE, e.getHttpResponseStatus(), e.getMessage()); } @@ -121,8 +120,8 @@ public void initialize( * If application is using Venice client to send batch-get request, this piece of logic shouldn't be triggered. */ throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.of(getStoreName()), - Optional.of(getRequestType()), + getStoreName(), + getRequestType(), BAD_REQUEST, "Key count in multi-get request should not be zero"); } @@ -160,8 +159,8 @@ public VenicePath substitutePartitionKey(RouterKey s) { K routerRequestKey = routerKeyMap.get(s); if (routerRequestKey == null) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(getStoreName()), - Optional.of(getRequestType()), + getStoreName(), + getRequestType(), BAD_GATEWAY, "RouterKey: " + s + " should exist in the original path"); } @@ -199,8 +198,8 @@ public VenicePath substitutePartitionKey(@Nonnull Collection s) { K routerRequestKey = routerKeyMap.get(key); if (routerRequestKey == null) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(getStoreName()), - Optional.of(getRequestType()), + getStoreName(), + getRequestType(), BAD_GATEWAY, "RouterKey: " + key + " should exist in the original path"); } diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceSingleGetPath.java b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceSingleGetPath.java index 94a1bd0495d..868c9a549e7 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceSingleGetPath.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/api/path/VeniceSingleGetPath.java @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; -import java.util.Optional; import javax.annotation.Nonnull; import org.apache.commons.lang.StringUtils; import org.apache.http.client.methods.HttpGet; @@ -47,8 +46,8 @@ public VeniceSingleGetPath( super(storeName, versionNumber, resourceName, false, -1, retryManager); if (StringUtils.isEmpty(key)) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTracking( - Optional.empty(), - Optional.empty(), + null, + null, BAD_REQUEST, "Request URI must have non-empty key. Uri is: " + uri); } @@ -70,8 +69,8 @@ public VeniceSingleGetPath( this.partition = partition; } catch (VeniceNoHelixResourceException e) { throw RouterExceptionAndTrackingUtils.newRouterExceptionAndTrackingResourceNotFound( - Optional.of(getStoreName()), - Optional.of(RequestType.SINGLE_GET), + getStoreName(), + RequestType.SINGLE_GET, e.getHttpResponseStatus(), e.getMessage()); } @@ -94,8 +93,8 @@ protected RequestType getStreamingRequestType() { public VenicePath substitutePartitionKey(RouterKey s) { if (!routerKey.equals(s)) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(getStoreName()), - Optional.of(getRequestType()), + getStoreName(), + getRequestType(), INTERNAL_SERVER_ERROR, "RouterKey: " + routerKey + " is expected, but received: " + s); } @@ -105,8 +104,8 @@ public VenicePath substitutePartitionKey(RouterKey s) { @Override public VenicePath substitutePartitionKey(@Nonnull Collection s) { throw RouterExceptionAndTrackingUtils.newVeniceExceptionAndTracking( - Optional.of(getStoreName()), - Optional.of(getRequestType()), + getStoreName(), + getRequestType(), INTERNAL_SERVER_ERROR, "substitutePartitionKey(@Nonnull Collection s) is not expected to be invoked for single-get request"); } diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/throttle/RouterRequestThrottlingTest.java b/services/venice-router/src/test/java/com/linkedin/venice/router/throttle/RouterRequestThrottlingTest.java index 05ba49ed96e..6e3671c30b1 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/throttle/RouterRequestThrottlingTest.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/throttle/RouterRequestThrottlingTest.java @@ -4,10 +4,12 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertTrue; import com.linkedin.alpini.router.api.HostFinder; import com.linkedin.alpini.router.api.HostHealthMonitor; import com.linkedin.alpini.router.api.PartitionFinder; +import com.linkedin.alpini.router.api.RouterException; import com.linkedin.alpini.router.api.Scatter; import com.linkedin.alpini.router.api.ScatterGatherRequest; import com.linkedin.venice.helix.ZkRoutersClusterManager; @@ -72,6 +74,11 @@ public static Object[][] requestType() { return new Object[][] { { RequestType.MULTI_GET }, { RequestType.COMPUTE } }; } + /** A simple Runnable-like interface just to reduce boilerplate in the test... */ + private interface ScatterCall { + void run() throws RouterException; + } + @Test(timeOut = 30000, dataProvider = "multiGet_compute") public void testMultiKeyThrottling(RequestType requestType) throws Exception { // Allow 10 multi-key requests per second @@ -115,18 +122,20 @@ public void testMultiKeyThrottling(RequestType requestType) throws Exception { HostFinder hostFinder = mock(VeniceHostFinder.class); HostHealthMonitor hostHealthMonitor = mock(HostHealthMonitor.class); + ScatterCall scatterCall = () -> delegateMode.scatter( + scatter, + HttpMethod.POST.name(), + storeName + "_v1", + partitionFinder, + hostFinder, + hostHealthMonitor, + VeniceRole.REPLICA); + // The router shouldn't throttle any request if the multi-get QPS is below 10 for (int iter = 0; iter < 3; iter++) { for (int i = 0; i < allowedQPS; i++) { try { - delegateMode.scatter( - scatter, - HttpMethod.POST.name(), - storeName + "_v1", - partitionFinder, - hostFinder, - hostHealthMonitor, - VeniceRole.REPLICA); + scatterCall.run(); } catch (Exception e) { Assert.fail("router shouldn't throttle any multi-get requests if the QPS is below " + allowedQPS); } @@ -138,16 +147,11 @@ public void testMultiKeyThrottling(RequestType requestType) throws Exception { // Router should throttle the multi-get requests if QPS exceeds 10 boolean multiGetThrottled = false; - for (int i = 0; i < allowedQPS + 1; i++) { + int queriesSent = allowedQPS + 1; + long startTime = System.currentTimeMillis(); + for (int i = 0; i < queriesSent; i++) { try { - delegateMode.scatter( - scatter, - HttpMethod.POST.name(), - storeName + "_v1", - partitionFinder, - hostFinder, - hostHealthMonitor, - VeniceRole.REPLICA); + scatterCall.run(); } catch (Exception e) { multiGetThrottled = true; if (i < allowedQPS) { @@ -156,9 +160,38 @@ public void testMultiKeyThrottling(RequestType requestType) throws Exception { } } } + long elapsedTime = System.currentTimeMillis() - startTime; - // restore the throttler so that it doesn't affect the following test case - throttler.restoreAllThrottlers(); - Assert.assertTrue(multiGetThrottled); + if (!multiGetThrottled) { + int additionalQueriesNeeded = -1; + for (int i = 1; i < queriesSent * 10; i++) { + try { + scatterCall.run(); + } catch (Exception e) { + additionalQueriesNeeded = i; + break; + } + } + long totalElapsedTime = System.currentTimeMillis() - startTime; + if (additionalQueriesNeeded < 0) { + Assert.fail( + "Never triggered quota at all, even after sending 10x more than it should have needed! Original elapsed time: " + + elapsedTime + "; total elapsed time: " + totalElapsedTime); + } else { + /** + * N.B.: This test used to be flaky because it would be 1 request short of triggering the quota. It's not clear + * why it sometimes takes a little more, and sometimes doesn't (floating point arithmetic being wonky, perhaps?) + * but here we're adding a small tolerance threshold, while still ensuring that larger deviations still result + * in failure. + */ + int toleranceThreshold = 1; + assertTrue( + additionalQueriesNeeded <= toleranceThreshold, + "Should have triggered quota in " + queriesSent + " requests, but it took " + additionalQueriesNeeded + + " additional request(s) before finally triggering, which is higher than the tolerance threshold of " + + toleranceThreshold + ". Original elapsed time: " + elapsedTime + "; total elapsed time: " + + totalElapsedTime); + } + } } }