Skip to content

Fixed the issue in S3 multipart client where the list of parts could … #4998

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSS3-b8d17fa.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS S3",
"contributor": "",
"description": "Fixed the issue in S3 multipart client where the list of parts could be out of order in CompleteMultipartRequest, causing `The list of parts was not in ascending order` error to be thrown."
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -59,12 +61,16 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
private final Collection<CompletableFuture<CompletedPart>> futures = new ConcurrentLinkedQueue<>();
private final PutObjectRequest putObjectRequest;
private final CompletableFuture<PutObjectResponse> returnFuture;
private final Map<Integer, CompletedPart> completedParts;
private final AtomicReferenceArray<CompletedPart> completedParts;
private final Map<Integer, CompletedPart> existingParts;
private final PublisherListener<Long> progressListener;
private Subscription subscription;
private volatile boolean isDone;
private volatile boolean isPaused;
/**
* Indicates whether CompleteMultipart has been initiated or not.
*/
private final AtomicBoolean completedMultipartInitiated = new AtomicBoolean(false);
private volatile CompletableFuture<CompleteMultipartUploadResponse> completeMpuFuture;

KnownContentLengthAsyncRequestBodySubscriber(MpuRequestContext mpuRequestContext,
Expand All @@ -75,9 +81,9 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
this.putObjectRequest = mpuRequestContext.request().left();
this.returnFuture = returnFuture;
this.uploadId = mpuRequestContext.uploadId();
this.existingParts = mpuRequestContext.existingParts();
this.existingParts = mpuRequestContext.existingParts() == null ? new HashMap<>() : mpuRequestContext.existingParts();
this.numExistingParts = NumericUtils.saturatedCast(mpuRequestContext.numPartsCompleted());
this.completedParts = new ConcurrentHashMap<>();
this.completedParts = new AtomicReferenceArray<>(partCount);
this.multipartUploadHelper = multipartUploadHelper;
this.progressListener = putObjectRequest.overrideConfiguration().map(c -> c.executionAttributes()
.getAttribute(JAVA_PROGRESS_LISTENER))
Expand Down Expand Up @@ -154,8 +160,8 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
partNumber.getAndIncrement(),
uploadId);

