Skip to content

Commit

Permalink
HADOOP-18925. S3A: option to enable/disable CopyFromLocalOperation (#…
Browse files Browse the repository at this point in the history
…6163)


Add a new option:
fs.s3a.optimized.copy.from.local.enabled

This will enable (default) or disable the
optimized CopyFromLocalOperation upload operation
when copyFromLocalFile() is invoked.

When false the superclass implementation is used; duration
statistics are still collected, though audit span entries
in logs will be for the individual fs operations, not the
overall operation.

Contributed by Steve Loughran
  • Loading branch information
steveloughran authored Nov 6, 2023
1 parent 077263d commit ef7fb64
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1347,4 +1347,17 @@ private Constants() {
*/
public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false;


/**
* Is the higher performance copy from local file to S3 enabled?
* This switch allows for it to be disabled if there are problems.
* Value: {@value}.
*/
public static final String OPTIMIZED_COPY_FROM_LOCAL = "fs.s3a.optimized.copy.from.local.enabled";

/**
* Default value for {@link #OPTIMIZED_COPY_FROM_LOCAL}.
* Value: {@value}.
*/
public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private String scheme = FS_S3A;

/**
* Flag to indicate that the higher performance copyFromLocalFile implementation
* should be used.
*/
private boolean optimizedCopyFromLocal;

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -696,6 +702,9 @@ public void initialize(URI name, Configuration originalConf)
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
vectoredIOContext = populateVectoredIOContext(conf);
scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
} catch (SdkException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
Expand Down Expand Up @@ -4021,45 +4030,69 @@ private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
* the given dst name.
*
* This version doesn't need to create a temporary file to calculate the md5.
* Sadly this doesn't seem to be used by the shell cp :(
* If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL} is set to false,
* the superclass implementation is used.
*
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param src path
* @param dst path
* @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and
* overwrite==false
* @throws SdkException failure in the AWS SDK
*/
@Override
@AuditEntryPoint
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst) throws IOException {
checkNotClosed();
LOG.debug("Copying local file from {} to {}", src, dst);
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
() -> new CopyFromLocalOperation(
createStoreContext(),
src,
dst,
delSrc,
overwrite,
createCopyFromLocalCallbacks()).execute());
LOG.debug("Copying local file from {} to {} (delSrc={} overwrite={}",
src, dst, delSrc, overwrite);
if (optimizedCopyFromLocal) {
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () ->
new CopyFromLocalOperation(
createStoreContext(),
src,
dst,
delSrc,
overwrite,
createCopyFromLocalCallbacks(getActiveAuditSpan()))
.execute());
} else {
// call the superclass, but still count statistics.
// there is no overall span here, as each FS API call will
// be in its own span.
LOG.debug("Using base copyFromLocalFile implementation");
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
super.copyFromLocalFile(delSrc, overwrite, src, dst);
return null;
});
}
}

/**
* Create the CopyFromLocalCallbacks;
* protected to assist in mocking.
* @param span audit span.
* @return the callbacks
* @throws IOException failure to get the local fs.
*/
protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
createCopyFromLocalCallbacks() throws IOException {
createCopyFromLocalCallbacks(final AuditSpanS3A span) throws IOException {
LocalFileSystem local = getLocal(getConf());
return new CopyFromLocalCallbacksImpl(local);
return new CopyFromLocalCallbacksImpl(span, local);
}

protected final class CopyFromLocalCallbacksImpl implements
CopyFromLocalOperation.CopyFromLocalOperationCallbacks {

/** Span to use for all operations. */
private final AuditSpanS3A span;
private final LocalFileSystem local;

private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
private CopyFromLocalCallbacksImpl(final AuditSpanS3A span,
LocalFileSystem local) {
this.span = span;
this.local = local;
}

Expand All @@ -4081,20 +4114,18 @@ public boolean deleteLocal(Path path, boolean recursive) throws IOException {

@Override
public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException {
trackDurationAndSpan(
OBJECT_PUT_REQUESTS,
to,
() -> {
final String key = pathToKey(to);
Progressable progress = null;
PutObjectRequest.Builder putObjectRequestBuilder =
newPutObjectRequestBuilder(key, file.length(), false);
S3AFileSystem.this.invoker.retry("putObject(" + "" + ")", to.toString(), true,
() -> executePut(putObjectRequestBuilder.build(), progress, putOptionsForPath(to),
file));

return null;
});
// the duration of the put is measured, but the active span is the
// constructor-supplied one -this ensures all audit log events are grouped correctly
span.activate();
trackDuration(getDurationTrackerFactory(), OBJECT_PUT_REQUESTS.getSymbol(), () -> {
final String key = pathToKey(to);
PutObjectRequest.Builder putObjectRequestBuilder =
newPutObjectRequestBuilder(key, file.length(), false);
final String dest = to.toString();
S3AFileSystem.this.invoker.retry("putObject(" + dest + ")", dest, true, () ->
executePut(putObjectRequestBuilder.build(), null, putOptionsForPath(to), file));
return null;
});
}

@Override
Expand Down Expand Up @@ -5399,6 +5430,10 @@ public boolean hasPathCapability(final Path path, final String capability)
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
return performanceCreation;

// is the optimized copy from local enabled.
case OPTIMIZED_COPY_FROM_LOCAL:
return optimizedCopyFromLocal;

default:
return super.hasPathCapability(p, cap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1544,3 +1544,13 @@ software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP

When this happens, try to set `fs.s3a.connection.request.timeout` to a larger value or disable it
completely by setting it to `0`.

### <a name="debug-switches"></a> Debugging Switches

There are some switches which can be set to enable/disable features and assist
in isolating problems and at least make them "go away".


| Key | Default | Action |
|------|---------|----------|
| `fs.s3a.optimized.copy.from.local.enabled` | `true` | [HADOOP-18925](https://issues.apache.org/jira/browse/HADOOP-18925) enable/disable CopyFromLocalOperation. Also a path capability. |
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,76 @@
package org.apache.hadoop.fs.s3a;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.s3a.S3AContract;

import org.apache.hadoop.fs.Path;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.hadoop.fs.s3a.Constants.OPTIMIZED_COPY_FROM_LOCAL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
* Test copying files from the local filesystem to S3A.
* Parameterized on whether or not the optimized
* copyFromLocalFile is enabled.
*/
@RunWith(Parameterized.class)
public class ITestS3ACopyFromLocalFile extends
AbstractContractCopyFromLocalTest {
/**
* Parameterization.
*/
@Parameterized.Parameters(name = "enabled={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{true},
{false},
});
}
private final boolean enabled;

public ITestS3ACopyFromLocalFile(final boolean enabled) {
this.enabled = enabled;
}

@Override
protected Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();

removeBaseAndBucketOverrides(getTestBucketName(conf), conf,
OPTIMIZED_COPY_FROM_LOCAL);
conf.setBoolean(OPTIMIZED_COPY_FROM_LOCAL, enabled);
disableFilesystemCaching(conf);
return conf;
}

@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}

@Test
public void testOptionPropagation() throws Throwable {
Assertions.assertThat(getFileSystem().hasPathCapability(new Path("/"),
OPTIMIZED_COPY_FROM_LOCAL))
.describedAs("path capability of %s", OPTIMIZED_COPY_FROM_LOCAL)
.isEqualTo(enabled);

}

@Test
public void testLocalFilesOnly() throws Throwable {
describe("Copying into other file systems must fail");
Expand Down

0 comments on commit ef7fb64

Please sign in to comment.