Skip to content

Commit 24be141

Browse files
authored
feat: Add S3CrtAsyncClient as MultipartPutobject (#90)
1 parent 322e88c commit 24be141

12 files changed

+145
-701
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@
7474
<optional>true</optional>
7575
</dependency>
7676

77+
<dependency>
78+
<groupId>software.amazon.awssdk.crt</groupId>
79+
<artifactId>aws-crt</artifactId>
80+
<version>0.21.5</version>
81+
<optional>true</optional>
82+
</dependency>
83+
7784
<dependency>
7885
<groupId>net.jcip</groupId>
7986
<artifactId>jcip-annotations</artifactId>

src/main/java/software/amazon/encryption/s3/S3AsyncEncryptionClient.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,21 @@
3939
public class S3AsyncEncryptionClient implements S3AsyncClient {
4040

4141
private final S3AsyncClient _wrappedClient;
42+
private final S3AsyncClient _wrappedCrtClient;
4243
private final CryptographicMaterialsManager _cryptoMaterialsManager;
4344
private final SecureRandom _secureRandom;
4445
private final boolean _enableLegacyWrappingAlgorithms;
4546
private final boolean _enableLegacyUnauthenticatedModes;
4647
private final boolean _enableDelayedAuthenticationMode;
48+
private final boolean _enableMultipartPutObject;
4749

4850
private S3AsyncEncryptionClient(Builder builder) {
4951
_wrappedClient = builder._wrappedClient;
52+
_wrappedCrtClient = builder._wrappedCrtClient;
5053
_cryptoMaterialsManager = builder._cryptoMaterialsManager;
5154
_secureRandom = builder._secureRandom;
5255
_enableLegacyWrappingAlgorithms = builder._enableLegacyWrappingAlgorithms;
56+
_enableMultipartPutObject = builder._enableMultipartPutObject;
5357
_enableLegacyUnauthenticatedModes = builder._enableLegacyUnauthenticatedModes;
5458
_enableDelayedAuthenticationMode = builder._enableDelayedAuthenticationMode;
5559
}
@@ -69,6 +73,8 @@ public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObject
6973
throws AwsServiceException, SdkClientException {
7074
PutEncryptedObjectPipeline pipeline = PutEncryptedObjectPipeline.builder()
7175
.s3AsyncClient(_wrappedClient)
76+
.crtClient(_wrappedCrtClient)
77+
.enableMultipartPutObject(_enableMultipartPutObject)
7278
.cryptoMaterialsManager(_cryptoMaterialsManager)
7379
.secureRandom(_secureRandom)
7480
.build();
@@ -130,13 +136,15 @@ public void close() {
130136
// TODO: The async / non-async clients can probably share a builder - revisit after implementing async
131137
public static class Builder {
132138
private S3AsyncClient _wrappedClient = S3AsyncClient.builder().build();
139+
private S3AsyncClient _wrappedCrtClient = null;
133140
private CryptographicMaterialsManager _cryptoMaterialsManager;
134141
private Keyring _keyring;
135142
private SecretKey _aesKey;
136143
private PartialRsaKeyPair _rsaKeyPair;
137144
private String _kmsKeyId;
138145
private boolean _enableLegacyWrappingAlgorithms = false;
139146
private boolean _enableLegacyUnauthenticatedModes = false;
147+
private boolean _enableMultipartPutObject = false;
140148
private boolean _enableDelayedAuthenticationMode = false;
141149
private Provider _cryptoProvider = null;
142150
private SecureRandom _secureRandom = new SecureRandom();
@@ -153,7 +161,8 @@ public Builder wrappedClient(S3AsyncClient wrappedClient) {
153161
if (wrappedClient instanceof S3AsyncEncryptionClient) {
154162
throw new S3EncryptionClientException("Cannot use S3EncryptionClient as wrapped client");
155163
}
156-
164+
// Initializes only when wrappedAsyncClient is configured by user.
165+
this._wrappedCrtClient = wrappedClient;
157166
this._wrappedClient = wrappedClient;
158167
return this;
159168
}
@@ -239,6 +248,11 @@ public Builder enableDelayedAuthenticationMode(boolean shouldEnableDelayedAuthen
239248
return this;
240249
}
241250

251+
public Builder enableMultipartPutObject(boolean _enableMultipartPutObject) {
252+
this._enableMultipartPutObject = _enableMultipartPutObject;
253+
return this;
254+
}
255+
242256
public Builder cryptoProvider(Provider cryptoProvider) {
243257
this._cryptoProvider = cryptoProvider;
244258
return this;

src/main/java/software/amazon/encryption/s3/S3EncryptionClient.java

Lines changed: 23 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
1818
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
1919
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
20-
import software.amazon.awssdk.services.s3.model.CompletedPart;
2120
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
2221
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
2322
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -32,33 +31,25 @@
3231
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
3332
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
3433
import software.amazon.encryption.s3.internal.GetEncryptedObjectPipeline;
35-
import software.amazon.encryption.s3.internal.MultiFileOutputStream;
3634
import software.amazon.encryption.s3.internal.MultipartUploadObjectPipeline;
3735
import software.amazon.encryption.s3.internal.PutEncryptedObjectPipeline;
38-
import software.amazon.encryption.s3.internal.UploadObjectObserver;
3936
import software.amazon.encryption.s3.materials.AesKeyring;
4037
import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
4138
import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
4239
import software.amazon.encryption.s3.materials.Keyring;
4340
import software.amazon.encryption.s3.materials.KmsKeyring;
44-
import software.amazon.encryption.s3.materials.MultipartConfiguration;
4541
import software.amazon.encryption.s3.materials.PartialRsaKeyPair;
4642
import software.amazon.encryption.s3.materials.RsaKeyring;
4743

4844
import javax.crypto.SecretKey;
49-
import java.io.IOException;
5045
import java.security.KeyPair;
5146
import java.security.Provider;
5247
import java.security.SecureRandom;
53-
import java.util.ArrayList;
5448
import java.util.List;
5549
import java.util.Map;
5650
import java.util.concurrent.CompletableFuture;
5751
import java.util.concurrent.CompletionException;
58-
import java.util.concurrent.ExecutionException;
59-
import java.util.concurrent.ExecutorService;
6052
import java.util.concurrent.Executors;
61-
import java.util.concurrent.Future;
6253
import java.util.function.Consumer;
6354

6455
import static software.amazon.encryption.s3.S3EncryptionClientUtilities.INSTRUCTION_FILE_SUFFIX;
@@ -72,11 +63,11 @@ public class S3EncryptionClient implements S3Client {
7263

7364
// Used for request-scoped encryption contexts for supporting keys
7465
public static final ExecutionAttribute<Map<String, String>> ENCRYPTION_CONTEXT = new ExecutionAttribute<>("EncryptionContext");
75-
public static final ExecutionAttribute<MultipartConfiguration> CONFIGURATION = new ExecutionAttribute<>("MultipartConfiguration");
7666
// TODO: Replace with UploadPartRequest.isLastPart() when launched.
7767
// Used for multipart uploads
7868
public static final ExecutionAttribute<Boolean> IS_LAST_PART = new ExecutionAttribute<>("isLastPart");
79-
private final S3AsyncClient _wrappedAsyncClient;
69+
private final S3AsyncClient _wrappedClient;
70+
private final S3AsyncClient _wrappedCrtClient;
8071
private final CryptographicMaterialsManager _cryptoMaterialsManager;
8172
private final SecureRandom _secureRandom;
8273
private final boolean _enableLegacyWrappingAlgorithms;
@@ -86,7 +77,8 @@ public class S3EncryptionClient implements S3Client {
8677
private final MultipartUploadObjectPipeline _multipartPipeline;
8778

8879
private S3EncryptionClient(Builder builder) {
89-
_wrappedAsyncClient = builder._wrappedAsyncClient;
80+
_wrappedClient = builder._wrappedClient;
81+
_wrappedCrtClient = builder._wrappedCrtClient;
9082
_cryptoMaterialsManager = builder._cryptoMaterialsManager;
9183
_secureRandom = builder._secureRandom;
9284
_enableLegacyWrappingAlgorithms = builder._enableLegacyWrappingAlgorithms;
@@ -106,13 +98,6 @@ public static Consumer<AwsRequestOverrideConfiguration.Builder> withAdditionalCo
10698
builder.putExecutionAttribute(S3EncryptionClient.ENCRYPTION_CONTEXT, encryptionContext);
10799
}
108100

109-
// Helper function to attach encryption contexts to a request
110-
public static Consumer<AwsRequestOverrideConfiguration.Builder> withAdditionalConfiguration(Map<String, String> encryptionContext, MultipartConfiguration multipartConfiguration) {
111-
return builder ->
112-
builder.putExecutionAttribute(S3EncryptionClient.ENCRYPTION_CONTEXT, encryptionContext)
113-
.putExecutionAttribute(S3EncryptionClient.CONFIGURATION, multipartConfiguration);
114-
}
115-
116101
// Helper function to determine last upload part during multipart uploads
117102
public static Consumer<AwsRequestOverrideConfiguration.Builder> isLastPart(Boolean isLastPart) {
118103
return builder ->
@@ -123,21 +108,11 @@ public static Consumer<AwsRequestOverrideConfiguration.Builder> isLastPart(Boole
123108
public PutObjectResponse putObject(PutObjectRequest putObjectRequest, RequestBody requestBody)
124109
throws AwsServiceException, SdkClientException {
125110

126-
if (_enableMultipartPutObject) {
127-
try {
128-
// TODO: Confirm best way to wrap CompleteMultipartUploadResponse with PutObjectResponse
129-
CompleteMultipartUploadResponse completeResponse = multipartPutObject(putObjectRequest, requestBody);
130-
PutObjectResponse response = PutObjectResponse.builder()
131-
.eTag(completeResponse.eTag())
132-
.build();
133-
return response;
134-
} catch (Throwable e) {
135-
throw new S3EncryptionClientException("Exception while performing Multipart Upload PutObject", e);
136-
}
137-
}
138111
PutEncryptedObjectPipeline pipeline = PutEncryptedObjectPipeline.builder()
139-
.s3AsyncClient(_wrappedAsyncClient)
112+
.s3AsyncClient(_wrappedClient)
113+
.crtClient(_wrappedCrtClient)
140114
.cryptoMaterialsManager(_cryptoMaterialsManager)
115+
.enableMultipartPutObject(_enableMultipartPutObject)
141116
.secureRandom(_secureRandom)
142117
.build();
143118

@@ -151,7 +126,7 @@ public <T> T getObject(GetObjectRequest getObjectRequest,
151126
throws AwsServiceException, SdkClientException {
152127

153128
GetEncryptedObjectPipeline pipeline = GetEncryptedObjectPipeline.builder()
154-
.s3AsyncClient(_wrappedAsyncClient)
129+
.s3AsyncClient(_wrappedClient)
155130
.cryptoMaterialsManager(_cryptoMaterialsManager)
156131
.enableLegacyWrappingAlgorithms(_enableLegacyWrappingAlgorithms)
157132
.enableLegacyUnauthenticatedModes(_enableLegacyUnauthenticatedModes)
@@ -168,76 +143,14 @@ public <T> T getObject(GetObjectRequest getObjectRequest,
168143
}
169144
}
170145

171-
private CompleteMultipartUploadResponse multipartPutObject(PutObjectRequest request, RequestBody requestBody) throws Throwable {
172-
173-
AwsRequestOverrideConfiguration overrideConfig = request.overrideConfiguration().get();
174-
// If MultipartConfiguration is null, Initialize MultipartConfiguration
175-
MultipartConfiguration multipartConfiguration = overrideConfig
176-
.executionAttributes()
177-
.getOptionalAttribute(S3EncryptionClient.CONFIGURATION)
178-
.orElse(MultipartConfiguration.builder().build());
179-
180-
ExecutorService es = multipartConfiguration.executorService();
181-
final boolean defaultExecutorService = es == null;
182-
if (es == null) {
183-
throw new S3EncryptionClientException("ExecutorService should not be null, Please initialize during MultipartConfiguration");
184-
}
185-
186-
UploadObjectObserver observer = multipartConfiguration.uploadObjectObserver();
187-
if (observer == null) {
188-
throw new S3EncryptionClientException("UploadObjectObserver should not be null, Please initialize during MultipartConfiguration");
189-
}
190-
191-
observer.init(request, _wrappedAsyncClient, this, es);
192-
final String uploadId = observer.onUploadCreation(request);
193-
final List<CompletedPart> partETags = new ArrayList<>();
194-
195-
MultiFileOutputStream outputStream = multipartConfiguration.multiFileOutputStream();
196-
if (outputStream == null) {
197-
throw new S3EncryptionClientException("MultiFileOutputStream should not be null, Please initialize during MultipartConfiguration");
198-
}
199-
200-
try {
201-
// initialize the multi-file output stream
202-
outputStream.init(observer, multipartConfiguration.partSize(), multipartConfiguration.diskLimit());
203-
// Kicks off the encryption-upload pipeline;
204-
// Note outputStream is automatically closed upon method completion.
205-
_multipartPipeline.putLocalObject(requestBody, uploadId, outputStream);
206-
// block till all part have been uploaded
207-
for (Future<Map<Integer, UploadPartResponse>> future : observer.futures()) {
208-
Map<Integer, UploadPartResponse> partResponseMap = future.get();
209-
partResponseMap.forEach((partNumber, uploadPartResponse) -> partETags.add(CompletedPart.builder()
210-
.partNumber(partNumber)
211-
.eTag(uploadPartResponse.eTag())
212-
.build()));
213-
}
214-
} catch (IOException | InterruptedException | ExecutionException | RuntimeException | Error ex) {
215-
throw onAbort(observer, ex);
216-
} finally {
217-
if (defaultExecutorService) {
218-
// shut down the locally created thread pool
219-
es.shutdownNow();
220-
}
221-
// delete left-over temp files
222-
outputStream.cleanup();
223-
}
224-
// Complete upload
225-
return observer.onCompletion(partETags);
226-
}
227-
228-
private <T extends Throwable> T onAbort(UploadObjectObserver observer, T t) {
229-
observer.onAbort();
230-
return t;
231-
}
232-
233146
@Override
234147
public DeleteObjectResponse deleteObject(DeleteObjectRequest deleteObjectRequest) throws AwsServiceException,
235148
SdkClientException {
236149
// Delete the object
237-
DeleteObjectResponse deleteObjectResponse = _wrappedAsyncClient.deleteObject(deleteObjectRequest).join();
150+
DeleteObjectResponse deleteObjectResponse = _wrappedClient.deleteObject(deleteObjectRequest).join();
238151
// If Instruction file exists, delete the instruction file as well.
239152
String instructionObjectKey = deleteObjectRequest.key() + INSTRUCTION_FILE_SUFFIX;
240-
_wrappedAsyncClient.deleteObject(builder -> builder
153+
_wrappedClient.deleteObject(builder -> builder
241154
.bucket(deleteObjectRequest.bucket())
242155
.key(instructionObjectKey)).join();
243156
return deleteObjectResponse;
@@ -247,10 +160,10 @@ public DeleteObjectResponse deleteObject(DeleteObjectRequest deleteObjectRequest
247160
public DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteObjectsRequest) throws AwsServiceException,
248161
SdkClientException {
249162
// Delete the objects
250-
DeleteObjectsResponse deleteObjectsResponse = _wrappedAsyncClient.deleteObjects(deleteObjectsRequest).join();
163+
DeleteObjectsResponse deleteObjectsResponse = _wrappedClient.deleteObjects(deleteObjectsRequest).join();
251164
// If Instruction files exists, delete the instruction files as well.
252165
List<ObjectIdentifier> deleteObjects = instructionFileKeysToDelete(deleteObjectsRequest);
253-
_wrappedAsyncClient.deleteObjects(DeleteObjectsRequest.builder()
166+
_wrappedClient.deleteObjects(DeleteObjectsRequest.builder()
254167
.bucket(deleteObjectsRequest.bucket())
255168
.delete(builder -> builder.objects(deleteObjects))
256169
.build()).join();
@@ -295,16 +208,17 @@ public AbortMultipartUploadResponse abortMultipartUpload(AbortMultipartUploadReq
295208

296209
@Override
297210
public String serviceName() {
298-
return _wrappedAsyncClient.serviceName();
211+
return _wrappedClient.serviceName();
299212
}
300213

301214
@Override
302215
public void close() {
303-
_wrappedAsyncClient.close();
216+
_wrappedClient.close();
304217
}
305218

306219
public static class Builder {
307-
private S3AsyncClient _wrappedAsyncClient = S3AsyncClient.create();
220+
private S3AsyncClient _wrappedClient = S3AsyncClient.create();
221+
private S3AsyncClient _wrappedCrtClient = null;
308222

309223
private MultipartUploadObjectPipeline _multipartPipeline;
310224
private CryptographicMaterialsManager _cryptoMaterialsManager;
@@ -327,22 +241,13 @@ private Builder() {
327241
* S3AsyncClient will be reflected in this Builder.
328242
*/
329243
@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "Pass mutability into wrapping client")
330-
public Builder wrappedAsyncClient(S3AsyncClient _wrappedAsyncClient) {
331-
if (_wrappedAsyncClient instanceof S3AsyncEncryptionClient) {
332-
throw new S3EncryptionClientException("Cannot use S3EncryptionClient as wrapped client");
244+
public Builder wrappedClient(S3AsyncClient wrappedClient) {
245+
if (wrappedClient instanceof S3AsyncEncryptionClient) {
246+
throw new S3EncryptionClientException("Cannot use S3AsyncEncryptionClient as wrapped client");
333247
}
334-
335-
this._wrappedAsyncClient = _wrappedAsyncClient;
336-
return this;
337-
}
338-
339-
@SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "Pass mutability into wrapping client")
340-
public Builder _wrappedAsyncClient(S3AsyncClient _wrappedAsyncClient) {
341-
if (_wrappedAsyncClient instanceof S3AsyncEncryptionClient) {
342-
throw new S3EncryptionClientException("Cannot use S3EncryptionClient as wrapped client");
343-
}
344-
345-
this._wrappedAsyncClient = _wrappedAsyncClient;
248+
// Initializes only when wrappedClient is configured by user.
249+
this._wrappedCrtClient = wrappedClient;
250+
this._wrappedClient = wrappedClient;
346251
return this;
347252
}
348253

@@ -480,7 +385,7 @@ public S3EncryptionClient build() {
480385
}
481386

482387
_multipartPipeline = MultipartUploadObjectPipeline.builder()
483-
.s3AsyncClient(_wrappedAsyncClient)
388+
.s3AsyncClient(_wrappedClient)
484389
.cryptoMaterialsManager(_cryptoMaterialsManager)
485390
.secureRandom(_secureRandom)
486391
.build();

src/main/java/software/amazon/encryption/s3/internal/FileDeletionEvent.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)