18
18
import software .amazon .awssdk .services .s3 .model .AbortMultipartUploadResponse ;
19
19
import software .amazon .awssdk .services .s3 .model .CompleteMultipartUploadRequest ;
20
20
import software .amazon .awssdk .services .s3 .model .CompleteMultipartUploadResponse ;
21
+ import software .amazon .awssdk .services .s3 .model .CompletedPart ;
21
22
import software .amazon .awssdk .services .s3 .model .CreateMultipartUploadRequest ;
22
23
import software .amazon .awssdk .services .s3 .model .CreateMultipartUploadResponse ;
23
24
import software .amazon .awssdk .services .s3 .model .DeleteObjectRequest ;
32
33
import software .amazon .awssdk .services .s3 .model .UploadPartRequest ;
33
34
import software .amazon .awssdk .services .s3 .model .UploadPartResponse ;
34
35
import software .amazon .encryption .s3 .internal .GetEncryptedObjectPipeline ;
36
+ import software .amazon .encryption .s3 .internal .MultiFileOutputStream ;
35
37
import software .amazon .encryption .s3 .internal .MultipartUploadObjectPipeline ;
36
38
import software .amazon .encryption .s3 .internal .PutEncryptedObjectPipeline ;
39
+ import software .amazon .encryption .s3 .internal .UploadObjectObserver ;
37
40
import software .amazon .encryption .s3 .materials .AesKeyring ;
38
41
import software .amazon .encryption .s3 .materials .CryptographicMaterialsManager ;
39
42
import software .amazon .encryption .s3 .materials .DefaultCryptoMaterialsManager ;
40
43
import software .amazon .encryption .s3 .materials .Keyring ;
41
44
import software .amazon .encryption .s3 .materials .KmsKeyring ;
45
+ import software .amazon .encryption .s3 .materials .MultipartConfiguration ;
42
46
import software .amazon .encryption .s3 .materials .PartialRsaKeyPair ;
43
47
import software .amazon .encryption .s3 .materials .RsaKeyring ;
44
48
45
49
import javax .crypto .SecretKey ;
50
+ import java .io .IOException ;
46
51
import java .security .KeyPair ;
47
52
import java .security .Provider ;
48
53
import java .security .SecureRandom ;
54
+ import java .util .ArrayList ;
49
55
import java .util .List ;
50
56
import java .util .Map ;
51
57
import java .util .concurrent .CompletableFuture ;
52
58
import java .util .concurrent .CompletionException ;
59
+ import java .util .concurrent .ExecutionException ;
60
+ import java .util .concurrent .ExecutorService ;
53
61
import java .util .concurrent .Executors ;
62
+ import java .util .concurrent .Future ;
54
63
import java .util .function .Consumer ;
55
64
56
65
import static software .amazon .encryption .s3 .S3EncryptionClientUtilities .INSTRUCTION_FILE_SUFFIX ;
@@ -64,12 +73,13 @@ public class S3EncryptionClient extends DelegatingS3Client {
64
73
65
74
// Used for request-scoped encryption contexts for supporting keys
66
75
public static final ExecutionAttribute <Map <String , String >> ENCRYPTION_CONTEXT = new ExecutionAttribute <>("EncryptionContext" );
76
+ public static final ExecutionAttribute <MultipartConfiguration > CONFIGURATION = new ExecutionAttribute <>("MultipartConfiguration" );
67
77
// TODO: Replace with UploadPartRequest.isLastPart() when launched.
68
78
// Used for multipart uploads
69
79
public static final ExecutionAttribute <Boolean > IS_LAST_PART = new ExecutionAttribute <>("isLastPart" );
70
80
71
- private final S3AsyncClient _wrappedClient ;
72
- private final S3AsyncClient _wrappedCrtClient ;
81
+ private final S3Client _wrappedClient ;
82
+ private final S3AsyncClient _wrappedAsyncClient ;
73
83
private final CryptographicMaterialsManager _cryptoMaterialsManager ;
74
84
private final SecureRandom _secureRandom ;
75
85
private final boolean _enableLegacyWrappingAlgorithms ;
@@ -79,11 +89,9 @@ public class S3EncryptionClient extends DelegatingS3Client {
79
89
private final MultipartUploadObjectPipeline _multipartPipeline ;
80
90
81
91
private S3EncryptionClient (Builder builder ) {
82
- // The non-encrypted APIs will use a default client.
83
- // In the future, we may want to make this configurable.
84
- super (S3Client .create ());
92
+ super (builder ._wrappedClient );
85
93
_wrappedClient = builder ._wrappedClient ;
86
- _wrappedCrtClient = builder ._wrappedCrtClient ;
94
+ _wrappedAsyncClient = builder ._wrappedAsyncClient ;
87
95
_cryptoMaterialsManager = builder ._cryptoMaterialsManager ;
88
96
_secureRandom = builder ._secureRandom ;
89
97
_enableLegacyWrappingAlgorithms = builder ._enableLegacyWrappingAlgorithms ;
@@ -103,6 +111,13 @@ public static Consumer<AwsRequestOverrideConfiguration.Builder> withAdditionalCo
103
111
builder .putExecutionAttribute (S3EncryptionClient .ENCRYPTION_CONTEXT , encryptionContext );
104
112
}
105
113
114
+ // Helper function to attach encryption contexts to a request
115
+ public static Consumer <AwsRequestOverrideConfiguration .Builder > withAdditionalConfiguration (Map <String , String > encryptionContext , MultipartConfiguration multipartConfiguration ) {
116
+ return builder ->
117
+ builder .putExecutionAttribute (S3EncryptionClient .ENCRYPTION_CONTEXT , encryptionContext )
118
+ .putExecutionAttribute (S3EncryptionClient .CONFIGURATION , multipartConfiguration );
119
+ }
120
+
106
121
// Helper function to determine last upload part during multipart uploads
107
122
public static Consumer <AwsRequestOverrideConfiguration .Builder > isLastPart (Boolean isLastPart ) {
108
123
return builder ->
@@ -113,11 +128,21 @@ public static Consumer<AwsRequestOverrideConfiguration.Builder> isLastPart(Boole
113
128
public PutObjectResponse putObject (PutObjectRequest putObjectRequest , RequestBody requestBody )
114
129
throws AwsServiceException , SdkClientException {
115
130
131
+ if (_enableMultipartPutObject ) {
132
+ try {
133
+ // TODO: Confirm best way to wrap CompleteMultipartUploadResponse with PutObjectResponse
134
+ CompleteMultipartUploadResponse completeResponse = multipartPutObject (putObjectRequest , requestBody );
135
+ PutObjectResponse response = PutObjectResponse .builder ()
136
+ .eTag (completeResponse .eTag ())
137
+ .build ();
138
+ return response ;
139
+ } catch (Throwable e ) {
140
+ throw new S3EncryptionClientException ("Exception while performing Multipart Upload PutObject" , e );
141
+ }
142
+ }
116
143
PutEncryptedObjectPipeline pipeline = PutEncryptedObjectPipeline .builder ()
117
- .s3AsyncClient (_wrappedClient )
118
- .crtClient (_wrappedCrtClient )
144
+ .s3AsyncClient (_wrappedAsyncClient )
119
145
.cryptoMaterialsManager (_cryptoMaterialsManager )
120
- .enableMultipartPutObject (_enableMultipartPutObject )
121
146
.secureRandom (_secureRandom )
122
147
.build ();
123
148
@@ -131,7 +156,7 @@ public <T> T getObject(GetObjectRequest getObjectRequest,
131
156
throws AwsServiceException , SdkClientException {
132
157
133
158
GetEncryptedObjectPipeline pipeline = GetEncryptedObjectPipeline .builder ()
134
- .s3AsyncClient (_wrappedClient )
159
+ .s3AsyncClient (_wrappedAsyncClient )
135
160
.cryptoMaterialsManager (_cryptoMaterialsManager )
136
161
.enableLegacyWrappingAlgorithms (_enableLegacyWrappingAlgorithms )
137
162
.enableLegacyUnauthenticatedModes (_enableLegacyUnauthenticatedModes )
@@ -148,14 +173,76 @@ public <T> T getObject(GetObjectRequest getObjectRequest,
148
173
}
149
174
}
150
175
176
+ private CompleteMultipartUploadResponse multipartPutObject (PutObjectRequest request , RequestBody requestBody ) throws Throwable {
177
+
178
+ AwsRequestOverrideConfiguration overrideConfig = request .overrideConfiguration ().get ();
179
+ // If MultipartConfiguration is null, Initialize MultipartConfiguration
180
+ MultipartConfiguration multipartConfiguration = overrideConfig
181
+ .executionAttributes ()
182
+ .getOptionalAttribute (S3EncryptionClient .CONFIGURATION )
183
+ .orElse (MultipartConfiguration .builder ().build ());
184
+
185
+ ExecutorService es = multipartConfiguration .executorService ();
186
+ final boolean defaultExecutorService = es == null ;
187
+ if (es == null ) {
188
+ throw new S3EncryptionClientException ("ExecutorService should not be null, Please initialize during MultipartConfiguration" );
189
+ }
190
+
191
+ UploadObjectObserver observer = multipartConfiguration .uploadObjectObserver ();
192
+ if (observer == null ) {
193
+ throw new S3EncryptionClientException ("UploadObjectObserver should not be null, Please initialize during MultipartConfiguration" );
194
+ }
195
+
196
+ observer .init (request , _wrappedAsyncClient , this , es );
197
+ final String uploadId = observer .onUploadCreation (request );
198
+ final List <CompletedPart > partETags = new ArrayList <>();
199
+
200
+ MultiFileOutputStream outputStream = multipartConfiguration .multiFileOutputStream ();
201
+ if (outputStream == null ) {
202
+ throw new S3EncryptionClientException ("MultiFileOutputStream should not be null, Please initialize during MultipartConfiguration" );
203
+ }
204
+
205
+ try {
206
+ // initialize the multi-file output stream
207
+ outputStream .init (observer , multipartConfiguration .partSize (), multipartConfiguration .diskLimit ());
208
+ // Kicks off the encryption-upload pipeline;
209
+ // Note outputStream is automatically closed upon method completion.
210
+ _multipartPipeline .putLocalObject (requestBody , uploadId , outputStream );
211
+ // block till all part have been uploaded
212
+ for (Future <Map <Integer , UploadPartResponse >> future : observer .futures ()) {
213
+ Map <Integer , UploadPartResponse > partResponseMap = future .get ();
214
+ partResponseMap .forEach ((partNumber , uploadPartResponse ) -> partETags .add (CompletedPart .builder ()
215
+ .partNumber (partNumber )
216
+ .eTag (uploadPartResponse .eTag ())
217
+ .build ()));
218
+ }
219
+ } catch (IOException | InterruptedException | ExecutionException | RuntimeException | Error ex ) {
220
+ throw onAbort (observer , ex );
221
+ } finally {
222
+ if (defaultExecutorService ) {
223
+ // shut down the locally created thread pool
224
+ es .shutdownNow ();
225
+ }
226
+ // delete left-over temp files
227
+ outputStream .cleanup ();
228
+ }
229
+ // Complete upload
230
+ return observer .onCompletion (partETags );
231
+ }
232
+
233
+ private <T extends Throwable > T onAbort (UploadObjectObserver observer , T t ) {
234
+ observer .onAbort ();
235
+ return t ;
236
+ }
237
+
151
238
@ Override
152
239
public DeleteObjectResponse deleteObject (DeleteObjectRequest deleteObjectRequest ) throws AwsServiceException ,
153
240
SdkClientException {
154
241
// Delete the object
155
- DeleteObjectResponse deleteObjectResponse = _wrappedClient .deleteObject (deleteObjectRequest ).join ();
242
+ DeleteObjectResponse deleteObjectResponse = _wrappedAsyncClient .deleteObject (deleteObjectRequest ).join ();
156
243
// If Instruction file exists, delete the instruction file as well.
157
244
String instructionObjectKey = deleteObjectRequest .key () + INSTRUCTION_FILE_SUFFIX ;
158
- _wrappedClient .deleteObject (builder -> builder
245
+ _wrappedAsyncClient .deleteObject (builder -> builder
159
246
.bucket (deleteObjectRequest .bucket ())
160
247
.key (instructionObjectKey )).join ();
161
248
return deleteObjectResponse ;
@@ -165,10 +252,10 @@ public DeleteObjectResponse deleteObject(DeleteObjectRequest deleteObjectRequest
165
252
public DeleteObjectsResponse deleteObjects (DeleteObjectsRequest deleteObjectsRequest ) throws AwsServiceException ,
166
253
SdkClientException {
167
254
// Delete the objects
168
- DeleteObjectsResponse deleteObjectsResponse = _wrappedClient .deleteObjects (deleteObjectsRequest ).join ();
255
+ DeleteObjectsResponse deleteObjectsResponse = _wrappedAsyncClient .deleteObjects (deleteObjectsRequest ).join ();
169
256
// If Instruction files exists, delete the instruction files as well.
170
257
List <ObjectIdentifier > deleteObjects = instructionFileKeysToDelete (deleteObjectsRequest );
171
- _wrappedClient .deleteObjects (DeleteObjectsRequest .builder ()
258
+ _wrappedAsyncClient .deleteObjects (DeleteObjectsRequest .builder ()
172
259
.bucket (deleteObjectsRequest .bucket ())
173
260
.delete (builder -> builder .objects (deleteObjects ))
174
261
.build ()).join ();
@@ -213,12 +300,14 @@ public AbortMultipartUploadResponse abortMultipartUpload(AbortMultipartUploadReq
213
300
214
301
@ Override
215
302
public void close () {
216
- _wrappedClient .close ();
303
+ _wrappedAsyncClient .close ();
217
304
}
218
305
219
306
public static class Builder {
220
- private S3AsyncClient _wrappedClient = S3AsyncClient .create ();
221
- private S3AsyncClient _wrappedCrtClient = null ;
307
+ // The non-encrypted APIs will use a default client.
308
+ // In the future, we may want to make this configurable.
309
+ private S3Client _wrappedClient = S3Client .create ();
310
+ private S3AsyncClient _wrappedAsyncClient = S3AsyncClient .create ();
222
311
223
312
private MultipartUploadObjectPipeline _multipartPipeline ;
224
313
private CryptographicMaterialsManager _cryptoMaterialsManager ;
@@ -241,13 +330,22 @@ private Builder() {
241
330
* S3AsyncClient will be reflected in this Builder.
242
331
*/
243
332
@ SuppressFBWarnings (value = "EI_EXPOSE_REP2" , justification = "Pass mutability into wrapping client" )
244
- public Builder wrappedClient (S3AsyncClient wrappedClient ) {
245
- if (wrappedClient instanceof S3AsyncEncryptionClient ) {
246
- throw new S3EncryptionClientException ("Cannot use S3AsyncEncryptionClient as wrapped client" );
333
+ public Builder wrappedAsyncClient (S3AsyncClient _wrappedAsyncClient ) {
334
+ if (_wrappedAsyncClient instanceof S3AsyncEncryptionClient ) {
335
+ throw new S3EncryptionClientException ("Cannot use S3EncryptionClient as wrapped client" );
336
+ }
337
+
338
+ this ._wrappedAsyncClient = _wrappedAsyncClient ;
339
+ return this ;
340
+ }
341
+
342
+ @ SuppressFBWarnings (value = "EI_EXPOSE_REP2" , justification = "Pass mutability into wrapping client" )
343
+ public Builder _wrappedAsyncClient (S3AsyncClient _wrappedAsyncClient ) {
344
+ if (_wrappedAsyncClient instanceof S3AsyncEncryptionClient ) {
345
+ throw new S3EncryptionClientException ("Cannot use S3EncryptionClient as wrapped client" );
247
346
}
248
- // Initializes only when wrappedClient is configured by user.
249
- this ._wrappedCrtClient = wrappedClient ;
250
- this ._wrappedClient = wrappedClient ;
347
+
348
+ this ._wrappedAsyncClient = _wrappedAsyncClient ;
251
349
return this ;
252
350
}
253
351
@@ -385,7 +483,7 @@ public S3EncryptionClient build() {
385
483
}
386
484
387
485
_multipartPipeline = MultipartUploadObjectPipeline .builder ()
388
- .s3AsyncClient (_wrappedClient )
486
+ .s3AsyncClient (_wrappedAsyncClient )
389
487
.cryptoMaterialsManager (_cryptoMaterialsManager )
390
488
.secureRandom (_secureRandom )
391
489
.build ();
0 commit comments