Consumer<CompletedPart> completedPartConsumer =
completedPart -> completedParts.put(completedPart.partNumber(), completedPart);
Consumer<CompletedPart> completedPartConsumer = completedPart -> completedParts.set(completedPart.partNumber() - 1,
completedPart);
multipartUploadHelper.sendIndividualUploadPartRequest(uploadId, completedPartConsumer, futures,
Pair.of(uploadRequest, asyncRequestBody), progressListener)
.whenComplete((r, t) -> {
Expand Down Expand Up @@ -193,15 +199,16 @@ public void onComplete() {
}

private void completeMultipartUploadIfFinished(int requestsInFlight) {
if (isDone && requestsInFlight == 0) {
if (isDone && requestsInFlight == 0 && completedMultipartInitiated.compareAndSet(false, true)) {
Copy link
Contributor Author

@zoewangg zoewangg Mar 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added back this check. It was removed as part of the refactoring for some reason.

CompletedPart[] parts;
if (existingParts.isEmpty()) {
parts = completedParts.values().toArray(new CompletedPart[0]);
} else if (!completedParts.isEmpty()) {
parts =
IntStream.range(0, completedParts.length())
.mapToObj(completedParts::get)
.toArray(CompletedPart[]::new);
} else {
// List of CompletedParts needs to be in ascending order
parts = mergeCompletedParts();
} else {
parts = existingParts.values().toArray(new CompletedPart[0]);
Comment on lines -203 to -204
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This else block is not needed since mergeCompletedParts should handle it

}
completeMpuFuture = multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts, putObjectRequest);
}
Expand All @@ -212,7 +219,7 @@ private CompletedPart[] mergeCompletedParts() {
int currPart = 1;
while (currPart < partCount + 1) {
CompletedPart completedPart = existingParts.containsKey(currPart) ? existingParts.get(currPart) :
completedParts.get(currPart);
completedParts.get(currPart - 1);
merged[currPart - 1] = completedPart;
currPart++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package software.amazon.awssdk.services.s3.internal.multipart;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -94,7 +93,7 @@ public String uploadId() {
}

public Map<Integer, CompletedPart> existingParts() {
return existingParts != null ? Collections.unmodifiableMap(existingParts) : null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this since this seems a bit overkill for an internal API.

return existingParts;
}

public static final class Builder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ private void uploadFromBeginning(Pair<PutObjectRequest, AsyncRequestBody> reques
.contentLength(contentLength)
.partSize(partSize)
.uploadId(uploadId)
.existingParts(new ConcurrentHashMap<>())
.numPartsCompleted(numPartsCompleted)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.multipart.S3ResumeToken;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.utils.Pair;
Expand Down Expand Up @@ -100,7 +101,8 @@ void pause_withUninitiatedCompleteMpuFuture_shouldReturnToken() {
private S3ResumeToken configureSubscriberAndPause(int numExistingParts,
CompletableFuture<CompleteMultipartUploadResponse> completeMpuFuture) {
Map<Integer, CompletedPart> existingParts = existingParts(numExistingParts);
KnownContentLengthAsyncRequestBodySubscriber subscriber = subscriber(putObjectRequest, asyncRequestBody, existingParts);
KnownContentLengthAsyncRequestBodySubscriber subscriber = subscriber(putObjectRequest, asyncRequestBody, existingParts,
new CompletableFuture<>());

when(multipartUploadHelper.completeMultipartUpload(any(CompletableFuture.class), any(String.class),
any(CompletedPart[].class), any(PutObjectRequest.class)))
Expand All @@ -111,7 +113,8 @@ private S3ResumeToken configureSubscriberAndPause(int numExistingParts,

private KnownContentLengthAsyncRequestBodySubscriber subscriber(PutObjectRequest putObjectRequest,
AsyncRequestBody asyncRequestBody,
Map<Integer, CompletedPart> existingParts) {
Map<Integer, CompletedPart> existingParts,
CompletableFuture<PutObjectResponse> returnFuture) {

MpuRequestContext mpuRequestContext = MpuRequestContext.builder()
.request(Pair.of(putObjectRequest, asyncRequestBody))
Expand All @@ -122,7 +125,7 @@ private KnownContentLengthAsyncRequestBodySubscriber subscriber(PutObjectRequest
.numPartsCompleted((long) existingParts.size())
.build();

return new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, new CompletableFuture<>(), multipartUploadHelper);
return new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture, multipartUploadHelper);
}

private Map<Integer, CompletedPart> existingParts(int numExistingParts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequ

List<UploadPartRequest> actualRequests = requestArgumentCaptor.getAllValues();
List<AsyncRequestBody> actualRequestBodies = requestBodyArgumentCaptor.getAllValues();
assertThat(actualRequestBodies).hasSize(4);
assertThat(actualRequests).hasSize(4);
int numTotalParts = 4;
assertThat(actualRequestBodies).hasSize(numTotalParts);
assertThat(actualRequests).hasSize(numTotalParts);

for (int i = 0; i < actualRequests.size(); i++) {
UploadPartRequest request = actualRequests.get(i);
Expand All @@ -182,6 +183,12 @@ void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequ
assertThat(requestBody.contentLength()).hasValue(PART_SIZE);
}
}

ArgumentCaptor<CompleteMultipartUploadRequest> completeMpuArgumentCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
verify(s3AsyncClient).completeMultipartUpload(completeMpuArgumentCaptor.capture());

CompleteMultipartUploadRequest actualRequest = completeMpuArgumentCaptor.getValue();
assertThat(actualRequest.multipartUpload().parts()).isEqualTo(completedParts(numTotalParts));
}

/**
Expand Down Expand Up @@ -373,6 +380,40 @@ void uploadObject_withResumeToken_shouldInvokeListPartsAndSkipExistingParts(int
assertThat(actualRequest.multipartUpload().parts()).isEqualTo(completedParts(numTotalParts));
}

@Test
void uploadObject_partsFinishedOutOfOrder_shouldSortThemInCompleteMultipart() {
int numTotalParts = 4;
PutObjectRequest putObjectRequest = putObjectRequest(MPU_CONTENT_SIZE);

stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient);
stubSuccessfulCompleteMultipartCall(BUCKET, KEY, s3AsyncClient);

CompletableFuture<UploadPartResponse> part1Future = new CompletableFuture<>();
CompletableFuture<UploadPartResponse> part2Future = new CompletableFuture<>();
CompletableFuture<UploadPartResponse> part3Future = new CompletableFuture<>();
CompletableFuture<UploadPartResponse> part4Future = new CompletableFuture<>();

when(s3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class))).thenReturn(part1Future)
.thenReturn(part2Future)
.thenReturn(part3Future)
.thenReturn(part4Future);
CompletableFuture<PutObjectResponse> returnFuture = uploadHelper.uploadObject(putObjectRequest,
AsyncRequestBody.fromBytes(RandomStringUtils.randomAscii((int) MPU_CONTENT_SIZE).getBytes(StandardCharsets.UTF_8)));

part4Future.complete(UploadPartResponse.builder().build());
part2Future.complete(UploadPartResponse.builder().build());
part3Future.complete(UploadPartResponse.builder().build());
part1Future.complete(UploadPartResponse.builder().build());

returnFuture.join();

ArgumentCaptor<CompleteMultipartUploadRequest> completeMpuArgumentCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
verify(s3AsyncClient).completeMultipartUpload(completeMpuArgumentCaptor.capture());

CompleteMultipartUploadRequest actualRequest = completeMpuArgumentCaptor.getValue();
assertThat(actualRequest.multipartUpload().parts()).isEqualTo(completedParts(numTotalParts));
}

private List<CompletedPart> completedParts(int totalNumParts) {
return IntStream.range(1, totalNumParts + 1).mapToObj(i -> CompletedPart.builder().partNumber(i).build()).collect(Collectors.toList());
}
Expand Down