Skip to content

Commit

Permalink
Add injection of custom executor service to S3Base supplyAsync calls
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasva committed Mar 18, 2024
1 parent 40b054f commit 74b14cf
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
17 changes: 14 additions & 3 deletions api/src/main/java/io/minio/MinioAsyncClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Matcher;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -139,7 +141,8 @@ private MinioAsyncClient(
boolean useVirtualStyle,
String region,
Provider provider,
OkHttpClient httpClient) {
OkHttpClient httpClient,
ExecutorService executorService) {
super(
baseUrl,
awsS3Prefix,
Expand All @@ -148,7 +151,8 @@ private MinioAsyncClient(
useVirtualStyle,
region,
provider,
httpClient);
httpClient,
executorService);
}

protected MinioAsyncClient(MinioAsyncClient client) {
Expand Down Expand Up @@ -3221,6 +3225,7 @@ public static final class Builder {
private String region;
private Provider provider;
private OkHttpClient httpClient;
private ExecutorService executorService = ForkJoinPool.commonPool();

private void setAwsInfo(String host, boolean https) {
this.awsS3Prefix = null;
Expand Down Expand Up @@ -3327,6 +3332,11 @@ public Builder httpClient(OkHttpClient httpClient) {
return this;
}

public Builder executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}

public MinioAsyncClient build() {
HttpUtils.validateNotNull(this.baseUrl, "endpoint");

Expand All @@ -3352,7 +3362,8 @@ public MinioAsyncClient build() {
useVirtualStyle,
region,
provider,
httpClient);
httpClient,
executorService);
}
}
}
20 changes: 15 additions & 5 deletions api/src/main/java/io/minio/S3Base.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -136,6 +137,7 @@ public abstract class S3Base {
protected String region;
protected Provider provider;
protected OkHttpClient httpClient;
private final ExecutorService executorService;

protected S3Base(
HttpUrl baseUrl,
Expand All @@ -145,7 +147,8 @@ protected S3Base(
boolean useVirtualStyle,
String region,
Provider provider,
OkHttpClient httpClient) {
OkHttpClient httpClient,
ExecutorService executorService) {
this.baseUrl = baseUrl;
this.awsS3Prefix = awsS3Prefix;
this.awsDomainSuffix = awsDomainSuffix;
Expand All @@ -154,6 +157,7 @@ protected S3Base(
this.region = region;
this.provider = provider;
this.httpClient = httpClient;
this.executorService = executorService;
}

/** @deprecated This method is no longer supported. */
Expand All @@ -167,7 +171,8 @@ protected S3Base(
boolean isDualStackHost,
boolean useVirtualStyle,
Provider provider,
OkHttpClient httpClient) {
OkHttpClient httpClient,
ExecutorService executorService) {
this.baseUrl = baseUrl;
if (isAwsHost) this.awsS3Prefix = "s3.";
if (isFipsHost) this.awsS3Prefix = "s3-fips.";
Expand All @@ -182,6 +187,7 @@ protected S3Base(
this.region = region;
this.provider = provider;
this.httpClient = httpClient;
this.executorService = executorService;
}

protected S3Base(S3Base client) {
Expand All @@ -193,6 +199,7 @@ protected S3Base(S3Base client) {
this.region = client.region;
this.provider = client.provider;
this.httpClient = client.httpClient;
this.executorService = client.executorService;
}

/** Check whether argument is valid or not. */
Expand Down Expand Up @@ -1135,7 +1142,8 @@ protected CompletableFuture<Integer> calculatePartCountAsync(List<ComposeSource>
long[] objectSize = {0};
int index = 0;

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 0);
CompletableFuture<Integer> completableFuture =
CompletableFuture.supplyAsync(() -> 0, executorService);
for (ComposeSource src : sources) {
index++;
final int i = index;
Expand Down Expand Up @@ -2854,7 +2862,8 @@ private CompletableFuture<ObjectWriteResponse> putMultipartObjectAsync(
}
}
return response;
});
},
executorService);
}

/**
Expand Down Expand Up @@ -2900,7 +2909,8 @@ protected CompletableFuture<ObjectWriteResponse> putObjectAsync(
} catch (NoSuchAlgorithmException | IOException e) {
throw new CompletionException(e);
}
})
},
executorService)
.thenCompose(
partSource -> {
try {
Expand Down

0 comments on commit 74b14cf

Please sign in to comment.