Skip to content

Commit 336fd35

Browse files
authored
Implementation of MultipartDownloaderSubscriber (#4931)
* Implementation of MultipartDownloaderSubscriber and previous SplittingTransformer PR comments. Wiremock tests.
  - Use builder and template methods in DelegatingBufferingSubscriber
* fix DelegatingBufferingSubscriber not considering downstream demand
* fix DelegatingBufferingSubscriber not always respecting downstream demand
* test for missing byte bug
* test for missing byte bug
* fix merge
* remove buffering to see if byte length error is still triggered
* missing byte investigation
* testing errors with InputStream and Publisher
* checkstyle
* checkstyle
* prevent test memory heap error
* fix merge
* handle cancel signal correctly, and make IndividualPartSubscriber non-static to do so.
  - removed trace/debug logging
* attempt fix thread hanging for PublisherAsyncResponseTransformer
1 parent dd3eb20 commit 336fd35

File tree

13 files changed

+952
-201
lines changed

13 files changed

+952
-201
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
8383
/**
8484
* The buffer size used to buffer the content received from the downstream subscriber
8585
*/
86-
private final long maximumBufferSize;
86+
private final long maximumBufferInBytes;
8787

8888
/**
8989
* This publisher is used to send the bytes received from the downstream subscriber's transformers to a
@@ -116,7 +116,7 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre
116116
this.resultFuture = Validate.paramNotNull(
117117
resultFuture, "resultFuture");
118118
Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
119-
this.maximumBufferSize = Validate.isPositive(
119+
this.maximumBufferInBytes = Validate.isPositive(
120120
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
121121
}
122122

@@ -160,8 +160,7 @@ public void request(long n) {
160160
public void cancel() {
161161
if (isCancelled.compareAndSet(false, true)) {
162162
log.trace(() -> "Cancelling splitting transformer");
163-
publisherToUpstream.complete();
164-
downstreamSubscriber = null;
163+
handleCancelState();
165164
}
166165
}
167166
}
@@ -196,25 +195,53 @@ private boolean doEmit() {
196195
return false;
197196
}
198197

198+
private void handleCancelState() {
199+
synchronized (this) {
200+
if (downstreamSubscriber == null) {
201+
return;
202+
}
203+
if (!onStreamCalled.get()) {
204+
// onStream was never called, so publisherToUpstream was never subscribed to by the upstream and would not complete
205+
downstreamSubscriber = null;
206+
return;
207+
}
208+
publisherToUpstream.complete().whenComplete((v, t) -> {
209+
if (downstreamSubscriber == null) {
210+
return;
211+
}
212+
if (t != null) {
213+
downstreamSubscriber.onError(t);
214+
} else {
215+
downstreamSubscriber.onComplete();
216+
}
217+
downstreamSubscriber = null;
218+
});
219+
}
220+
}
221+
199222
/**
200223
* The AsyncResponseTransformer for each of the individual requests that is sent back to the downstreamSubscriber when
201224
* requested. A future is created per request that is completed when onComplete is called on the subscriber for that request
202225
* body publisher.
203226
*/
204227
private class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> {
205-
206228
private ResponseT response;
207229
private CompletableFuture<ResponseT> individualFuture;
208230

209231
@Override
210232
public CompletableFuture<ResponseT> prepare() {
211233
this.individualFuture = new CompletableFuture<>();
212234
if (preparedCalled.compareAndSet(false, true)) {
213-
log.trace(() -> "calling prepare on the upstream transformer");
214235
CompletableFuture<ResultT> upstreamFuture = upstreamResponseTransformer.prepare();
215-
CompletableFutureUtils.forwardExceptionTo(resultFuture, upstreamFuture);
216-
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
236+
if (!resultFuture.isDone()) {
237+
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
238+
}
217239
}
240+
individualFuture.whenComplete((r, e) -> {
241+
if (isCancelled.get()) {
242+
handleCancelState();
243+
}
244+
});
218245
return this.individualFuture;
219246
}
220247

@@ -229,38 +256,40 @@ public void onResponse(ResponseT response) {
229256

230257
@Override
231258
public void onStream(SdkPublisher<ByteBuffer> publisher) {
259+
if (downstreamSubscriber == null) {
260+
return;
261+
}
232262
if (onStreamCalled.compareAndSet(false, true)) {
233263
log.trace(() -> "calling onStream on the upstream transformer");
234-
upstreamResponseTransformer.onStream(
235-
upstreamSubscriber ->
236-
publisherToUpstream.subscribe(new DelegatingBufferingSubscriber(maximumBufferSize, upstreamSubscriber))
237-
);
264+
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe(
265+
DelegatingBufferingSubscriber.builder()
266+
.maximumBufferInBytes(maximumBufferInBytes)
267+
.delegate(upstreamSubscriber)
268+
.build()
269+
));
238270
}
239-
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response, publisherToUpstream));
271+
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response));
240272
}
241273

