2
2
3
3
import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
4
4
import software .amazon .awssdk .awscore .exception .AwsServiceException ;
5
+ import software .amazon .awssdk .core .async .AsyncRequestBody ;
5
6
import software .amazon .awssdk .core .exception .SdkClientException ;
6
7
import software .amazon .awssdk .core .sync .RequestBody ;
8
+ import software .amazon .awssdk .services .s3 .S3AsyncClient ;
7
9
import software .amazon .awssdk .services .s3 .S3Client ;
8
10
import software .amazon .awssdk .services .s3 .model .AbortMultipartUploadRequest ;
9
11
import software .amazon .awssdk .services .s3 .model .AbortMultipartUploadResponse ;
28
30
import java .util .Collections ;
29
31
import java .util .HashMap ;
30
32
import java .util .Map ;
33
+ import java .util .concurrent .Executors ;
31
34
32
35
public class MultipartUploadObjectPipeline {
33
36
34
37
final private S3Client _s3Client ;
38
+ final private S3AsyncClient _s3AsyncClient ;
35
39
final private CryptographicMaterialsManager _cryptoMaterialsManager ;
36
40
final private MultipartContentEncryptionStrategy _contentEncryptionStrategy ;
37
41
final private ContentMetadataEncodingStrategy _contentMetadataEncodingStrategy ;
@@ -42,6 +46,7 @@ public class MultipartUploadObjectPipeline {
42
46
43
47
private MultipartUploadObjectPipeline (Builder builder ) {
44
48
this ._s3Client = builder ._s3Client ;
49
+ this ._s3AsyncClient = S3AsyncClient .create (); // TODO plumbing
45
50
this ._cryptoMaterialsManager = builder ._cryptoMaterialsManager ;
46
51
this ._contentEncryptionStrategy = builder ._contentEncryptionStrategy ;
47
52
this ._contentMetadataEncodingStrategy = builder ._contentMetadataEncodingStrategy ;
@@ -76,38 +81,37 @@ public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requ
76
81
throws AwsServiceException , SdkClientException {
77
82
final AlgorithmSuite algorithmSuite = AlgorithmSuite .ALG_AES_256_GCM_IV12_TAG16_NO_KDF ;
78
83
final int blockSize = algorithmSuite .cipherBlockSizeBytes ();
79
- final String uploadId = request .uploadId ();
80
84
final long partSize = requestBody .optionalContentLength ().orElse (-1L );
81
85
final int cipherTagLength = isLastPart ? algorithmSuite .cipherTagLengthBytes () : 0 ;
86
+ final long ciphertextLength = partSize + cipherTagLength ;
82
87
final boolean partSizeMultipleOfCipherBlockSize = 0 == (partSize % blockSize );
88
+
83
89
if (!isLastPart && !partSizeMultipleOfCipherBlockSize ) {
84
- throw new S3EncryptionClientException (
85
- "Invalid part size: part sizes for encrypted multipart uploads must be multiples "
86
- + "of the cipher block size ("
87
- + blockSize
88
- + ") with the exception of the last part." );
90
+ throw new S3EncryptionClientException ("Invalid part size: part sizes for encrypted multipart uploads must " +
91
+ "be multiples of the cipher block size (" + blockSize + ") with the exception of the last part." );
89
92
}
93
+
94
+ final String uploadId = request .uploadId ();
90
95
final MultipartUploadContext uploadContext = _multipartUploadContexts .get (uploadId );
91
96
if (uploadContext == null ) {
92
- throw new S3EncryptionClientException (
93
- "No client-side information available on upload ID " + uploadId );
97
+ throw new S3EncryptionClientException ("No client-side information available on upload ID " + uploadId );
94
98
}
95
99
final UploadPartResponse response ;
96
100
// Checks the parts are uploaded in series
97
101
uploadContext .beginPartUpload (request .partNumber ());
98
102
Cipher cipher = uploadContext .getCipher ();
99
103
try {
100
- final InputStream cipherInputStream = new AuthenticatedCipherInputStream ( requestBody .contentStreamProvider ().newStream (), cipher , true , isLastPart );
104
+ final AsyncRequestBody cipherAsyncRequestBody = new CipherAsyncRequestBody ( cipher , AsyncRequestBody . fromInputStream ( requestBody .contentStreamProvider ().newStream (), request . contentLength (), Executors . newSingleThreadExecutor ()), ciphertextLength );
101
105
// The last part of the multipart upload will contain an extra
102
106
// 16-byte mac
103
107
if (isLastPart ) {
104
108
if (uploadContext .hasFinalPartBeenSeen ()) {
105
- throw new S3EncryptionClientException (
106
- "This part was specified as the last part in a multipart upload, but a previous part was already marked as the last part. "
107
- + "Only the last part of the upload should be marked as the last part." );
109
+ throw new S3EncryptionClientException ("This part was specified as the last part in a multipart " +
110
+ "upload, but a previous part was already marked as the last part. Only the last part of the " +
111
+ " upload should be marked as the last part." );
108
112
}
109
113
}
110
- response = _s3Client .uploadPart (request , RequestBody . fromInputStream ( cipherInputStream , partSize + cipherTagLength ) );
114
+ response = _s3AsyncClient .uploadPart (request , cipherAsyncRequestBody ). join ( );
111
115
} finally {
112
116
uploadContext .endPartUpload ();
113
117
}
0 commit comments