Skip to content

Commit d2c1b92

Browse files
authored
Prevent multipart download from hanging (#5466)
* remove cancel lock in downloader subscriber * re-add synchronized block
1 parent 5047516 commit d2c1b92

File tree

2 files changed

+13
-22
lines changed

2 files changed

+13
-22
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,10 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre
123123
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
124124

125125
this.resultFuture.whenComplete((r, e) -> {
126-
log.trace(() -> "result future whenComplete");
127126
if (e == null) {
128127
return;
129128
}
130129
if (isCancelled.compareAndSet(false, true)) {
131-
log.trace(() -> "result future whenComplete with isCancelled=true");
132130
handleFutureCancel(e);
133131
}
134132
});
@@ -171,7 +169,6 @@ public void request(long n) {
171169
public void cancel() {
172170
log.trace(() -> String.format("received cancel signal. Current cancel state is 'isCancelled=%s'", isCancelled.get()));
173171
if (isCancelled.compareAndSet(false, true)) {
174-
log.trace(() -> "Cancelling splitting transformer");
175172
handleSubscriptionCancel();
176173
}
177174
}
@@ -221,20 +218,17 @@ private void handleSubscriptionCancel() {
221218
}
222219
if (!onStreamCalled.get()) {
223220
// we never subscribe publisherToUpstream to the upstream, it would not complete
224-
log.trace(() -> "publisherToUpstream never subscribed, skipping downstreamSubscriber.onComplete()");
225221
downstreamSubscriber = null;
226222
return;
227223
}
228-
log.trace(() -> "publisherToUpstream.complete()");
229224
publisherToUpstream.complete().whenComplete((v, t) -> {
230225
if (downstreamSubscriber == null) {
231-
log.trace(() -> "[in future] downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()");
232226
return;
233227
}
234228
if (t != null) {
235229
downstreamSubscriber.onError(t);
236230
} else {
237-
log.trace(() -> "[in future] calling downstreamSubscriber.onComplete");
231+
log.trace(() -> "calling downstreamSubscriber.onComplete()");
238232
downstreamSubscriber.onComplete();
239233
}
240234
downstreamSubscriber = null;
@@ -289,7 +283,6 @@ public CompletableFuture<ResponseT> prepare() {
289283
});
290284
individualFuture.whenComplete((r, e) -> {
291285
if (isCancelled.get()) {
292-
log.trace(() -> "Individual future completed .");
293286
handleSubscriptionCancel();
294287
}
295288
});

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,16 @@ public class MultipartDownloaderSubscriber implements Subscriber<AsyncResponseTr
6969
*/
7070
private final CompletableFuture<Void> future = new CompletableFuture<>();
7171

72-
private final Object lock = new Object();
73-
7472
/**
7573
* The etag of the object being downloaded.
7674
*/
7775
private volatile String eTag;
7876

77+
/**
78+
* The Subscription lock
79+
*/
80+
private final Object lock = new Object();
81+
7982
public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjectRequest) {
8083
this(s3, getObjectRequest, 0);
8184
}
@@ -88,7 +91,6 @@ public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjec
8891

8992
@Override
9093
public void onSubscribe(Subscription s) {
91-
log.trace(() -> "onSubscribe");
9294
if (this.subscription != null) {
9395
s.cancel();
9496
return;
@@ -99,20 +101,19 @@ public void onSubscribe(Subscription s) {
99101

100102
@Override
101103
public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
102-
log.trace(() -> String.format("onNext, completed part = %d", completedParts.get()));
103104
if (asyncResponseTransformer == null) {
104105
subscription.cancel();
105106
throw new NullPointerException("onNext must not be called with null asyncResponseTransformer");
106107
}
107108

108109
int nextPartToGet = completedParts.get() + 1;
109110

110-
if (totalParts != null && nextPartToGet > totalParts) {
111-
synchronized (lock) {
112-
logMulitpartComplete(totalParts);
111+
synchronized (lock) {
112+
if (totalParts != null && nextPartToGet > totalParts) {
113+
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
113114
subscription.cancel();
115+
return;
114116
}
115-
return;
116117
}
117118

118119
GetObjectRequest actualRequest = nextRequest(nextPartToGet);
@@ -128,10 +129,6 @@ public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse
128129
});
129130
}
130131

131-
private void logMulitpartComplete(int totalParts) {
132-
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
133-
}
134-
135132
private void requestMoreIfNeeded(GetObjectResponse response) {
136133
int totalComplete = completedParts.incrementAndGet();
137134
MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest)
@@ -153,14 +150,15 @@ private void requestMoreIfNeeded(GetObjectResponse response) {
153150
if (partCount != null && totalParts == null) {
154151
log.debug(() -> String.format("Total amount of parts of the object to download: %d", partCount));
155152
MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest)
156-
.ifPresent(ctx -> ctx.totalParts(partCount));
153+
.ifPresent(ctx -> ctx.totalParts(partCount));
157154
totalParts = partCount;
158155
}
156+
159157
synchronized (lock) {
160158
if (totalParts != null && totalParts > 1 && totalComplete < totalParts) {
161159
subscription.request(1);
162160
} else {
163-
logMulitpartComplete(totalParts);
161+
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
164162
subscription.cancel();
165163
}
166164
}

0 commit comments

Comments
 (0)