-
Notifications
You must be signed in to change notification settings - Fork 916
Fix for multipart integ test failure #5176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0ddb24d
2588752
059fa2a
9d70e6a
86cf6e7
f602822
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,7 +70,8 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As | |
private final AtomicBoolean onStreamCalled = new AtomicBoolean(false); | ||
|
||
/** | ||
* Set to true once {@code .concel()} is called in the subscription of the downstream subscriber | ||
* Set to true once {@code .cancel()} is called in the subscription of the downstream subscriber, or if the | ||
* {@code resultFuture} is cancelled. | ||
*/ | ||
private final AtomicBoolean isCancelled = new AtomicBoolean(false); | ||
|
||
|
@@ -108,6 +109,8 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As | |
*/ | ||
private final AtomicBoolean emitting = new AtomicBoolean(false); | ||
|
||
private final Object cancelLock = new Object(); | ||
|
||
private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer, | ||
Long maximumBufferSizeInBytes, | ||
CompletableFuture<ResultT> resultFuture) { | ||
|
@@ -118,6 +121,15 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre | |
Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes"); | ||
this.maximumBufferInBytes = Validate.isPositive( | ||
maximumBufferSizeInBytes, "maximumBufferSizeInBytes"); | ||
|
||
this.resultFuture.whenComplete((r, e) -> { | ||
if (e == null) { | ||
return; | ||
} | ||
if (isCancelled.compareAndSet(false, true)) { | ||
handleFutureCancel(e); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -160,7 +172,7 @@ public void request(long n) { | |
public void cancel() { | ||
if (isCancelled.compareAndSet(false, true)) { | ||
log.trace(() -> "Cancelling splitting transformer"); | ||
handleCancelState(); | ||
handleSubscriptionCancel(); | ||
} | ||
} | ||
} | ||
|
@@ -195,8 +207,14 @@ private boolean doEmit() { | |
return false; | ||
} | ||
|
||
private void handleCancelState() { | ||
synchronized (this) { | ||
/** | ||
* Handle the {@code .cancel()} signal received from the downstream subscription. Data that is being sent to the upstream | ||
* transformer need to finish processing before we complete. One typical use case for this is completing the multipart | ||
* download, the subscriber having reached the final part will signal that it doesn't need more parts by calling {@code | ||
* .cancel()} on the subscription. | ||
*/ | ||
private void handleSubscriptionCancel() { | ||
synchronized (cancelLock) { | ||
L-Applin marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant more that just because
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Aaah I see what you mean. Looking at it, yep, it might cause a NPE at line 293. Adding synchronization around the |
||
if (downstreamSubscriber == null) { | ||
return; | ||
} | ||
|
@@ -219,6 +237,23 @@ private void handleCancelState() { | |
} | ||
} | ||
|
||
/** | ||
* Handle when the {@link SplittingTransformer#resultFuture} is cancelled or completed exceptionally from the outside. Data | ||
* need to stop being sent to the upstream transformer immediately. One typical use case for this is transfer manager | ||
* needing to pause download by calling {@code .cancel(true)} on the future. | ||
* | ||
* @param e The exception the future was complete exceptionally with. | ||
*/ | ||
private void handleFutureCancel(Throwable e) { | ||
L-Applin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
synchronized (cancelLock) { | ||
publisherToUpstream.error(e); | ||
if (downstreamSubscriber != null) { | ||
downstreamSubscriber.onError(e); | ||
downstreamSubscriber = null; | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* The AsyncResponseTransformer for each of the individual requests that is sent back to the downstreamSubscriber when | ||
* requested. A future is created per request that is completed when onComplete is called on the subscriber for that request | ||
|
@@ -232,14 +267,23 @@ private class IndividualTransformer implements AsyncResponseTransformer<Response | |
public CompletableFuture<ResponseT> prepare() { | ||
this.individualFuture = new CompletableFuture<>(); | ||
if (preparedCalled.compareAndSet(false, true)) { | ||
if (isCancelled.get()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this logic here? |
||
return individualFuture; | ||
} | ||
CompletableFuture<ResultT> upstreamFuture = upstreamResponseTransformer.prepare(); | ||
if (!resultFuture.isDone()) { | ||
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture); | ||
} | ||
} | ||
resultFuture.whenComplete((r, e) -> { | ||
if (e == null) { | ||
return; | ||
} | ||
individualFuture.completeExceptionally(e); | ||
}); | ||
individualFuture.whenComplete((r, e) -> { | ||
if (isCancelled.get()) { | ||
handleCancelState(); | ||
handleSubscriptionCancel(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you remind me why we need to check isCancelled here for every individual future? |
||
} | ||
}); | ||
return this.individualFuture; | ||
|
@@ -259,14 +303,16 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) { | |
if (downstreamSubscriber == null) { | ||
return; | ||
} | ||
if (onStreamCalled.compareAndSet(false, true)) { | ||
log.trace(() -> "calling onStream on the upstream transformer"); | ||
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe( | ||
DelegatingBufferingSubscriber.builder() | ||
.maximumBufferInBytes(maximumBufferInBytes) | ||
.delegate(upstreamSubscriber) | ||
.build() | ||
)); | ||
synchronized (cancelLock) { | ||
if (onStreamCalled.compareAndSet(false, true)) { | ||
log.trace(() -> "calling onStream on the upstream transformer"); | ||
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe( | ||
DelegatingBufferingSubscriber.builder() | ||
.maximumBufferInBytes(maximumBufferInBytes) | ||
.delegate(upstreamSubscriber) | ||
.build() | ||
)); | ||
} | ||
} | ||
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response)); | ||
} | ||
|
@@ -312,7 +358,9 @@ public void onNext(ByteBuffer byteBuffer) { | |
handleError(t); | ||
return; | ||
} | ||
subscription.request(1); | ||
if (!isCancelled.get()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar question as above, is it possible this is no longer true after we check it? If so, is it an issue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need the check here assuming subscription is cancelled properly or will be cancelled at this point?
Note "eventually" not "right away" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we do need to stop right away in the case of the return future being cancelled, for example when pausing with transfer-manager There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would happen if we don't check it here and continue to send request demand? It should be no-op right? |
||
subscription.request(1); | ||
} | ||
}); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a test case for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding a unit test for this. Can we add a end-to-end functional test (wiremock test) to verify the cancellation behavior?