Skip to content

Commit f01ada0

Browse files
authored
Request compression async streaming (#4262)
* Refactor to common class AwsChunkedInputStream * Sync streaming compression * Sync streaming compression functional tests * Sync streaming compression integ tests * Fix integ test * Async streaming compression * Address comments * Refactor ChunkBuffer class * Address comments * Address comments * Remove unused field * Handle demand in Subscriber * Address comments * Add back final modifier
1 parent 4bae174 commit f01ada0

File tree

12 files changed

+933
-116
lines changed

12 files changed

+933
-116
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,45 +21,43 @@
2121
import java.util.ArrayList;
2222
import java.util.Collections;
2323
import java.util.List;
24+
import java.util.Optional;
2425
import java.util.concurrent.atomic.AtomicLong;
2526
import software.amazon.awssdk.annotations.SdkInternalApi;
2627
import software.amazon.awssdk.utils.Logger;
27-
import software.amazon.awssdk.utils.Validate;
2828
import software.amazon.awssdk.utils.builder.SdkBuilder;
2929

3030
/**
31-
* Class that will buffer incoming BufferBytes of totalBytes length to chunks of bufferSize*
31+
* Class that will buffer incoming BufferBytes to chunks of bufferSize.
32+
* If totalBytes is not provided, i.e. content-length is unknown, {@link #getBufferedData()} should be used in the Subscriber's
33+
* {@code onComplete()} to check for a final chunk that is smaller than the chunk size, and send if present.
3234
*/
3335
@SdkInternalApi
3436
public final class ChunkBuffer {
3537
private static final Logger log = Logger.loggerFor(ChunkBuffer.class);
3638
private final AtomicLong transferredBytes;
3739
private final ByteBuffer currentBuffer;
3840
private final int chunkSize;
39-
private final long totalBytes;
41+
private final Long totalBytes;
4042

4143
private ChunkBuffer(Long totalBytes, Integer bufferSize) {
42-
Validate.notNull(totalBytes, "The totalBytes must not be null");
43-
4444
int chunkSize = bufferSize != null ? bufferSize : DEFAULT_ASYNC_CHUNK_SIZE;
4545
this.chunkSize = chunkSize;
4646
this.currentBuffer = ByteBuffer.allocate(chunkSize);
47-
this.totalBytes = totalBytes;
4847
this.transferredBytes = new AtomicLong(0);
48+
this.totalBytes = totalBytes;
4949
}
5050

5151
public static Builder builder() {
5252
return new DefaultBuilder();
5353
}
5454

55-
5655
/**
5756
* Split the input {@link ByteBuffer} into multiple smaller {@link ByteBuffer}s, each of which contains {@link #chunkSize}
5857
* worth of bytes. If the last chunk of the input ByteBuffer contains less than {@link #chunkSize} data, the last chunk will
5958
* be buffered.
6059
*/
6160
public synchronized Iterable<ByteBuffer> split(ByteBuffer inputByteBuffer) {
62-
6361
if (!inputByteBuffer.hasRemaining()) {
6462
return Collections.singletonList(inputByteBuffer);
6563
}
@@ -71,7 +69,7 @@ public synchronized Iterable<ByteBuffer> split(ByteBuffer inputByteBuffer) {
7169
fillCurrentBuffer(inputByteBuffer);
7270

7371
if (isCurrentBufferFull()) {
74-
addCurrentBufferToIterable(byteBuffers, chunkSize);
72+
addCurrentBufferToIterable(byteBuffers);
7573
}
7674
}
7775

@@ -82,8 +80,7 @@ public synchronized Iterable<ByteBuffer> split(ByteBuffer inputByteBuffer) {
8280

8381
// If this is the last chunk, add data buffered to the iterable
8482
if (isLastChunk()) {
85-
int remainingBytesInBuffer = currentBuffer.position();
86-
addCurrentBufferToIterable(byteBuffers, remainingBytesInBuffer);
83+
addCurrentBufferToIterable(byteBuffers);
8784
}
8885
return byteBuffers;
8986
}
@@ -111,19 +108,38 @@ private void splitRemainingInputByteBuffer(ByteBuffer inputByteBuffer, List<Byte
111108
}
112109
}
113110

111+
/**
112+
* Retrieve the current buffered data.
113+
*/
114+
public Optional<ByteBuffer> getBufferedData() {
115+
int remainingBytesInBuffer = currentBuffer.position();
116+
117+
if (remainingBytesInBuffer == 0) {
118+
return Optional.empty();
119+
}
120+
121+
ByteBuffer bufferedChunk = ByteBuffer.allocate(remainingBytesInBuffer);
122+
currentBuffer.flip();
123+
bufferedChunk.put(currentBuffer);
124+
bufferedChunk.flip();
125+
return Optional.of(bufferedChunk);
126+
}
127+
114128
private boolean isLastChunk() {
129+
if (totalBytes == null) {
130+
return false;
131+
}
115132
long remainingBytes = totalBytes - transferredBytes.get();
116133
return remainingBytes != 0 && remainingBytes == currentBuffer.position();
117134
}
118135

119-
private void addCurrentBufferToIterable(List<ByteBuffer> byteBuffers, int capacity) {
120-
ByteBuffer bufferedChunk = ByteBuffer.allocate(capacity);
121-
currentBuffer.flip();
122-
bufferedChunk.put(currentBuffer);
123-
bufferedChunk.flip();
124-
byteBuffers.add(bufferedChunk);
125-
transferredBytes.addAndGet(bufferedChunk.remaining());
126-
currentBuffer.clear();
136+
private void addCurrentBufferToIterable(List<ByteBuffer> byteBuffers) {
137+
Optional<ByteBuffer> bufferedChunk = getBufferedData();
138+
if (bufferedChunk.isPresent()) {
139+
byteBuffers.add(bufferedChunk.get());
140+
transferredBytes.addAndGet(bufferedChunk.get().remaining());
141+
currentBuffer.clear();
142+
}
127143
}
128144

129145
private void fillCurrentBuffer(ByteBuffer inputByteBuffer) {
@@ -151,8 +167,6 @@ public interface Builder extends SdkBuilder<Builder, ChunkBuffer> {
151167
Builder bufferSize(int bufferSize);
152168

153169
Builder totalBytes(long totalBytes);
154-
155-
156170
}
157171

158172
private static final class DefaultBuilder implements Builder {
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import static software.amazon.awssdk.core.internal.io.AwsChunkedInputStream.DEFAULT_CHUNK_SIZE;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.Collections;
22+
import java.util.Optional;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicLong;
25+
import org.reactivestreams.Subscriber;
26+
import org.reactivestreams.Subscription;
27+
import software.amazon.awssdk.annotations.SdkInternalApi;
28+
import software.amazon.awssdk.core.async.AsyncRequestBody;
29+
import software.amazon.awssdk.core.async.SdkPublisher;
30+
import software.amazon.awssdk.core.internal.compression.Compressor;
31+
import software.amazon.awssdk.utils.Validate;
32+
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
33+
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
34+
import software.amazon.awssdk.utils.builder.SdkBuilder;
35+
36+
/**
37+
* Wrapper class to wrap an AsyncRequestBody.
38+
* This will chunk and compress the payload with the provided {@link Compressor}.
39+
*/
40+
@SdkInternalApi
41+
public class CompressionAsyncRequestBody implements AsyncRequestBody {
42+
43+
private final AsyncRequestBody wrapped;
44+
private final Compressor compressor;
45+
private final int chunkSize;
46+
47+
private CompressionAsyncRequestBody(DefaultBuilder builder) {
48+
this.wrapped = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
49+
this.compressor = Validate.paramNotNull(builder.compressor, "compressor");
50+
this.chunkSize = builder.chunkSize != null ? builder.chunkSize : DEFAULT_CHUNK_SIZE;
51+
}
52+
53+
@Override
54+
public void subscribe(Subscriber<? super ByteBuffer> s) {
55+
Validate.notNull(s, "Subscription MUST NOT be null.");
56+
57+
SdkPublisher<Iterable<ByteBuffer>> split = split(wrapped);
58+
SdkPublisher<ByteBuffer> flattening = flattening(split);
59+
flattening.map(compressor::compress).subscribe(s);
60+
}
61+
62+
@Override
63+
public Optional<Long> contentLength() {
64+
return wrapped.contentLength();
65+
}
66+
67+
@Override
68+
public String contentType() {
69+
return wrapped.contentType();
70+
}
71+
72+
private SdkPublisher<Iterable<ByteBuffer>> split(SdkPublisher<ByteBuffer> source) {
73+
return subscriber -> source.subscribe(new SplittingSubscriber(subscriber, chunkSize));
74+
}
75+
76+
private SdkPublisher<ByteBuffer> flattening(SdkPublisher<Iterable<ByteBuffer>> source) {
77+
return subscriber -> source.subscribe(new FlatteningSubscriber<>(subscriber));
78+
}
79+
80+
/**
81+
* @return Builder instance to construct a {@link CompressionAsyncRequestBody}.
82+
*/
83+
public static Builder builder() {
84+
return new DefaultBuilder();
85+
}
86+
87+
public interface Builder extends SdkBuilder<CompressionAsyncRequestBody.Builder, CompressionAsyncRequestBody> {
88+
89+
/**
90+
* Sets the AsyncRequestBody that will be wrapped.
91+
* @param asyncRequestBody
92+
* @return This builder for method chaining.
93+
*/
94+
Builder asyncRequestBody(AsyncRequestBody asyncRequestBody);
95+
96+
/**
97+
* Sets the compressor to compress the request.
98+
* @param compressor
99+
* @return This builder for method chaining.
100+
*/
101+
Builder compressor(Compressor compressor);
102+
103+
/**
104+
* Sets the chunk size. Default size is 128 * 1024.
105+
* @param chunkSize
106+
* @return This builder for method chaining.
107+
*/
108+
Builder chunkSize(Integer chunkSize);
109+
}
110+
111+
private static final class DefaultBuilder implements Builder {
112+
113+
private AsyncRequestBody asyncRequestBody;
114+
private Compressor compressor;
115+
private Integer chunkSize;
116+
117+
@Override
118+
public CompressionAsyncRequestBody build() {
119+
return new CompressionAsyncRequestBody(this);
120+
}
121+
122+
@Override
123+
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
124+
this.asyncRequestBody = asyncRequestBody;
125+
return this;
126+
}
127+
128+
@Override
129+
public Builder compressor(Compressor compressor) {
130+
this.compressor = compressor;
131+
return this;
132+
}
133+
134+
@Override
135+
public Builder chunkSize(Integer chunkSize) {
136+
this.chunkSize = chunkSize;
137+
return this;
138+
}
139+
}
140+
141+
private static final class SplittingSubscriber extends DelegatingSubscriber<ByteBuffer, Iterable<ByteBuffer>> {
142+
private final ChunkBuffer chunkBuffer;
143+
private final AtomicBoolean upstreamDone = new AtomicBoolean(false);
144+
private final AtomicLong downstreamDemand = new AtomicLong();
145+
private final Object lock = new Object();
146+
private volatile boolean sentFinalChunk = false;
147+
148+
protected SplittingSubscriber(Subscriber<? super Iterable<ByteBuffer>> subscriber, int chunkSize) {
149+
super(subscriber);
150+
this.chunkBuffer = ChunkBuffer.builder()
151+
.bufferSize(chunkSize)
152+
.build();
153+
}
154+
155+
@Override
156+
public void onSubscribe(Subscription s) {
157+
subscriber.onSubscribe(new Subscription() {
158+
@Override
159+
public void request(long n) {
160+
if (n <= 0) {
161+
throw new IllegalArgumentException("n > 0 required but it was " + n);
162+
}
163+
164+
downstreamDemand.getAndAdd(n);
165+
166+
if (upstreamDone.get()) {
167+
sendFinalChunk();
168+
} else {
169+
s.request(n);
170+
}
171+
}
172+
173+
@Override
174+
public void cancel() {
175+
s.cancel();
176+
}
177+
});
178+
}
179+
180+
@Override
181+
public void onNext(ByteBuffer byteBuffer) {
182+
downstreamDemand.decrementAndGet();
183+
Iterable<ByteBuffer> buffers = chunkBuffer.split(byteBuffer);
184+
subscriber.onNext(buffers);
185+
}
186+
187+
@Override
188+
public void onComplete() {
189+
upstreamDone.compareAndSet(false, true);
190+
if (downstreamDemand.get() > 0) {
191+
sendFinalChunk();
192+
}
193+
}
194+
195+
@Override
196+
public void onError(Throwable t) {
197+
upstreamDone.compareAndSet(false, true);
198+
super.onError(t);
199+
}
200+
201+
private void sendFinalChunk() {
202+
synchronized (lock) {
203+
if (!sentFinalChunk) {
204+
sentFinalChunk = true;
205+
Optional<ByteBuffer> byteBuffer = chunkBuffer.getBufferedData();
206+
byteBuffer.ifPresent(buffer -> subscriber.onNext(Collections.singletonList(buffer)));
207+
subscriber.onComplete();
208+
}
209+
}
210+
}
211+
}
212+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import software.amazon.awssdk.core.exception.SdkClientException;
3131
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
3232
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
33+
import software.amazon.awssdk.core.internal.async.CompressionAsyncRequestBody;
3334
import software.amazon.awssdk.core.internal.compression.Compressor;
3435
import software.amazon.awssdk.core.internal.compression.CompressorType;
3536
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
@@ -63,7 +64,6 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ
6364

6465
Compressor compressor = resolveCompressorType(context.executionAttributes());
6566

66-
// non-streaming
6767
if (!isStreaming(context)) {
6868
compressEntirePayload(input, compressor);
6969
updateContentEncodingHeader(input, compressor);
@@ -76,12 +76,14 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ
7676
}
7777

7878
if (context.requestProvider() == null) {
79-
// sync streaming
8079
input.contentStreamProvider(new CompressionContentStreamProvider(input.contentStreamProvider(), compressor));
80+
} else {
81+
context.requestProvider(CompressionAsyncRequestBody.builder()
82+
.asyncRequestBody(context.requestProvider())
83+
.compressor(compressor)
84+
.build());
8185
}
8286

83-
// TODO : streaming - async
84-
8587
updateContentEncodingHeader(input, compressor);
8688
return input;
8789
}

0 commit comments

Comments
 (0)