Skip to content

Prevent multipart download from hanging #5466

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,10 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");

this.resultFuture.whenComplete((r, e) -> {
log.trace(() -> "result future whenComplete");
if (e == null) {
return;
}
if (isCancelled.compareAndSet(false, true)) {
log.trace(() -> "result future whenComplete with isCancelled=true");
handleFutureCancel(e);
}
});
Expand Down Expand Up @@ -171,7 +169,6 @@ public void request(long n) {
public void cancel() {
log.trace(() -> String.format("received cancel signal. Current cancel state is 'isCancelled=%s'", isCancelled.get()));
if (isCancelled.compareAndSet(false, true)) {
log.trace(() -> "Cancelling splitting transformer");
handleSubscriptionCancel();
}
}
Expand Down Expand Up @@ -221,20 +218,17 @@ private void handleSubscriptionCancel() {
}
if (!onStreamCalled.get()) {
// we never subscribe publisherToUpstream to the upstream, it would not complete
log.trace(() -> "publisherToUpstream never subscribed, skipping downstreamSubscriber.onComplete()");
downstreamSubscriber = null;
return;
}
log.trace(() -> "publisherToUpstream.complete()");
publisherToUpstream.complete().whenComplete((v, t) -> {
if (downstreamSubscriber == null) {
log.trace(() -> "[in future] downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()");
return;
}
if (t != null) {
downstreamSubscriber.onError(t);
} else {
log.trace(() -> "[in future] calling downstreamSubscriber.onComplete");
log.trace(() -> "calling downstreamSubscriber.onComplete()");
downstreamSubscriber.onComplete();
}
downstreamSubscriber = null;
Expand Down Expand Up @@ -289,7 +283,6 @@ public CompletableFuture<ResponseT> prepare() {
});
individualFuture.whenComplete((r, e) -> {
if (isCancelled.get()) {
log.trace(() -> "Individual future completed .");
handleSubscriptionCancel();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,16 @@ public class MultipartDownloaderSubscriber implements Subscriber<AsyncResponseTr
*/
private final CompletableFuture<Void> future = new CompletableFuture<>();

private final Object lock = new Object();

/**
* The etag of the object being downloaded.
*/
private volatile String eTag;

/**
* The Subscription lock
*/
private final Object lock = new Object();

public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjectRequest) {
this(s3, getObjectRequest, 0);
}
Expand All @@ -88,7 +91,6 @@ public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjec

@Override
public void onSubscribe(Subscription s) {
log.trace(() -> "onSubscribe");
if (this.subscription != null) {
s.cancel();
return;
Expand All @@ -99,20 +101,19 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> asyncResponseTransformer) {
log.trace(() -> String.format("onNext, completed part = %d", completedParts.get()));
if (asyncResponseTransformer == null) {
subscription.cancel();
throw new NullPointerException("onNext must not be called with null asyncResponseTransformer");
}

int nextPartToGet = completedParts.get() + 1;

if (totalParts != null && nextPartToGet > totalParts) {
synchronized (lock) {
logMulitpartComplete(totalParts);
synchronized (lock) {
if (totalParts != null && nextPartToGet > totalParts) {
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
subscription.cancel();
return;
}
return;
}

GetObjectRequest actualRequest = nextRequest(nextPartToGet);
Expand All @@ -128,10 +129,6 @@ public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse
});
}

private void logMulitpartComplete(int totalParts) {
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
}

private void requestMoreIfNeeded(GetObjectResponse response) {
int totalComplete = completedParts.incrementAndGet();
MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest)
Expand All @@ -153,14 +150,15 @@ private void requestMoreIfNeeded(GetObjectResponse response) {
if (partCount != null && totalParts == null) {
log.debug(() -> String.format("Total amount of parts of the object to download: %d", partCount));
MultipartDownloadUtils.multipartDownloadResumeContext(getObjectRequest)
.ifPresent(ctx -> ctx.totalParts(partCount));
.ifPresent(ctx -> ctx.totalParts(partCount));
totalParts = partCount;
}

synchronized (lock) {
if (totalParts != null && totalParts > 1 && totalComplete < totalParts) {
subscription.request(1);
} else {
logMulitpartComplete(totalParts);
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
subscription.cancel();
}
}
Expand Down
Loading