Skip to content

Add trace logs for debugging multipart download #5434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,14 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
log.trace(() -> "onComplete");
// if write in progress, tell write to close on finish.
synchronized (this) {
if (writeInProgress) {
log.trace(() -> "writeInProgress = true, not closing");
closeOnLastWrite = true;
} else {
log.trace(() -> "writeInProgress = false, closing");
close();
}
}
Expand All @@ -264,6 +267,7 @@ private void close() {
if (fileChannel != null) {
invokeSafely(fileChannel::close);
}
log.trace(() -> "Completing File async transformer future future");
future.complete(null);
} catch (RuntimeException exception) {
future.completeExceptionally(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");

this.resultFuture.whenComplete((r, e) -> {
log.trace(() -> "result future whenComplete");
if (e == null) {
return;
}
if (isCancelled.compareAndSet(false, true)) {
log.trace(() -> "result future whenComplete with isCancelled=true");
handleFutureCancel(e);
}
});
Expand Down Expand Up @@ -167,6 +169,7 @@ public void request(long n) {

@Override
public void cancel() {
log.trace(() -> String.format("received cancel signal. Current cancel state is 'isCancelled=%s'", isCancelled.get()));
if (isCancelled.compareAndSet(false, true)) {
log.trace(() -> "Cancelling splitting transformer");
handleSubscriptionCancel();
Expand Down Expand Up @@ -207,26 +210,31 @@ private boolean doEmit() {
/**
* Handle the {@code .cancel()} signal received from the downstream subscription. Data that is being sent to the upstream
* transformer need to finish processing before we complete. One typical use case for this is completing the multipart
* download, the subscriber having reached the final part will signal that it doesn't need more parts by calling {@code
* .cancel()} on the subscription.
* download, the subscriber having reached the final part will signal that it doesn't need more parts by calling
* {@code .cancel()} on the subscription.
*/
private void handleSubscriptionCancel() {
synchronized (cancelLock) {
if (downstreamSubscriber == null) {
log.trace(() -> "downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()");
return;
}
if (!onStreamCalled.get()) {
// we never subscribe publisherToUpstream to the upstream, it would not complete
log.trace(() -> "publisherToUpstream never subscribed, skipping downstreamSubscriber.onComplete()");
downstreamSubscriber = null;
return;
}
log.trace(() -> "publisherToUpstream.complete()");
publisherToUpstream.complete().whenComplete((v, t) -> {
if (downstreamSubscriber == null) {
log.trace(() -> "[in future] downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()");
return;
}
if (t != null) {
downstreamSubscriber.onError(t);
} else {
log.trace(() -> "[in future] calling downstreamSubscriber.onComplete");
downstreamSubscriber.onComplete();
}
downstreamSubscriber = null;
Expand All @@ -236,8 +244,8 @@ private void handleSubscriptionCancel() {

/**
* Handle when the {@link SplittingTransformer#resultFuture} is cancelled or completed exceptionally from the outside. Data
* need to stop being sent to the upstream transformer immediately. One typical use case for this is transfer manager
* needing to pause download by calling {@code .cancel(true)} on the future.
* need to stop being sent to the upstream transformer immediately. One typical use case for this is transfer manager needing
* to pause download by calling {@code .cancel(true)} on the future.
*
* @param e The exception the future was complete exceptionally with.
*/
Expand Down Expand Up @@ -281,6 +289,7 @@ public CompletableFuture<ResponseT> prepare() {
});
individualFuture.whenComplete((r, e) -> {
if (isCancelled.get()) {
log.trace(() -> "Individual future completed .");
handleSubscriptionCancel();
}
});
Expand Down Expand Up @@ -308,8 +317,8 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
DelegatingBufferingSubscriber.builder()
.maximumBufferInBytes(maximumBufferInBytes)
.delegate(upstreamSubscriber)
.build()
));
.build())
);
}
}
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,13 @@ public final <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downl
r -> CompletedDownload.builder()
.result(r)
.build());
future.whenComplete((r, e) -> {
if (e != null) {
log.error(() -> String.format("s3 client future completed exceptionally"), e);
} else {
log.trace(() -> String.format("s3 client future completed : %s", r));
}
});
} catch (Throwable throwable) {
returnFuture.completeExceptionally(throwable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = ConsoleAppender

logger.split.name = software.amazon.awssdk.core.internal.async
logger.split.level = trace

logger.multi.name = software.amazon.awssdk.services.s3.internal.multipart
logger.multi.level = trace

# Uncomment below to enable more specific logging
#
#logger.sdk.name = software.amazon.awssdk
Expand Down
Loading