Skip to content

Commit c7d4af6

Browse files
davidh44L-Applin
authored andcommitted
Move request checksum interceptors to pipeline stages (#4174)
* Move request checksum interceptors to pipeline stages * Refactor into one combined stage * Refactoring * Fix typo error * Refactoring * Fix typo * Refactoring * HttpChecksumStage test * Refactoring
1 parent 4a447cc commit c7d4af6

File tree

9 files changed

+546
-447
lines changed

9 files changed

+546
-447
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,7 @@
7272
import software.amazon.awssdk.core.internal.http.loader.DefaultSdkAsyncHttpClientBuilder;
7373
import software.amazon.awssdk.core.internal.http.loader.DefaultSdkHttpClientBuilder;
7474
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyUserAgentStage;
75-
import software.amazon.awssdk.core.internal.interceptor.AsyncRequestBodyHttpChecksumTrailerInterceptor;
76-
import software.amazon.awssdk.core.internal.interceptor.HttpChecksumInHeaderInterceptor;
77-
import software.amazon.awssdk.core.internal.interceptor.HttpChecksumRequiredInterceptor;
7875
import software.amazon.awssdk.core.internal.interceptor.HttpChecksumValidationInterceptor;
79-
import software.amazon.awssdk.core.internal.interceptor.SyncHttpChecksumInTrailerInterceptor;
8076
import software.amazon.awssdk.core.retry.RetryMode;
8177
import software.amazon.awssdk.core.retry.RetryPolicy;
8278
import software.amazon.awssdk.core.util.SdkUserAgent;
@@ -441,11 +437,7 @@ private List<ExecutionInterceptor> resolveExecutionInterceptors(SdkClientConfigu
441437
*/
442438
private List<ExecutionInterceptor> sdkInterceptors() {
443439
return Collections.unmodifiableList(Arrays.asList(
444-
new HttpChecksumRequiredInterceptor(),
445-
new SyncHttpChecksumInTrailerInterceptor(),
446-
new HttpChecksumValidationInterceptor(),
447-
new AsyncRequestBodyHttpChecksumTrailerInterceptor(),
448-
new HttpChecksumInHeaderInterceptor()
440+
new HttpChecksumValidationInterceptor()
449441
));
450442
}
451443

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.CompletableFuture;
2121
import software.amazon.awssdk.annotations.SdkInternalApi;
2222
import software.amazon.awssdk.annotations.ThreadSafe;
23+
import software.amazon.awssdk.core.ClientType;
2324
import software.amazon.awssdk.core.Response;
2425
import software.amazon.awssdk.core.SdkRequest;
2526
import software.amazon.awssdk.core.async.AsyncRequestBody;
@@ -37,6 +38,7 @@
3738
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage;
3839
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage;
3940
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncSigningStage;
41+
import software.amazon.awssdk.core.internal.http.pipeline.stages.HttpChecksumStage;
4042
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage;
4143
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestImmutableStage;
4244
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestMutableStage;
@@ -169,6 +171,7 @@ public <OutputT> CompletableFuture<OutputT> execute(
169171
.then(ApplyUserAgentStage::new)
170172
.then(MergeCustomHeadersStage::new)
171173
.then(MergeCustomQueryParamsStage::new)
174+
.then(() -> new HttpChecksumStage(ClientType.ASYNC))
172175
.then(MakeRequestImmutableStage::new)
173176
.then(RequestPipelineBuilder
174177
.first(AsyncSigningStage::new)

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonSyncHttpClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import software.amazon.awssdk.annotations.SdkInternalApi;
1919
import software.amazon.awssdk.annotations.ThreadSafe;
20+
import software.amazon.awssdk.core.ClientType;
2021
import software.amazon.awssdk.core.Response;
2122
import software.amazon.awssdk.core.SdkRequest;
2223
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
@@ -37,6 +38,7 @@
3738
import software.amazon.awssdk.core.internal.http.pipeline.stages.BeforeUnmarshallingExecutionInterceptorsStage;
3839
import software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage;
3940
import software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage;
41+
import software.amazon.awssdk.core.internal.http.pipeline.stages.HttpChecksumStage;
4042
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage;
4143
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestImmutableStage;
4244
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestMutableStage;
@@ -170,6 +172,7 @@ public <OutputT> OutputT execute(HttpResponseHandler<Response<OutputT>> response
170172
.then(ApplyUserAgentStage::new)
171173
.then(MergeCustomHeadersStage::new)
172174
.then(MergeCustomQueryParamsStage::new)
175+
.then(() -> new HttpChecksumStage(ClientType.SYNC))
173176
.then(MakeRequestImmutableStage::new)
174177
// End of mutating request
175178
.then(RequestPipelineBuilder
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.http.pipeline.stages;
17+
18+
import static software.amazon.awssdk.core.HttpChecksumConstant.AWS_CHUNKED_HEADER;
19+
import static software.amazon.awssdk.core.HttpChecksumConstant.CONTENT_SHA_256_FOR_UNSIGNED_TRAILER;
20+
import static software.amazon.awssdk.core.HttpChecksumConstant.DEFAULT_ASYNC_CHUNK_SIZE;
21+
import static software.amazon.awssdk.core.HttpChecksumConstant.SIGNING_METHOD;
22+
import static software.amazon.awssdk.core.internal.io.AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE;
23+
import static software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength;
24+
import static software.amazon.awssdk.core.internal.util.ChunkContentUtils.calculateChecksumContentLength;
25+
import static software.amazon.awssdk.core.internal.util.HttpChecksumResolver.getResolvedChecksumSpecs;
26+
import static software.amazon.awssdk.http.Header.CONTENT_LENGTH;
27+
28+
import java.io.IOException;
29+
import java.io.InputStream;
30+
import java.io.UncheckedIOException;
31+
import software.amazon.awssdk.annotations.SdkInternalApi;
32+
import software.amazon.awssdk.core.ClientType;
33+
import software.amazon.awssdk.core.HttpChecksumConstant;
34+
import software.amazon.awssdk.core.checksums.ChecksumSpecs;
35+
import software.amazon.awssdk.core.checksums.SdkChecksum;
36+
import software.amazon.awssdk.core.interceptor.InterceptorContext;
37+
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
38+
import software.amazon.awssdk.core.internal.async.ChecksumCalculatingAsyncRequestBody;
39+
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
40+
import software.amazon.awssdk.core.internal.http.pipeline.MutableRequestToRequestPipeline;
41+
import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream;
42+
import software.amazon.awssdk.core.internal.util.HttpChecksumUtils;
43+
import software.amazon.awssdk.http.ContentStreamProvider;
44+
import software.amazon.awssdk.http.Header;
45+
import software.amazon.awssdk.http.SdkHttpFullRequest;
46+
import software.amazon.awssdk.utils.BinaryUtils;
47+
import software.amazon.awssdk.utils.IoUtils;
48+
import software.amazon.awssdk.utils.Md5Utils;
49+
50+
/**
51+
* Stage to implement the "httpChecksum" and "httpChecksumRequired" C2J traits, and flexible checksums.
52+
*/
53+
@SdkInternalApi
54+
public class HttpChecksumStage implements MutableRequestToRequestPipeline {
55+
56+
private final ClientType clientType;
57+
58+
public HttpChecksumStage(ClientType clientType) {
59+
this.clientType = clientType;
60+
}
61+
62+
@Override
63+
public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder request, RequestExecutionContext context)
64+
throws Exception {
65+
if (md5ChecksumRequired(request, context)) {
66+
addMd5ChecksumInHeader(request);
67+
return request;
68+
}
69+
70+
ChecksumSpecs resolvedChecksumSpecs = getResolvedChecksumSpecs(context.executionAttributes());
71+
72+
if (flexibleChecksumInTrailerRequired(context, resolvedChecksumSpecs)) {
73+
addFlexibleChecksumInTrailer(request, context, resolvedChecksumSpecs);
74+
return request;
75+
}
76+
77+
if (flexibleChecksumInHeaderRequired(context, resolvedChecksumSpecs)) {
78+
addFlexibleChecksumInHeader(request, context, resolvedChecksumSpecs);
79+
return request;
80+
}
81+
82+
return request;
83+
}
84+
85+
private boolean md5ChecksumRequired(SdkHttpFullRequest.Builder request, RequestExecutionContext context) {
86+
boolean isHttpChecksumRequired =
87+
context.executionAttributes().getAttribute(SdkInternalExecutionAttribute.HTTP_CHECKSUM_REQUIRED) != null ||
88+
HttpChecksumUtils.isMd5ChecksumRequired(context.executionAttributes());
89+
90+
boolean requestAlreadyHasMd5 = request.firstMatchingHeader(Header.CONTENT_MD5).isPresent();
91+
92+
if (!isHttpChecksumRequired || requestAlreadyHasMd5) {
93+
return false;
94+
}
95+
96+
if (context.requestProvider() != null) {
97+
throw new IllegalArgumentException("This operation requires a content-MD5 checksum, but one cannot be calculated "
98+
+ "for non-blocking content.");
99+
}
100+
101+
return context.executionContext().interceptorContext().requestBody().isPresent();
102+
}
103+
104+
/**
105+
* Implements the "httpChecksumRequired" C2J trait. Operations with that trait applied will automatically include a
106+
* "Content-MD5" header, containing a checksum of the payload.
107+
*
108+
* <p>This is NOT supported for asynchronous HTTP content, which is currently only used for streaming upload operations.
109+
* If such operations are added in the future, we'll have to find a way to support them in a non-blocking manner. That will
110+
* likely require interface changes of some sort, because it's not currently possible to do a non-blocking update to
111+
* request headers.
112+
*
113+
* <p>
114+
* Calculates the MD5 checksum of the provided request (and base64 encodes it), and adds the header to the request.
115+
*
116+
* <p>Note: This assumes that the content stream provider can create multiple new streams. If it only supports one (e.g. with
117+
* an input stream that doesn't support mark/reset), we could consider buffering the content in memory here and updating the
118+
* request body to use that buffered content. We obviously don't want to do that for giant streams, so we haven't opted to do
119+
* that yet.
120+
*/
121+
private void addMd5ChecksumInHeader(SdkHttpFullRequest.Builder request) {
122+
try {
123+
String payloadMd5 = Md5Utils.md5AsBase64(request.contentStreamProvider().newStream());
124+
request.putHeader(Header.CONTENT_MD5, payloadMd5);
125+
} catch (IOException e) {
126+
throw new UncheckedIOException(e);
127+
}
128+
}
129+
130+
private boolean flexibleChecksumInTrailerRequired(RequestExecutionContext context, ChecksumSpecs checksumSpecs) {
131+
boolean hasRequestBody = true;
132+
if (clientType == ClientType.SYNC) {
133+
hasRequestBody = context.executionContext().interceptorContext().requestBody().isPresent();
134+
} else if (clientType == ClientType.ASYNC) {
135+
hasRequestBody = context.executionContext().interceptorContext().asyncRequestBody().isPresent();
136+
}
137+
138+
boolean isContentStreaming = context.executionContext().interceptorContext().requestBody()
139+
.map(requestBody -> requestBody.contentStreamProvider() != null).orElse(false);
140+
141+
return checksumSpecs != null
142+
&& checksumSpecs.headerName() != null
143+
&& HttpChecksumUtils.isTrailerBasedChecksumForClientType(
144+
context.executionAttributes(),
145+
context.executionContext().interceptorContext().httpRequest(),
146+
clientType, checksumSpecs, hasRequestBody, isContentStreaming);
147+
}
148+
149+
/**
150+
* Adds flexible checksum to trailers.
151+
*
152+
* <p>The flexible checksum is added only if the following conditions are met:
153+
* <ol>
154+
* <li>Checksum is not already calculated.</li>
155+
* <li>Unsigned payload.</li>
156+
* <li>Request has streaming payload.</li>
157+
* <li>Request has the algorithm checksum mentioned.</li>
158+
* </ol>
159+
*/
160+
private void addFlexibleChecksumInTrailer(SdkHttpFullRequest.Builder request, RequestExecutionContext context,
161+
ChecksumSpecs checksumSpecs) {
162+
long originalContentLength = 0;
163+
int chunkSize = 0;
164+
165+
if (clientType == ClientType.SYNC) {
166+
request.contentStreamProvider(new ChecksumCalculatingStreamProvider(request.contentStreamProvider(), checksumSpecs));
167+
originalContentLength =
168+
context.executionContext().interceptorContext().requestBody().get().optionalContentLength().orElse(0L);
169+
chunkSize = DEFAULT_CHUNK_SIZE;
170+
} else if (clientType == ClientType.ASYNC) {
171+
if (context.requestProvider() != null) {
172+
context.requestProvider(ChecksumCalculatingAsyncRequestBody.builder()
173+
.asyncRequestBody(context.requestProvider())
174+
.algorithm(checksumSpecs.algorithm())
175+
.trailerHeader(checksumSpecs.headerName()).build());
176+
originalContentLength =
177+
context.executionContext().interceptorContext().asyncRequestBody().get().contentLength().orElse(0L);
178+
chunkSize = DEFAULT_ASYNC_CHUNK_SIZE;
179+
}
180+
}
181+
182+
long checksumContentLength = calculateChecksumContentLength(checksumSpecs.algorithm(), checksumSpecs.headerName());
183+
long contentLen = checksumContentLength + calculateStreamContentLength(originalContentLength, chunkSize);
184+
185+
request.putHeader(HttpChecksumConstant.HEADER_FOR_TRAILER_REFERENCE, checksumSpecs.headerName())
186+
.appendHeader("Content-encoding", AWS_CHUNKED_HEADER)
187+
.putHeader("x-amz-content-sha256", CONTENT_SHA_256_FOR_UNSIGNED_TRAILER)
188+
.putHeader("x-amz-decoded-content-length", Long.toString(originalContentLength))
189+
.putHeader(CONTENT_LENGTH, Long.toString(contentLen));
190+
}
191+
192+
private boolean flexibleChecksumInHeaderRequired(RequestExecutionContext context, ChecksumSpecs headerChecksumSpecs) {
193+
if (!context.executionContext().interceptorContext().requestBody().isPresent()) {
194+
return false;
195+
}
196+
197+
InterceptorContext interceptorContext = context.executionContext().interceptorContext();
198+
199+
boolean isContentStreaming = context.executionContext().interceptorContext().requestBody()
200+
.map(requestBody -> requestBody.contentStreamProvider() != null).orElse(false);
201+
202+
return headerChecksumSpecs != null &&
203+
headerChecksumSpecs.algorithm() != null &&
204+
!HttpChecksumUtils.isHttpChecksumPresent(interceptorContext.httpRequest(), headerChecksumSpecs) &&
205+
HttpChecksumUtils.isUnsignedPayload(
206+
context.executionAttributes().getAttribute(SIGNING_METHOD), interceptorContext.httpRequest().protocol(),
207+
isContentStreaming) &&
208+
!headerChecksumSpecs.isRequestStreaming();
209+
}
210+
211+
/**
212+
* Implements the "HttpChecksum" C2J trait for a request.
213+
* HttpChecksum is added in the header only in following cases:
214+
* <ol>
215+
* <li>Non-streaming payload and Unsigned Payload </li>
216+
* <li>Non-streaming payload and Header-based Signing auth</li>
217+
* <li>Streaming payload and Header-based Signing auth</li>
218+
* </ol>
219+
* This stage will inject the Http checksum only for case 1 as above i.e. for unsigned payloads.
220+
* For the other two cases, the http checksum will be injected by the signers.
221+
*
222+
* <p>
223+
* Calculates the checksum of the provided request (and base64 encodes it), and adds the header to the request.
224+
*
225+
* <p>Note: This assumes that the content stream provider can create multiple new streams. If it only supports one (e.g. with
226+
* an input stream that doesn't support mark/reset), we could consider buffering the content in memory here and updating the
227+
* request body to use that buffered content. We obviously don't want to do that for giant streams, so we haven't opted to do
228+
* that yet.
229+
*/
230+
private void addFlexibleChecksumInHeader(SdkHttpFullRequest.Builder request, RequestExecutionContext context,
231+
ChecksumSpecs checksumSpecs) {
232+
try {
233+
String payloadChecksum = BinaryUtils.toBase64(HttpChecksumUtils.computeChecksum(
234+
context.executionContext().interceptorContext().requestBody().get().contentStreamProvider().newStream(),
235+
checksumSpecs.algorithm()));
236+
request.putHeader(checksumSpecs.headerName(), payloadChecksum);
237+
} catch (IOException e) {
238+
throw new UncheckedIOException(e);
239+
}
240+
}
241+
242+
static final class ChecksumCalculatingStreamProvider implements ContentStreamProvider {
243+
private final ContentStreamProvider underlyingInputStreamProvider;
244+
private final String checksumHeaderForTrailer;
245+
private final ChecksumSpecs checksumSpecs;
246+
private InputStream currentStream;
247+
private SdkChecksum sdkChecksum;
248+
249+
ChecksumCalculatingStreamProvider(ContentStreamProvider underlyingInputStreamProvider,
250+
ChecksumSpecs checksumSpecs) {
251+
this.underlyingInputStreamProvider = underlyingInputStreamProvider;
252+
this.sdkChecksum = SdkChecksum.forAlgorithm(checksumSpecs.algorithm());
253+
this.checksumHeaderForTrailer = checksumSpecs.headerName();
254+
this.checksumSpecs = checksumSpecs;
255+
}
256+
257+
@Override
258+
public InputStream newStream() {
259+
closeCurrentStream();
260+
currentStream = AwsUnsignedChunkedEncodingInputStream.builder()
261+
.inputStream(underlyingInputStreamProvider.newStream())
262+
.sdkChecksum(sdkChecksum)
263+
.checksumHeaderForTrailer(checksumHeaderForTrailer)
264+
.build();
265+
return currentStream;
266+
}
267+
268+
private void closeCurrentStream() {
269+
sdkChecksum = SdkChecksum.forAlgorithm(checksumSpecs.algorithm());
270+
if (currentStream != null) {
271+
IoUtils.closeQuietly(currentStream, null);
272+
currentStream = null;
273+
}
274+
}
275+
}
276+
}

0 commit comments

Comments
 (0)