Skip to content

Add S3CrtAsyncClient as MultipartUpload Client #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.21.5</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@
public class S3AsyncEncryptionClient implements S3AsyncClient {

private final S3AsyncClient _wrappedClient;
private final S3AsyncClient _wrappedCrtClient;
private final CryptographicMaterialsManager _cryptoMaterialsManager;
private final SecureRandom _secureRandom;
private final boolean _enableLegacyWrappingAlgorithms;
private final boolean _enableLegacyUnauthenticatedModes;
private final boolean _enableDelayedAuthenticationMode;
private final boolean _enableMultipartPutObject;

private S3AsyncEncryptionClient(Builder builder) {
_wrappedClient = builder._wrappedClient;
_wrappedCrtClient = builder._wrappedCrtClient;
_cryptoMaterialsManager = builder._cryptoMaterialsManager;
_secureRandom = builder._secureRandom;
_enableLegacyWrappingAlgorithms = builder._enableLegacyWrappingAlgorithms;
_enableMultipartPutObject = builder._enableMultipartPutObject;
_enableLegacyUnauthenticatedModes = builder._enableLegacyUnauthenticatedModes;
_enableDelayedAuthenticationMode = builder._enableDelayedAuthenticationMode;
}
Expand All @@ -69,6 +73,8 @@ public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObject
throws AwsServiceException, SdkClientException {
PutEncryptedObjectPipeline pipeline = PutEncryptedObjectPipeline.builder()
.s3AsyncClient(_wrappedClient)
.crtClient(_wrappedCrtClient)
.enableMultipartPutObject(_enableMultipartPutObject)
.cryptoMaterialsManager(_cryptoMaterialsManager)
.secureRandom(_secureRandom)
.build();
Expand Down Expand Up @@ -130,13 +136,15 @@ public void close() {
// TODO: The async / non-async clients can probably share a builder - revisit after implementing async
public static class Builder {
private S3AsyncClient _wrappedClient = S3AsyncClient.builder().build();
private S3AsyncClient _wrappedCrtClient = null;
private CryptographicMaterialsManager _cryptoMaterialsManager;
private Keyring _keyring;
private SecretKey _aesKey;
private PartialRsaKeyPair _rsaKeyPair;
private String _kmsKeyId;
private boolean _enableLegacyWrappingAlgorithms = false;
private boolean _enableLegacyUnauthenticatedModes = false;
private boolean _enableMultipartPutObject = false;
private boolean _enableDelayedAuthenticationMode = false;
private Provider _cryptoProvider = null;
private SecureRandom _secureRandom = new SecureRandom();
Expand All @@ -153,7 +161,8 @@ public Builder wrappedClient(S3AsyncClient wrappedClient) {
if (wrappedClient instanceof S3AsyncEncryptionClient) {
throw new S3EncryptionClientException("Cannot use S3EncryptionClient as wrapped client");
}

// Initializes only when wrappedAsyncClient is configured by user.
this._wrappedCrtClient = wrappedClient;
this._wrappedClient = wrappedClient;
return this;
}
Expand Down Expand Up @@ -239,6 +248,11 @@ public Builder enableDelayedAuthenticationMode(boolean shouldEnableDelayedAuthen
return this;
}

public Builder enableMultipartPutObject(boolean _enableMultipartPutObject) {
this._enableMultipartPutObject = _enableMultipartPutObject;
return this;
}

public Builder cryptoProvider(Provider cryptoProvider) {
this._cryptoProvider = cryptoProvider;
return this;
Expand Down
141 changes: 23 additions & 118 deletions src/main/java/software/amazon/encryption/s3/S3EncryptionClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
Expand All @@ -32,33 +31,25 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.encryption.s3.internal.GetEncryptedObjectPipeline;
import software.amazon.encryption.s3.internal.MultiFileOutputStream;
import software.amazon.encryption.s3.internal.MultipartUploadObjectPipeline;
import software.amazon.encryption.s3.internal.PutEncryptedObjectPipeline;
import software.amazon.encryption.s3.internal.UploadObjectObserver;
import software.amazon.encryption.s3.materials.AesKeyring;
import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
import software.amazon.encryption.s3.materials.Keyring;
import software.amazon.encryption.s3.materials.KmsKeyring;
import software.amazon.encryption.s3.materials.MultipartConfiguration;
import software.amazon.encryption.s3.materials.PartialRsaKeyPair;
import software.amazon.encryption.s3.materials.RsaKeyring;

import javax.crypto.SecretKey;
import java.io.IOException;
import java.security.KeyPair;
import java.security.Provider;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import static software.amazon.encryption.s3.S3EncryptionClientUtilities.INSTRUCTION_FILE_SUFFIX;
Expand All @@ -72,11 +63,11 @@ public class S3EncryptionClient implements S3Client {

// Used for request-scoped encryption contexts for supporting keys
public static final ExecutionAttribute<Map<String, String>> ENCRYPTION_CONTEXT = new ExecutionAttribute<>("EncryptionContext");
public static final ExecutionAttribute<MultipartConfiguration> CONFIGURATION = new ExecutionAttribute<>("MultipartConfiguration");
// TODO: Replace with UploadPartRequest.isLastPart() when launched.
// Used for multipart uploads
public static final ExecutionAttribute<Boolean> IS_LAST_PART = new ExecutionAttribute<>("isLastPart");
private final S3AsyncClient _wrappedAsyncClient;
private final S3AsyncClient _wrappedClient;
private final S3AsyncClient _wrappedCrtClient;
private final CryptographicMaterialsManager _cryptoMaterialsManager;
private final SecureRandom _secureRandom;
private final boolean _enableLegacyWrappingAlgorithms;
Expand All @@ -86,7 +77,8 @@ public class S3EncryptionClient implements S3Client {
private final MultipartUploadObjectPipeline _multipartPipeline;

private S3EncryptionClient(Builder builder) {
_wrappedAsyncClient = builder._wrappedAsyncClient;
_wrappedClient = builder._wrappedClient;
_wrappedCrtClient = builder._wrappedCrtClient;
_cryptoMaterialsManager = builder._cryptoMaterialsManager;
_secureRandom = builder._secureRandom;
_enableLegacyWrappingAlgorithms = builder._enableLegacyWrappingAlgorithms;
Expand All @@ -106,13 +98,6 @@ public static Consumer<AwsRequestOverrideConfiguration.Builder> withAdditionalCo
builder.putExecutionAttribute(S3EncryptionClient.ENCRYPTION_CONTEXT, encryptionContext);
}

// Helper function to attach encryption contexts to a request
public static Consumer<AwsRequestOverrideConfiguration.Builder> withAdditionalConfiguration(Map<String, String> encryptionContext, MultipartConfiguration multipartConfiguration) {
return builder ->
builder.putExecutionAttribute(S3EncryptionClient.ENCRYPTION_CONTEXT, encryptionContext)
.putExecutionAttribute(S3EncryptionClient.CONFIGURATION, multipartConfiguration);
}

// Helper function to determine last upload part during multipart uploads
public static Consumer<AwsRequestOverrideConfiguration.Builder> isLastPart(Boolean isLastPart) {
return builder ->
Expand All @@ -123,21 +108,11 @@ public static Consumer<AwsRequestOverrideConfiguration.Builder> isLastPart(Boole
public PutObjectResponse putObject(PutObjectRequest putObjectRequest, RequestBody requestBody)
throws AwsServiceException, SdkClientException {

if (_enableMultipartPutObject) {
try {
// TODO: Confirm best way to wrap CompleteMultipartUploadResponse with PutObjectResponse
CompleteMultipartUploadResponse completeResponse = multipartPutObject(putObjectRequest, requestBody);
PutObjectResponse response = PutObjectResponse.builder()
.eTag(completeResponse.eTag())
.build();
return response;
} catch (Throwable e) {
throw new S3EncryptionClientException("Exception while performing Multipart Upload PutObject", e);
}
}
PutEncryptedObjectPipeline pipeline = PutEncryptedObjectPipeline.builder()
.s3AsyncClient(_wrappedAsyncClient)
.s3AsyncClient(_wrappedClient)
.crtClient(_wrappedCrtClient)
.cryptoMaterialsManager(_cryptoMaterialsManager)
.enableMultipartPutObject(_enableMultipartPutObject)
.secureRandom(_secureRandom)
.build();

Expand All @@ -151,7 +126,7 @@ public <T> T getObject(GetObjectRequest getObjectRequest,
throws AwsServiceException, SdkClientException {

GetEncryptedObjectPipeline pipeline = GetEncryptedObjectPipeline.builder()
.s3AsyncClient(_wrappedAsyncClient)
.s3AsyncClient(_wrappedClient)
.cryptoMaterialsManager(_cryptoMaterialsManager)
.enableLegacyWrappingAlgorithms(_enableLegacyWrappingAlgorithms)
.enableLegacyUnauthenticatedModes(_enableLegacyUnauthenticatedModes)
Expand All @@ -168,76 +143,14 @@ public <T> T getObject(GetObjectRequest getObjectRequest,
}
}

private CompleteMultipartUploadResponse multipartPutObject(PutObjectRequest request, RequestBody requestBody) throws Throwable {

AwsRequestOverrideConfiguration overrideConfig = request.overrideConfiguration().get();
// If MultipartConfiguration is null, Initialize MultipartConfiguration
MultipartConfiguration multipartConfiguration = overrideConfig
.executionAttributes()
.getOptionalAttribute(S3EncryptionClient.CONFIGURATION)
.orElse(MultipartConfiguration.builder().build());

ExecutorService es = multipartConfiguration.executorService();
final boolean defaultExecutorService = es == null;
if (es == null) {
throw new S3EncryptionClientException("ExecutorService should not be null, Please initialize during MultipartConfiguration");
}

UploadObjectObserver observer = multipartConfiguration.uploadObjectObserver();
if (observer == null) {
throw new S3EncryptionClientException("UploadObjectObserver should not be null, Please initialize during MultipartConfiguration");
}

observer.init(request, _wrappedAsyncClient, this, es);
final String uploadId = observer.onUploadCreation(request);
final List<CompletedPart> partETags = new ArrayList<>();

MultiFileOutputStream outputStream = multipartConfiguration.multiFileOutputStream();
if (outputStream == null) {
throw new S3EncryptionClientException("MultiFileOutputStream should not be null, Please initialize during MultipartConfiguration");
}

try {
// initialize the multi-file output stream
outputStream.init(observer, multipartConfiguration.partSize(), multipartConfiguration.diskLimit());
// Kicks off the encryption-upload pipeline;
// Note outputStream is automatically closed upon method completion.
_multipartPipeline.putLocalObject(requestBody, uploadId, outputStream);
// block till all part have been uploaded
for (Future<Map<Integer, UploadPartResponse>> future : observer.futures()) {
Map<Integer, UploadPartResponse> partResponseMap = future.get();
partResponseMap.forEach((partNumber, uploadPartResponse) -> partETags.add(CompletedPart.builder()
.partNumber(partNumber)
.eTag(uploadPartResponse.eTag())
.build()));
}
} catch (IOException | InterruptedException | ExecutionException | RuntimeException | Error ex) {
throw onAbort(observer, ex);
} finally {
if (defaultExecutorService) {
// shut down the locally created thread pool
es.shutdownNow();
}
// delete left-over temp files
outputStream.cleanup();
}
// Complete upload
return observer.onCompletion(partETags);
}

private <T extends Throwable> T onAbort(UploadObjectObserver observer, T t) {
observer.onAbort();
return t;
}

@Override
public DeleteObjectResponse deleteObject(DeleteObjectRequest deleteObjectRequest) throws AwsServiceException,
SdkClientException {
// Delete the object
DeleteObjectResponse deleteObjectResponse = _wrappedAsyncClient.deleteObject(deleteObjectRequest).join();
DeleteObjectResponse deleteObjectResponse = _wrappedClient.deleteObject(deleteObjectRequest).join();
// If Instruction file exists, delete the instruction file as well.
String instructionObjectKey = deleteObjectRequest.key() + INSTRUCTION_FILE_SUFFIX;
_wrappedAsyncClient.deleteObject(builder -> builder
_wrappedClient.deleteObject(builder -> builder
.bucket(deleteObjectRequest.bucket())
.key(instructionObjectKey)).join();
return deleteObjectResponse;
Expand All @@ -247,10 +160,10 @@ public DeleteObjectResponse deleteObject(DeleteObjectRequest deleteObjectRequest
public DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteObjectsRequest) throws AwsServiceException,
SdkClientException {
// Delete the objects
DeleteObjectsResponse deleteObjectsResponse = _wrappedAsyncClient.deleteObjects(deleteObjectsRequest).join();
DeleteObjectsResponse deleteObjectsResponse = _wrappedClient.deleteObjects(deleteObjectsRequest).join();
// If Instruction files exists, delete the instruction files as well.
List<ObjectIdentifier> deleteObjects = instructionFileKeysToDelete(deleteObjectsRequest);
_wrappedAsyncClient.deleteObjects(DeleteObjectsRequest.builder()
_wrappedClient.deleteObjects(DeleteObjectsRequest.builder()
.bucket(deleteObjectsRequest.bucket())
.delete(builder -> builder.objects(deleteObjects))
.build()).join();
Expand Down Expand Up @@ -295,16 +208,17 @@ public AbortMultipartUploadResponse abortMultipartUpload(AbortMultipartUploadReq

@Override
public String serviceName() {
return _wrappedAsyncClient.serviceName();
return _wrappedClient.serviceName();
}

@Override
public void close() {
_wrappedAsyncClient.close();
_wrappedClient.close();
}

public static class Builder {
private S3AsyncClient _wrappedAsyncClient = S3AsyncClient.create();
private S3AsyncClient _wrappedClient = S3AsyncClient.create();
private S3AsyncClient _wrappedCrtClient = null;

private MultipartUploadObjectPipeline _multipartPipeline;
private CryptographicMaterialsManager _cryptoMaterialsManager;
Expand All @@ -327,22 +241,13 @@ private Builder() {
* S3AsyncClient will be reflected in this Builder.
*/
@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "Pass mutability into wrapping client")
public Builder wrappedAsyncClient(S3AsyncClient _wrappedAsyncClient) {
if (_wrappedAsyncClient instanceof S3AsyncEncryptionClient) {
throw new S3EncryptionClientException("Cannot use S3EncryptionClient as wrapped client");
public Builder wrappedClient(S3AsyncClient wrappedClient) {
if (wrappedClient instanceof S3AsyncEncryptionClient) {
throw new S3EncryptionClientException("Cannot use S3AsyncEncryptionClient as wrapped client");
}

this._wrappedAsyncClient = _wrappedAsyncClient;
return this;
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "Pass mutability into wrapping client")
public Builder _wrappedAsyncClient(S3AsyncClient _wrappedAsyncClient) {
if (_wrappedAsyncClient instanceof S3AsyncEncryptionClient) {
throw new S3EncryptionClientException("Cannot use S3EncryptionClient as wrapped client");
}

this._wrappedAsyncClient = _wrappedAsyncClient;
// Initializes only when wrappedClient is configured by user.
this._wrappedCrtClient = wrappedClient;
this._wrappedClient = wrappedClient;
return this;
}

Expand Down Expand Up @@ -480,7 +385,7 @@ public S3EncryptionClient build() {
}

_multipartPipeline = MultipartUploadObjectPipeline.builder()
.s3AsyncClient(_wrappedAsyncClient)
.s3AsyncClient(_wrappedClient)
.cryptoMaterialsManager(_cryptoMaterialsManager)
.secureRandom(_secureRandom)
.build();
Expand Down

This file was deleted.

Loading