From 74b14cfbcbb533537ad3e20327e6cad1f928b39b Mon Sep 17 00:00:00 2001 From: thomasva Date: Mon, 18 Mar 2024 12:19:59 +0100 Subject: [PATCH] Add injection of custom executor service to S3Base supplyAsync calls --- .../main/java/io/minio/MinioAsyncClient.java | 17 +++++++++++++--- api/src/main/java/io/minio/S3Base.java | 20 ++++++++++++++----- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/api/src/main/java/io/minio/MinioAsyncClient.java b/api/src/main/java/io/minio/MinioAsyncClient.java index 405ecd589..bdcd555e9 100644 --- a/api/src/main/java/io/minio/MinioAsyncClient.java +++ b/api/src/main/java/io/minio/MinioAsyncClient.java @@ -76,6 +76,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.regex.Matcher; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; @@ -139,7 +141,8 @@ private MinioAsyncClient( boolean useVirtualStyle, String region, Provider provider, - OkHttpClient httpClient) { + OkHttpClient httpClient, + ExecutorService executorService) { super( baseUrl, awsS3Prefix, @@ -148,7 +151,8 @@ private MinioAsyncClient( useVirtualStyle, region, provider, - httpClient); + httpClient, + executorService); } protected MinioAsyncClient(MinioAsyncClient client) { @@ -3221,6 +3225,7 @@ public static final class Builder { private String region; private Provider provider; private OkHttpClient httpClient; + private ExecutorService executorService = ForkJoinPool.commonPool(); private void setAwsInfo(String host, boolean https) { this.awsS3Prefix = null; @@ -3327,6 +3332,11 @@ public Builder httpClient(OkHttpClient httpClient) { return this; } + public Builder executorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + public MinioAsyncClient build() { HttpUtils.validateNotNull(this.baseUrl, "endpoint"); @@ -3352,7 +3362,8 @@ public MinioAsyncClient build() { useVirtualStyle, region, provider, - httpClient); + httpClient, + executorService); } } } diff --git a/api/src/main/java/io/minio/S3Base.java b/api/src/main/java/io/minio/S3Base.java index f9aca6086..5991eb635 100644 --- a/api/src/main/java/io/minio/S3Base.java +++ b/api/src/main/java/io/minio/S3Base.java @@ -84,6 +84,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -136,6 +137,7 @@ public abstract class S3Base { protected String region; protected Provider provider; protected OkHttpClient httpClient; + private final ExecutorService executorService; protected S3Base( HttpUrl baseUrl, @@ -145,7 +147,8 @@ protected S3Base( boolean useVirtualStyle, String region, Provider provider, - OkHttpClient httpClient) { + OkHttpClient httpClient, + ExecutorService executorService) { this.baseUrl = baseUrl; this.awsS3Prefix = awsS3Prefix; this.awsDomainSuffix = awsDomainSuffix; @@ -154,6 +157,7 @@ protected S3Base( this.region = region; this.provider = provider; this.httpClient = httpClient; + this.executorService = executorService; } /** @deprecated This method is no longer supported. */ @@ -167,7 +171,8 @@ protected S3Base( boolean isDualStackHost, boolean useVirtualStyle, Provider provider, - OkHttpClient httpClient) { + OkHttpClient httpClient, + ExecutorService executorService) { this.baseUrl = baseUrl; if (isAwsHost) this.awsS3Prefix = "s3."; if (isFipsHost) this.awsS3Prefix = "s3-fips."; @@ -182,6 +187,7 @@ protected S3Base( this.region = region; this.provider = provider; this.httpClient = httpClient; + this.executorService = executorService; } protected S3Base(S3Base client) { @@ -193,6 +199,7 @@ protected S3Base(S3Base client) { this.region = client.region; this.provider = client.provider; this.httpClient = client.httpClient; + this.executorService = client.executorService; } /** Check whether argument is valid or not. */ @@ -1135,7 +1142,8 @@ protected CompletableFuture calculatePartCountAsync(List long[] objectSize = {0}; int index = 0; - CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 0); + CompletableFuture completableFuture = + CompletableFuture.supplyAsync(() -> 0, executorService); for (ComposeSource src : sources) { index++; final int i = index; @@ -2854,7 +2862,8 @@ private CompletableFuture putMultipartObjectAsync( } } return response; - }); + }, + executorService); } /** @@ -2900,7 +2909,8 @@ protected CompletableFuture putObjectAsync( } catch (NoSuchAlgorithmException | IOException e) { throw new CompletionException(e); } - }) + }, + executorService) .thenCompose( partSource -> { try {