Skip to content

Commit dc315da

Browse files
committed
S3 Async GetObject Bytes: Reduce memory & array-copying
1 parent 69f7191 commit dc315da

File tree

5 files changed

+93
-11
lines changed

5 files changed

+93
-11
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import java.io.InputStream;
2020
import java.nio.ByteBuffer;
2121
import java.nio.file.Path;
22+
import java.util.Optional;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.function.Consumer;
25+
import java.util.function.Function;
2426
import software.amazon.awssdk.annotations.SdkPublicApi;
2527
import software.amazon.awssdk.core.FileTransformerConfiguration;
2628
import software.amazon.awssdk.core.ResponseBytes;
@@ -202,7 +204,11 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(
202204
* @return AsyncResponseTransformer instance.
203205
*/
204206
static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> toBytes() {
205-
return new ByteArrayAsyncResponseTransformer<>();
207+
return new ByteArrayAsyncResponseTransformer<>(Optional.empty());
208+
}
209+
210+
static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> toBytes(Function<ResponseT, Integer> f) {
211+
return new ByteArrayAsyncResponseTransformer<>(Optional.of(f));
206212
}
207213

208214
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515

1616
package software.amazon.awssdk.core.internal.async;
1717

18+
import static software.amazon.awssdk.utils.BinaryUtils.copyBytes;
1819
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
1920

2021
import java.io.ByteArrayOutputStream;
2122
import java.nio.ByteBuffer;
23+
import java.util.Optional;
2224
import java.util.concurrent.CompletableFuture;
25+
import java.util.function.Function;
2326
import org.reactivestreams.Subscriber;
2427
import org.reactivestreams.Subscription;
2528
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -41,13 +44,18 @@
4144
public final class ByteArrayAsyncResponseTransformer<ResponseT> implements
4245
AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> {
4346

47+
private final Optional<Function<ResponseT, Integer>> knownSize;
4448
private volatile CompletableFuture<byte[]> cf;
4549
private volatile ResponseT response;
4650

51+
public ByteArrayAsyncResponseTransformer(Optional<Function<ResponseT, Integer>> knownSize) {
52+
this.knownSize = knownSize;
53+
}
54+
4755
@Override
4856
public CompletableFuture<ResponseBytes<ResponseT>> prepare() {
4957
cf = new CompletableFuture<>();
50-
return cf.thenApply(arr -> ResponseBytes.fromByteArray(response, arr));
58+
return cf.thenApply(arr -> ResponseBytes.fromByteArrayUnsafe(response, arr));
5159
}
5260

5361
@Override
@@ -57,23 +65,27 @@ public void onResponse(ResponseT response) {
5765

5866
@Override
5967
public void onStream(SdkPublisher<ByteBuffer> publisher) {
60-
publisher.subscribe(new BaosSubscriber(cf));
68+
ByteStore byteStore =
69+
knownSize.<ByteStore>map(f -> new KnownLengthStore(f.apply(response))).orElseGet(BaosStore::new);
70+
publisher.subscribe(new ByteSubscriber(cf, byteStore));
6171
}
6272

6373
@Override
6474
public void exceptionOccurred(Throwable throwable) {
6575
cf.completeExceptionally(throwable);
6676
}
6777

68-
static class BaosSubscriber implements Subscriber<ByteBuffer> {
78+
79+
static class ByteSubscriber implements Subscriber<ByteBuffer> {
6980
private final CompletableFuture<byte[]> resultFuture;
7081

71-
private ByteArrayOutputStream baos = new ByteArrayOutputStream();
82+
private ByteStore byteStore;
7283

7384
private Subscription subscription;
7485

75-
BaosSubscriber(CompletableFuture<byte[]> resultFuture) {
86+
ByteSubscriber(CompletableFuture<byte[]> resultFuture, ByteStore byteStore) {
7687
this.resultFuture = resultFuture;
88+
this.byteStore = byteStore;
7789
}
7890

7991
@Override
@@ -88,19 +100,54 @@ public void onSubscribe(Subscription s) {
88100

89101
@Override
90102
public void onNext(ByteBuffer byteBuffer) {
91-
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
103+
byteStore.append(byteBuffer);
92104
subscription.request(1);
93105
}
94106

95107
@Override
96108
public void onError(Throwable throwable) {
97-
baos = null;
109+
byteStore = null;
98110
resultFuture.completeExceptionally(throwable);
99111
}
100112

101113
@Override
102114
public void onComplete() {
103-
resultFuture.complete(baos.toByteArray());
115+
resultFuture.complete(byteStore.toByteArray());
116+
}
117+
}
118+
119+
interface ByteStore {
120+
void append(ByteBuffer byteBuffer);
121+
122+
byte[] toByteArray();
123+
}
124+
125+
static class BaosStore implements ByteStore {
126+
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
127+
128+
public void append(ByteBuffer byteBuffer) {
129+
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
130+
}
131+
132+
public byte[] toByteArray() {
133+
return baos.toByteArray();
134+
}
135+
}
136+
137+
static class KnownLengthStore implements ByteStore {
138+
private final byte[] byteArray;
139+
private int offset = 0;
140+
141+
KnownLengthStore(int contentSize) {
142+
this.byteArray = new byte[contentSize];
143+
}
144+
145+
public void append(ByteBuffer byteBuffer) {
146+
offset += copyBytes(byteBuffer, byteArray, offset);
147+
}
148+
149+
public byte[] toByteArray() {
150+
return byteArray;
104151
}
105152
}
106153
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package software.amazon.awssdk.services.s3;
2+
3+
import software.amazon.awssdk.core.ResponseBytes;
4+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
5+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
6+
7+
public class AsyncS3ResponseTransformer {
8+
public static AsyncResponseTransformer<GetObjectResponse, ResponseBytes<GetObjectResponse>> toBytes() {
9+
return AsyncResponseTransformer.toBytes(r -> r.contentLength().intValue());
10+
}
11+
}

test/sdk-native-image-test/src/main/java/software/amazon/awssdk/nativeimagetest/S3TestRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import java.util.UUID;
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
22-
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2322
import software.amazon.awssdk.core.sync.RequestBody;
23+
import software.amazon.awssdk.services.s3.AsyncS3ResponseTransformer;
2424
import software.amazon.awssdk.services.s3.S3AsyncClient;
2525
import software.amazon.awssdk.services.s3.S3Client;
2626
import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
@@ -54,7 +54,7 @@ public void runTests() {
5454
requestBody);
5555

5656
s3NettyClient.getObject(b -> b.bucket(BUCKET_NAME).key(KEY),
57-
AsyncResponseTransformer.toBytes()).join();
57+
AsyncS3ResponseTransformer.toBytes()).join();
5858

5959
} finally {
6060
if (bucketResponse != null) {

utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,4 +304,22 @@ public static byte[] copyBytesFrom(ByteBuffer bb, int readLimit) {
304304
return dst;
305305
}
306306

307+
308+
/**
309+
* This behaves identically to {@link software.amazon.awssdk.utils.BinaryUtils#copyBytesFrom(ByteBuffer)}, except
310+
* that the bytes are copied to the supplied destination array, at the supplied destination offset.
311+
*/
312+
public static int copyBytes(ByteBuffer bb, byte[] dest, int destOffset) {
313+
if (bb == null) {
314+
return 0;
315+
}
316+
317+
int remaining = bb.remaining();
318+
if (bb.hasArray()) {
319+
System.arraycopy(bb.array(), bb.arrayOffset() + bb.position(), dest, destOffset, remaining);
320+
} else {
321+
bb.asReadOnlyBuffer().get(dest, destOffset, remaining);
322+
}
323+
return remaining;
324+
}
307325
}

0 commit comments

Comments
 (0)