Skip to content

Commit 238c9a7

Browse files
authored
Fixed the issue in S3 multipart client where the list of parts could be out of order in the CompleteMultipartRequest, causing "The list of parts was not in ascending order" error to be thrown. (#4998)
1 parent 5b7ec50 commit 238c9a7

File tree

6 files changed

+75
-20
lines changed

6 files changed

+75
-20
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS S3",
4+
"contributor": "",
5+
"description": "Fixed the issue in S3 multipart client where the list of parts could be out of order in CompleteMultipartRequest, causing `The list of parts was not in ascending order` error to be thrown."
6+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;
1919

2020
import java.util.Collection;
21+
import java.util.HashMap;
2122
import java.util.Map;
2223
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.ConcurrentHashMap;
2424
import java.util.concurrent.ConcurrentLinkedQueue;
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.concurrent.atomic.AtomicReferenceArray;
2728
import java.util.function.Consumer;
29+
import java.util.stream.IntStream;
2830
import org.reactivestreams.Subscriber;
2931
import org.reactivestreams.Subscription;
3032
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -59,12 +61,16 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
5961
private final Collection<CompletableFuture<CompletedPart>> futures = new ConcurrentLinkedQueue<>();
6062
private final PutObjectRequest putObjectRequest;
6163
private final CompletableFuture<PutObjectResponse> returnFuture;
62-
private final Map<Integer, CompletedPart> completedParts;
64+
private final AtomicReferenceArray<CompletedPart> completedParts;
6365
private final Map<Integer, CompletedPart> existingParts;
6466
private final PublisherListener<Long> progressListener;
6567
private Subscription subscription;
6668
private volatile boolean isDone;
6769
private volatile boolean isPaused;
70+
/**
71+
* Indicates whether CompleteMultipart has been initiated or not.
72+
*/
73+
private final AtomicBoolean completedMultipartInitiated = new AtomicBoolean(false);
6874
private volatile CompletableFuture<CompleteMultipartUploadResponse> completeMpuFuture;
6975

7076
KnownContentLengthAsyncRequestBodySubscriber(MpuRequestContext mpuRequestContext,
@@ -75,9 +81,9 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
7581
this.putObjectRequest = mpuRequestContext.request().left();
7682
this.returnFuture = returnFuture;
7783
this.uploadId = mpuRequestContext.uploadId();
78-
this.existingParts = mpuRequestContext.existingParts();
84+
this.existingParts = mpuRequestContext.existingParts() == null ? new HashMap<>() : mpuRequestContext.existingParts();
7985
this.numExistingParts = NumericUtils.saturatedCast(mpuRequestContext.numPartsCompleted());
80-
this.completedParts = new ConcurrentHashMap<>();
86+
this.completedParts = new AtomicReferenceArray<>(partCount);
8187
this.multipartUploadHelper = multipartUploadHelper;
8288
this.progressListener = putObjectRequest.overrideConfiguration().map(c -> c.executionAttributes()
8389
.getAttribute(JAVA_PROGRESS_LISTENER))
@@ -154,8 +160,8 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
154160
partNumber.getAndIncrement(),
155161
uploadId);
156162

157-
Consumer<CompletedPart> completedPartConsumer =
158-
completedPart -> completedParts.put(completedPart.partNumber(), completedPart);
163+
Consumer<CompletedPart> completedPartConsumer = completedPart -> completedParts.set(completedPart.partNumber() - 1,
164+
completedPart);
159165
multipartUploadHelper.sendIndividualUploadPartRequest(uploadId, completedPartConsumer, futures,
160166
Pair.of(uploadRequest, asyncRequestBody), progressListener)
161167
.whenComplete((r, t) -> {
@@ -193,15 +199,16 @@ public void onComplete() {
193199
}
194200

195201
private void completeMultipartUploadIfFinished(int requestsInFlight) {
196-
if (isDone && requestsInFlight == 0) {
202+
if (isDone && requestsInFlight == 0 && completedMultipartInitiated.compareAndSet(false, true)) {
197203
CompletedPart[] parts;
198204
if (existingParts.isEmpty()) {
199-
parts = completedParts.values().toArray(new CompletedPart[0]);
200-
} else if (!completedParts.isEmpty()) {
205+
parts =
206+
IntStream.range(0, completedParts.length())
207+
.mapToObj(completedParts::get)
208+
.toArray(CompletedPart[]::new);
209+
} else {
201210
// List of CompletedParts needs to be in ascending order
202211
parts = mergeCompletedParts();
203-
} else {
204-
parts = existingParts.values().toArray(new CompletedPart[0]);
205212
}
206213
completeMpuFuture = multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts, putObjectRequest);
207214
}
@@ -212,7 +219,7 @@ private CompletedPart[] mergeCompletedParts() {
212219
int currPart = 1;
213220
while (currPart < partCount + 1) {
214221
CompletedPart completedPart = existingParts.containsKey(currPart) ? existingParts.get(currPart) :
215-
completedParts.get(currPart);
222+
completedParts.get(currPart - 1);
216223
merged[currPart - 1] = completedPart;
217224
currPart++;
218225
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MpuRequestContext.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package software.amazon.awssdk.services.s3.internal.multipart;
1717

18-
import java.util.Collections;
1918
import java.util.Map;
2019
import java.util.Objects;
2120
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -94,7 +93,7 @@ public String uploadId() {
9493
}
9594

9695
public Map<Integer, CompletedPart> existingParts() {
97-
return existingParts != null ? Collections.unmodifiableMap(existingParts) : null;
96+
return existingParts;
9897
}
9998

10099
public static final class Builder {

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ private void uploadFromBeginning(Pair<PutObjectRequest, AsyncRequestBody> reques
136136
.contentLength(contentLength)
137137
.partSize(partSize)
138138
.uploadId(uploadId)
139-
.existingParts(new ConcurrentHashMap<>())
140139
.numPartsCompleted(numPartsCompleted)
141140
.build();
142141

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriberTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
3434
import software.amazon.awssdk.services.s3.model.CompletedPart;
3535
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
36+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
3637
import software.amazon.awssdk.services.s3.multipart.S3ResumeToken;
3738
import software.amazon.awssdk.testutils.RandomTempFile;
3839
import software.amazon.awssdk.utils.Pair;
@@ -100,7 +101,8 @@ void pause_withUninitiatedCompleteMpuFuture_shouldReturnToken() {
100101
private S3ResumeToken configureSubscriberAndPause(int numExistingParts,
101102
CompletableFuture<CompleteMultipartUploadResponse> completeMpuFuture) {
102103
Map<Integer, CompletedPart> existingParts = existingParts(numExistingParts);
103-
KnownContentLengthAsyncRequestBodySubscriber subscriber = subscriber(putObjectRequest, asyncRequestBody, existingParts);
104+
KnownContentLengthAsyncRequestBodySubscriber subscriber = subscriber(putObjectRequest, asyncRequestBody, existingParts,
105+
new CompletableFuture<>());
104106

105107
when(multipartUploadHelper.completeMultipartUpload(any(CompletableFuture.class), any(String.class),
106108
any(CompletedPart[].class), any(PutObjectRequest.class)))
@@ -111,7 +113,8 @@ private S3ResumeToken configureSubscriberAndPause(int numExistingParts,
111113

112114
private KnownContentLengthAsyncRequestBodySubscriber subscriber(PutObjectRequest putObjectRequest,
113115
AsyncRequestBody asyncRequestBody,
114-
Map<Integer, CompletedPart> existingParts) {
116+
Map<Integer, CompletedPart> existingParts,
117+
CompletableFuture<PutObjectResponse> returnFuture) {
115118

116119
MpuRequestContext mpuRequestContext = MpuRequestContext.builder()
117120
.request(Pair.of(putObjectRequest, asyncRequestBody))
@@ -122,7 +125,7 @@ private KnownContentLengthAsyncRequestBodySubscriber subscriber(PutObjectRequest
122125
.numPartsCompleted((long) existingParts.size())
123126
.build();
124127

125-
return new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, new CompletableFuture<>(), multipartUploadHelper);
128+
return new KnownContentLengthAsyncRequestBodySubscriber(mpuRequestContext, returnFuture, multipartUploadHelper);
126129
}
127130

128131
private Map<Integer, CompletedPart> existingParts(int numExistingParts) {

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/UploadObjectHelperTest.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,9 @@ void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequ
166166

167167
List<UploadPartRequest> actualRequests = requestArgumentCaptor.getAllValues();
168168
List<AsyncRequestBody> actualRequestBodies = requestBodyArgumentCaptor.getAllValues();
169-
assertThat(actualRequestBodies).hasSize(4);
170-
assertThat(actualRequests).hasSize(4);
169+
int numTotalParts = 4;
170+
assertThat(actualRequestBodies).hasSize(numTotalParts);
171+
assertThat(actualRequests).hasSize(numTotalParts);
171172

172173
for (int i = 0; i < actualRequests.size(); i++) {
173174
UploadPartRequest request = actualRequests.get(i);
@@ -182,6 +183,12 @@ void uploadObject_contentLengthExceedThresholdAndPartSize_shouldUseMPU(AsyncRequ
182183
assertThat(requestBody.contentLength()).hasValue(PART_SIZE);
183184
}
184185
}
186+
187+
ArgumentCaptor<CompleteMultipartUploadRequest> completeMpuArgumentCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
188+
verify(s3AsyncClient).completeMultipartUpload(completeMpuArgumentCaptor.capture());
189+
190+
CompleteMultipartUploadRequest actualRequest = completeMpuArgumentCaptor.getValue();
191+
assertThat(actualRequest.multipartUpload().parts()).isEqualTo(completedParts(numTotalParts));
185192
}
186193

187194
/**
@@ -373,6 +380,40 @@ void uploadObject_withResumeToken_shouldInvokeListPartsAndSkipExistingParts(int
373380
assertThat(actualRequest.multipartUpload().parts()).isEqualTo(completedParts(numTotalParts));
374381
}
375382

383+
@Test
384+
void uploadObject_partsFinishedOutOfOrder_shouldSortThemInCompleteMultipart() {
385+
int numTotalParts = 4;
386+
PutObjectRequest putObjectRequest = putObjectRequest(MPU_CONTENT_SIZE);
387+
388+
stubSuccessfulCreateMultipartCall(UPLOAD_ID, s3AsyncClient);
389+
stubSuccessfulCompleteMultipartCall(BUCKET, KEY, s3AsyncClient);
390+
391+
CompletableFuture<UploadPartResponse> part1Future = new CompletableFuture<>();
392+
CompletableFuture<UploadPartResponse> part2Future = new CompletableFuture<>();
393+
CompletableFuture<UploadPartResponse> part3Future = new CompletableFuture<>();
394+
CompletableFuture<UploadPartResponse> part4Future = new CompletableFuture<>();
395+
396+
when(s3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class))).thenReturn(part1Future)
397+
.thenReturn(part2Future)
398+
.thenReturn(part3Future)
399+
.thenReturn(part4Future);
400+
CompletableFuture<PutObjectResponse> returnFuture = uploadHelper.uploadObject(putObjectRequest,
401+
AsyncRequestBody.fromBytes(RandomStringUtils.randomAscii((int) MPU_CONTENT_SIZE).getBytes(StandardCharsets.UTF_8)));
402+
403+
part4Future.complete(UploadPartResponse.builder().build());
404+
part2Future.complete(UploadPartResponse.builder().build());
405+
part3Future.complete(UploadPartResponse.builder().build());
406+
part1Future.complete(UploadPartResponse.builder().build());
407+
408+
returnFuture.join();
409+
410+
ArgumentCaptor<CompleteMultipartUploadRequest> completeMpuArgumentCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
411+
verify(s3AsyncClient).completeMultipartUpload(completeMpuArgumentCaptor.capture());
412+
413+
CompleteMultipartUploadRequest actualRequest = completeMpuArgumentCaptor.getValue();
414+
assertThat(actualRequest.multipartUpload().parts()).isEqualTo(completedParts(numTotalParts));
415+
}
416+
376417
private List<CompletedPart> completedParts(int totalNumParts) {
377418
return IntStream.range(1, totalNumParts + 1).mapToObj(i -> CompletedPart.builder().partNumber(i).build()).collect(Collectors.toList());
378419
}

0 commit comments

Comments
 (0)