Skip to content

Added UploadDirectoryRequest.Builder#uploadRequestTransformer that al… #2799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.AfterClass;
Expand Down Expand Up @@ -124,6 +125,26 @@ public void uploadDirectory_withDelimiter_filesSentCorrectly() {
});
}

@Test
public void uploadDirectory_withRequestTransformer_usesRequestTransformer() throws Exception {
String prefix = "requestTransformerTest";
Path newSourceForEachUpload = Paths.get(directory.toString(), "bar.txt");

CompletedDirectoryUpload result =
tm.uploadDirectory(r -> r.sourceDirectory(directory)
.bucket(TEST_BUCKET)
.prefix(prefix)
.overrideConfiguration(c -> c.uploadFileRequestTransformer(f -> f.source(newSourceForEachUpload))
.recursive(true)))
.completionFuture()
.get(10, TimeUnit.SECONDS);
assertThat(result.failedTransfers()).isEmpty();

s3Client.listObjectsV2Paginator(b -> b.bucket(TEST_BUCKET).prefix(prefix)).contents().forEach(object -> {
verifyContent(object.key(), "bar.txt" + randomString);
});
}

private static Path createLocalTestDirectory() throws IOException {
Path directory = Files.createTempDirectory("test");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
Expand All @@ -39,11 +41,13 @@ public final class UploadDirectoryOverrideConfiguration implements ToCopyableBui
private final Boolean followSymbolicLinks;
private final Integer maxDepth;
private final Boolean recursive;
private final Consumer<UploadFileRequest.Builder> uploadFileRequestTransformer;

public UploadDirectoryOverrideConfiguration(DefaultBuilder builder) {
this.followSymbolicLinks = builder.followSymbolicLinks;
this.maxDepth = Validate.isPositiveOrNull(builder.maxDepth, "maxDepth");
this.recursive = builder.recursive;
this.uploadFileRequestTransformer = builder.uploadFileRequestTransformer;
}

/**
Expand All @@ -70,6 +74,14 @@ public Optional<Boolean> recursive() {
return Optional.ofNullable(recursive);
}

/**
* @return the optional upload request transformer
* @see UploadDirectoryOverrideConfiguration.Builder#uploadFileRequestTransformer(Consumer)
*/
public Optional<Consumer<UploadFileRequest.Builder>> uploadFileRequestTransformer() {
return Optional.ofNullable(uploadFileRequestTransformer);
}

@Override
public Builder toBuilder() {
return new DefaultBuilder(this);
Expand All @@ -92,6 +104,9 @@ public boolean equals(Object o) {
if (!Objects.equals(maxDepth, that.maxDepth)) {
return false;
}
if (!Objects.equals(uploadFileRequestTransformer, that.uploadFileRequestTransformer)) {
return false;
}
return Objects.equals(recursive, that.recursive);
}

Expand All @@ -100,6 +115,7 @@ public int hashCode() {
int result = followSymbolicLinks != null ? followSymbolicLinks.hashCode() : 0;
result = 31 * result + (maxDepth != null ? maxDepth.hashCode() : 0);
result = 31 * result + (recursive != null ? recursive.hashCode() : 0);
result = 31 * result + (uploadFileRequestTransformer != null ? uploadFileRequestTransformer.hashCode() : 0);
return result;
}

Expand All @@ -109,6 +125,7 @@ public String toString() {
.add("followSymbolicLinks", followSymbolicLinks)
.add("maxDepth", maxDepth)
.add("recursive", recursive)
.add("uploadFileRequestTransformer", uploadFileRequestTransformer)
.build();
}

Expand Down Expand Up @@ -156,6 +173,53 @@ public interface Builder extends CopyableBuilder<Builder, UploadDirectoryOverrid
*/
Builder maxDepth(Integer maxDepth);

/**
* Specify a function used to transform the {@link UploadFileRequest}s generated by this {@link UploadDirectoryRequest}.
* The provided function is called once for each file that is uploaded, allowing you to modify the paths resolved by
* TransferManager on a per-file basis, modify the created {@link PutObjectRequest} before it is passed to S3, or
* configure a {@link TransferRequestOverrideConfiguration}.
*
* <p>The factory receives the {@link UploadFileRequest}s created by Transfer Manager for each file in the directory
* being uploaded, and returns a (potentially modified) {@code UploadFileRequest}.
*
* <p>
* <b>Usage Example:</b>
* <pre>
* {@code
* // Add a LoggingTransferListener to every transfer within the upload directory request
* TransferRequestOverrideConfiguration fileUploadConfiguration =
* TransferRequestOverrideConfiguration.builder()
* .addListener(LoggingTransferListener.create())
Copy link
Contributor

@Bennett-Lynch Bennett-Lynch Nov 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of scope of this PR but worth noting: This works, but it's going to result in using the same listener for every invocation, when the current LoggingTransferListener implementation assumes that it's not going to be reused. We've so far refrained from adding listeners at the client-level for this reason. A user could implement a reusable listener by internally mapping something like TransferRequest to internal listener state, but this is a lot of complexity. I wonder if we should change listeners to be declared with a Supplier instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, yeah, seems like it's easy to make that mistake, then. I certainly thought this would be fine. What do you recommend I do for the purposes of this PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we moved the LoggingTransferListener.create() call to be inside your lambda, it would create a new one for every request and work as expected, but that may make the code less readable. If we can update the example to do that, without severely affecting legibility, then I would recommend that. Otherwise I think it's okay to use your example as-is and we can do a fast-follow-up on whether to accept a Supplier or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would unfortuantely be very difficult to make it readable with the lambdas. I'll leave it as-is and we can move it to a supplier asynchronously.

* .build();
*
* UploadDirectoryOverrideConfiguration directoryUploadConfiguration =
* UploadDirectoryOverrideConfiguration.builder()
* .uploadFileRequestTransformer(request -> request.overrideConfiguration(fileUploadConfiguration))
* .build();
*
* UploadDirectoryRequest request =
* UploadDirectoryRequest.builder()
* .sourceDirectory(Paths.get("."))
* .bucket("bucket")
* .prefix("prefix")
* .overrideConfiguration(directoryUploadConfiguration)
* .build()
*
* UploadDirectoryTransfer uploadDirectory = transferManager.uploadDirectory(request);
*
* // Wait for the transfer to complete
* CompletedUploadDirectory completedUploadDirectory = uploadDirectory.completionFuture().join();
*
* // Print out the failed uploads
* completedUploadDirectory.failedUploads().forEach(System.out::println);
* }
* </pre>
*
* @param uploadFileRequestTransformer A transformer to use for modifying the file-level upload requests before execution
* @return This builder for method chaining
*/
Builder uploadFileRequestTransformer(Consumer<UploadFileRequest.Builder> uploadFileRequestTransformer);

@Override
UploadDirectoryOverrideConfiguration build();
}
Expand All @@ -164,11 +228,13 @@ private static final class DefaultBuilder implements Builder {
private Boolean followSymbolicLinks;
private Integer maxDepth;
private Boolean recursive;
private Consumer<UploadFileRequest.Builder> uploadFileRequestTransformer;

private DefaultBuilder(UploadDirectoryOverrideConfiguration configuration) {
this.followSymbolicLinks = configuration.followSymbolicLinks;
this.maxDepth = configuration.maxDepth;
this.recursive = configuration.recursive;
this.uploadFileRequestTransformer = configuration.uploadFileRequestTransformer;
}

private DefaultBuilder() {
Expand Down Expand Up @@ -216,6 +282,20 @@ public Integer getMaxDepth() {
return maxDepth;
}

@Override
public Builder uploadFileRequestTransformer(Consumer<UploadFileRequest.Builder> uploadFileRequestTransformer) {
this.uploadFileRequestTransformer = uploadFileRequestTransformer;
return this;
}

public Consumer<UploadFileRequest.Builder> getUploadFileRequestTransformer() {
return uploadFileRequestTransformer;
}

public void setUploadFileRequestTransformer(Consumer<UploadFileRequest.Builder> uploadFileRequestTransformer) {
this.uploadFileRequestTransformer = uploadFileRequestTransformer;
}

@Override
public UploadDirectoryOverrideConfiguration build() {
return new UploadDirectoryOverrideConfiguration(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ public String toString() {
}

public interface Builder extends CopyableBuilder<Builder, UploadDirectoryRequest> {

/**
* Specify the source directory to upload. The source directory must exist.
* Fle wildcards are not supported and treated literally. Hidden files/directories are visited.
Expand Down Expand Up @@ -261,6 +260,7 @@ default Builder overrideConfiguration(Consumer<UploadDirectoryOverrideConfigurat
UploadDirectoryRequest build();
}


private static final class DefaultBuilder implements Builder {

private Path sourceDirectory;
Expand All @@ -277,6 +277,7 @@ private DefaultBuilder(UploadDirectoryRequest request) {
this.bucket = request.bucket;
this.prefix = request.prefix;
this.configuration = request.overrideConfiguration;
this.delimiter = request.delimiter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import software.amazon.awssdk.transfer.s3.FailedFileUpload;
import software.amazon.awssdk.transfer.s3.FileUpload;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.UploadDirectoryOverrideConfiguration;
import software.amazon.awssdk.transfer.s3.UploadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.UploadFileRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;
Expand Down Expand Up @@ -230,9 +231,16 @@ private UploadFileRequest constructUploadRequest(UploadDirectoryRequest uploadDi
.bucket(uploadDirectoryRequest.bucket())
.key(key)
.build();
return UploadFileRequest.builder()
.source(path)
.putObjectRequest(putObjectRequest)
.build();

UploadFileRequest.Builder requestBuilder = UploadFileRequest.builder()
.source(path)
.putObjectRequest(putObjectRequest);

uploadDirectoryRequest.overrideConfiguration()
.flatMap(UploadDirectoryOverrideConfiguration::uploadFileRequestTransformer)
.ifPresent(c -> c.accept(requestBuilder));

return requestBuilder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -36,12 +38,16 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.DirectoryUpload;
import software.amazon.awssdk.transfer.s3.FileUpload;
import software.amazon.awssdk.transfer.s3.TransferRequestOverrideConfiguration;
import software.amazon.awssdk.transfer.s3.UploadDirectoryOverrideConfiguration;
import software.amazon.awssdk.transfer.s3.UploadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress;
Expand Down Expand Up @@ -156,6 +162,59 @@ public void uploadDirectory_partialSuccess_shouldProvideFailedUploads() throws E
assertThat(completedDirectoryUpload.failedTransfers().iterator().next().request().source().toString()).isEqualTo("test/2");
}

@Test
public void uploadDirectory_withRequestTransformer_usesRequestTransformer() throws Exception {
PutObjectResponse putObjectResponse = PutObjectResponse.builder().eTag("1234").build();
CompletedFileUpload completedFileUpload = CompletedFileUpload.builder().response(putObjectResponse).build();
CompletableFuture<CompletedFileUpload> successfulFuture = new CompletableFuture<>();

FileUpload upload = newUpload(successfulFuture);
successfulFuture.complete(completedFileUpload);

PutObjectResponse putObjectResponse2 = PutObjectResponse.builder().eTag("5678").build();
CompletedFileUpload completedFileUpload2 = CompletedFileUpload.builder().response(putObjectResponse2).build();
CompletableFuture<CompletedFileUpload> failedFuture = new CompletableFuture<>();
FileUpload upload2 = newUpload(failedFuture);
failedFuture.complete(completedFileUpload2);

ArgumentCaptor<UploadFileRequest> uploadRequestCaptor = ArgumentCaptor.forClass(UploadFileRequest.class);

when(singleUploadFunction.apply(uploadRequestCaptor.capture())).thenReturn(upload, upload2);

Path newSource = Paths.get("/new/path");
PutObjectRequest newPutObjectRequest = PutObjectRequest.builder().build();
TransferRequestOverrideConfiguration newOverrideConfig = TransferRequestOverrideConfiguration.builder()
.build();

UploadDirectoryOverrideConfiguration uploadConfig =
UploadDirectoryOverrideConfiguration.builder()
.uploadFileRequestTransformer(r -> r.source(newSource)
.putObjectRequest(newPutObjectRequest)
.overrideConfiguration(newOverrideConfig))
.build();

uploadDirectoryHelper.uploadDirectory(UploadDirectoryRequest.builder()
.sourceDirectory(directory)
.bucket("bucket")
.overrideConfiguration(uploadConfig)
.build())
.completionFuture()
.get(5, TimeUnit.SECONDS);

List<UploadFileRequest> uploadRequests = uploadRequestCaptor.getAllValues();
assertThat(uploadRequests).hasSize(2);
assertThat(uploadRequests).element(0).satisfies(r -> {
assertThat(r.source()).isEqualTo(newSource);
assertThat(r.putObjectRequest()).isEqualTo(newPutObjectRequest);
assertThat(r.overrideConfiguration()).hasValue(newOverrideConfig);
});
assertThat(uploadRequests).element(1).satisfies(r -> {
assertThat(r.source()).isEqualTo(newSource);
assertThat(r.putObjectRequest()).isEqualTo(newPutObjectRequest);
assertThat(r.overrideConfiguration()).hasValue(newOverrideConfig);
});
}

private FileUpload newUpload(CompletableFuture<CompletedFileUpload> future) {
return new DefaultFileUpload(future,
new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build())
Expand Down