242274
@Override
243275
public void exceptionOccurred(Throwable error) {
244-
individualFuture.completeExceptionally(error);
276+
publisherToUpstream.error(error);
245277
upstreamResponseTransformer.exceptionOccurred(error);
246278
}
247279
}
248280

249281
/**
250282
* The Subscriber for the ByteBuffer publisher of each individual request.
251283
*/
252-
static class IndividualPartSubscriber<T> implements Subscriber<ByteBuffer> {
284+
class IndividualPartSubscriber<T> implements Subscriber<ByteBuffer> {
253285

254286
private final CompletableFuture<T> future;
255287
private final T response;
256-
private final SimplePublisher<ByteBuffer> bodyPartPublisher;
257288
private Subscription subscription;
258289

259-
IndividualPartSubscriber(CompletableFuture<T> future, T response,
260-
SimplePublisher<ByteBuffer> bodyPartPublisher) {
290+
IndividualPartSubscriber(CompletableFuture<T> future, T response) {
261291
this.future = future;
262292
this.response = response;
263-
this.bodyPartPublisher = bodyPartPublisher;
264293
}
265294

266295
@Override
@@ -270,16 +299,15 @@ public void onSubscribe(Subscription s) {
270299
return;
271300
}
272301
this.subscription = s;
273-
// request everything, data will be buffered by the DelegatingBufferingSubscriber
274-
s.request(Long.MAX_VALUE);
302+
s.request(1);
275303
}
276304

277305
@Override
278306
public void onNext(ByteBuffer byteBuffer) {
279307
if (byteBuffer == null) {
280308
throw new NullPointerException("onNext must not be called with null byteBuffer");
281309
}
282-
bodyPartPublisher.send(byteBuffer).whenComplete((r, t) -> {
310+
publisherToUpstream.send(byteBuffer).whenComplete((r, t) -> {
283311
if (t != null) {
284312
handleError(t);
285313
return;
@@ -290,7 +318,6 @@ public void onNext(ByteBuffer byteBuffer) {
290318

291319
@Override
292320
public void onError(Throwable t) {
293-
bodyPartPublisher.error(t);
294321
handleError(t);
295322
}
296323

@@ -300,11 +327,11 @@ public void onComplete() {
300327
}
301328

302329
private void handleError(Throwable t) {
330+
publisherToUpstream.error(t);
303331
future.completeExceptionally(t);
304332
}
305333
}
306334

307-
308335
public static <ResponseT, ResultT> Builder<ResponseT, ResultT> builder() {
309336
return new Builder<>();
310337
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/IndividualPartSubscriberTckTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
import java.nio.ByteBuffer;
2020
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122
import org.reactivestreams.Subscriber;
2223
import org.reactivestreams.Subscription;
2324
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
2425
import org.reactivestreams.tck.TestEnvironment;
26+
import software.amazon.awssdk.core.ResponseBytes;
27+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2528
import software.amazon.awssdk.utils.async.SimplePublisher;
2629

2730
public class IndividualPartSubscriberTckTest extends SubscriberWhiteboxVerification<ByteBuffer> {
@@ -34,9 +37,15 @@ protected IndividualPartSubscriberTckTest() {
3437

3538
@Override
3639
public Subscriber<ByteBuffer> createSubscriber(WhiteboxSubscriberProbe<ByteBuffer> probe) {
37-
CompletableFuture<Integer> future = new CompletableFuture<>();
40+
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
3841
SimplePublisher<ByteBuffer> publisher = new SimplePublisher<>();
39-
return new SplittingTransformer.IndividualPartSubscriber<Integer>(future, 0, publisher) {
42+
SplittingTransformer<Object, ResponseBytes<Object>> transformer =
43+
SplittingTransformer.<Object, ResponseBytes<Object>>builder()
44+
.upstreamResponseTransformer(AsyncResponseTransformer.toBytes())
45+
.maximumBufferSizeInBytes(32L)
46+
.resultFuture(new CompletableFuture<>())
47+
.build();
48+
return transformer.new IndividualPartSubscriber<ByteBuffer>(future, ByteBuffer.wrap(new byte[0])) {
4049
@Override
4150
public void onSubscribe(Subscription s) {
4251
super.onSubscribe(s);

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java

Lines changed: 88 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package software.amazon.awssdk.services.s3.internal.multipart;
1717

1818
import java.util.concurrent.CompletableFuture;
19-
import java.util.concurrent.atomic.AtomicBoolean;
2019
import java.util.concurrent.atomic.AtomicInteger;
2120
import org.reactivestreams.Subscriber;
2221
import org.reactivestreams.Subscription;
@@ -27,86 +26,136 @@
2726
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
2827
import software.amazon.awssdk.utils.Logger;
2928

30-
// [WIP]
31-
// Still work in progress, currently only used to help manual testing, please ignore
29+
/**
30+
* A subscriber implementation that will download all individual parts for a multipart get-object request. It receives the
31+
* individual {@link AsyncResponseTransformer} which will be used to perform the individual part requests.
32+
* This is a 'one-shot' class: it should <em>NOT</em> be reused for more than one multipart download.
33+
*/
3234
@SdkInternalApi
3335
public class MultipartDownloaderSubscriber implements Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> {
3436
private static final Logger log = Logger.loggerFor(MultipartDownloaderSubscriber.class);
3537

38+
/**
39+
* The s3 client used to make the individual part requests
40+
*/
3641
private final S3AsyncClient s3;
42+
43+
/**
44+
* The GetObjectRequest that was provided when calling s3.getObject(...). It is copied for each individual request, and the
45+
* copy has the partNumber field updated as more parts are downloaded.
46+
*/
3747
private final GetObjectRequest getObjectRequest;
3848

39-
private AtomicBoolean totalPartKnown = new AtomicBoolean(false);
40-
private AtomicInteger totalParts = new AtomicInteger();
41-
private AtomicInteger completed = new AtomicInteger(0);
42-
private AtomicInteger currentPart = new AtomicInteger(0);
49+
/**
50+
* This value indicates the total number of parts of the object to get. If null, it means we don't know the total number of
51+
* parts, either because we haven't received a response from s3 yet to set it, or the object to get is not multipart.
52+
*/
53+
private volatile Integer totalParts;
54+
55+
/**
56+
* The total number of completed parts. A part is considered complete once the completable future associated with its request
57+
* completes successfully.
58+
*/
59+
private final AtomicInteger completedParts = new AtomicInteger(0);
4360

61+
/**
62+
* The subscription received from the publisher this subscriber subscribes to.
63+
*/
4464
private Subscription subscription;
4565

46-
private CompletableFuture<GetObjectResponse> responseFuture;
47-
private GetObjectResponse returnResponse;
66+
/**
67+
* This future will be completed once this subscriber reaches a terminal state, successfully or exceptionally, and will be
68+
* completed accordingly.
69+
*/
70+
private final CompletableFuture<Void> future = new CompletableFuture<>();
71+
72+
private final Object lock = new Object();
73+
74+
/**
75+
* The etag of the object being downloaded.
76+
*/
77+
private String eTag;
4878

4979
public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjectRequest) {
5080
this.s3 = s3;
5181
this.getObjectRequest = getObjectRequest;
52-
this.responseFuture = new CompletableFuture<>();
5382
}
5483

5584
@Override
5685
public void onSubscribe(Subscription s) {
57-
if (this.subscription == null) {
58-
this.subscription = s;
86+
if (this.subscription != null) {
87+
s.cancel();
88+
return;
5989
}
60-
s.request(1);
90+
this.subscription = s;
91+
this.subscription.request(1);
6192
}
6293

6394
@Override
6495
public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
65-
int part = currentPart.incrementAndGet();
66-
if (totalPartKnown.get()) {
67-
log.info(() -> String.format("total part: %s, current part: %s", totalParts.get(), part));
68-
} else {
69-
log.info(() -> String.format("part %s", part));
96+
log.trace(() -> "onNext " + completedParts.get());
97+
if (asyncResponseTransformer == null) {
98+
throw new NullPointerException("onNext must not be called with null asyncResponseTransformer");
7099
}
71-
if (totalPartKnown.get() && part > totalParts.get()) {
72-
log.info(() -> "no more parts available, stopping");
73-
subscription.cancel();
74-
return;
100+
101+
int nextPartToGet = completedParts.get() + 1;
102+
103+
if (totalParts != null && nextPartToGet > totalParts) {
104+
synchronized (lock) {
105+
subscription.cancel();
106+
}
75107
}
76-
GetObjectRequest actualRequest = this.getObjectRequest.copy(req -> req.partNumber(part));
108+
109+
GetObjectRequest actualRequest = nextRequest(nextPartToGet);
77110
CompletableFuture<GetObjectResponse> future = s3.getObject(actualRequest, asyncResponseTransformer);
78-
future.whenComplete((response, e) -> {
79-
if (e != null) {
80-
responseFuture.completeExceptionally(e);
111+
future.whenComplete((response, error) -> {
112+
if (error != null) {
113+
onError(error);
81114
return;
82115
}
83-
completed.incrementAndGet();
84-
returnResponse = response;
85-
log.info(() -> String.format("received '%s'", response.contentRange()));
116+
int totalComplete = completedParts.incrementAndGet();
117+
log.trace(() -> String.format("completed part: %s", totalComplete));
118+
119+
if (eTag == null) {
120+
this.eTag = response.eTag();
121+
log.trace(() -> String.format("Multipart object ETag: %s", this.eTag));
122+
}
123+
86124
Integer partCount = response.partsCount();
87-
if (totalPartKnown.compareAndSet(false, true)) {
88-
totalParts.set(partCount);
89-
totalPartKnown.set(true);
125+
if (partCount != null && totalParts == null) {
126+
log.trace(() -> String.format("total parts: %s", partCount));
127+
totalParts = partCount;
90128
}
91-
log.info(() -> String.format("total parts: %s", partCount));
92-
if (totalParts.get() > 1) {
93-
subscription.request(1);
129+
synchronized (lock) {
130+
if (totalParts != null && totalParts > 1 && totalComplete < totalParts) {
131+
subscription.request(1);
132+
} else {
133+
subscription.cancel();
134+
}
94135
}
95136
});
96137
}
97138

98139
@Override
99140
public void onError(Throwable t) {
100-
responseFuture.completeExceptionally(t);
141+
future.completeExceptionally(t);
101142
}
102143

103144
@Override
104145
public void onComplete() {
105-
responseFuture.complete(returnResponse);
146+
future.complete(null);
106147
}
107148

108-
public CompletableFuture<GetObjectResponse> future() {
109-
return responseFuture;
149+
public CompletableFuture<Void> future() {
150+
return this.future;
110151
}
111152

153+
private GetObjectRequest nextRequest(int nextPartToGet) {
154+
return getObjectRequest.copy(req -> {
155+
req.partNumber(nextPartToGet);
156+
if (eTag != null) {
157+
req.ifMatch(eTag);
158+
}
159+
});
160+
}
112161
}

0 commit comments

Comments
 (0)