Skip to content

Merge streaming branch into base branch #4370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,9 @@ public static CodeBlock create(OperationModel operationModel, IntermediateModel
return CodeBlock.of("");
}

// TODO : remove once request compression for streaming operations is supported
if (operationModel.isStreaming()) {
throw new IllegalStateException("Request compression for streaming operations is not yet supported in the AWS SDK "
+ "for Java.");
}

// TODO : remove once S3 checksum interceptors are moved to occur after CompressRequestStage
// TODO : remove once:
// 1) S3 checksum interceptors are moved to occur after CompressRequestStage
// 2) Transfer-Encoding:chunked is supported in S3
if (model.getMetadata().getServiceName().equals("S3")) {
throw new IllegalStateException("Request compression for S3 is not yet supported in the AWS SDK for Java.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
@SdkInternalApi
public final class AwsSignedChunkedEncodingInputStream extends AwsChunkedEncodingInputStream {

private static final String CRLF = "\r\n";
private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature=";
private static final String CHECKSUM_SIGNATURE_HEADER = "x-amz-trailer-signature:";
private String previousChunkSignature;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,28 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
* Class that will buffer incoming BufferBytes of totalBytes length to chunks of bufferSize*
* Class that will buffer incoming BufferBytes to chunks of bufferSize.
* If totalBytes is not provided, i.e. content-length is unknown, {@link #getBufferedData()} should be used in the Subscriber's
* {@code onComplete()} to check for a final chunk that is smaller than the chunk size, and send if present.
*/
@SdkInternalApi
public final class ChunkBuffer {
private static final Logger log = Logger.loggerFor(ChunkBuffer.class);
private final AtomicLong transferredBytes;
private final ByteBuffer currentBuffer;
private final int chunkSize;
private final long totalBytes;
private final Long totalBytes;

private ChunkBuffer(Long totalBytes, Integer bufferSize) {
Validate.notNull(totalBytes, "The totalBytes must not be null");

int chunkSize = bufferSize != null ? bufferSize : DEFAULT_ASYNC_CHUNK_SIZE;
this.chunkSize = chunkSize;
this.currentBuffer = ByteBuffer.allocate(chunkSize);
Expand All @@ -52,14 +54,12 @@ public static Builder builder() {
return new DefaultBuilder();
}


/**
* Split the input {@link ByteBuffer} into multiple smaller {@link ByteBuffer}s, each of which contains {@link #chunkSize}
* worth of bytes. If the last chunk of the input ByteBuffer contains less than {@link #chunkSize} data, the last chunk will
* be buffered.
*/
public synchronized Iterable<ByteBuffer> split(ByteBuffer inputByteBuffer) {

if (!inputByteBuffer.hasRemaining()) {
return Collections.singletonList(inputByteBuffer);
}
Expand All @@ -71,7 +71,7 @@ public synchronized Iterable<ByteBuffer> split(ByteBuffer inputByteBuffer) {
fillCurrentBuffer(inputByteBuffer);

if (isCurrentBufferFull()) {
addCurrentBufferToIterable(byteBuffers, chunkSize);
addCurrentBufferToIterable(byteBuffers);
}
}

Expand All @@ -82,8 +82,7 @@ public synchronized Iterable<ByteBuffer> split(ByteBuffer inputByteBuffer) {

// If this is the last chunk, add data buffered to the iterable
if (isLastChunk()) {
int remainingBytesInBuffer = currentBuffer.position();
addCurrentBufferToIterable(byteBuffers, remainingBytesInBuffer);
addCurrentBufferToIterable(byteBuffers);
}
return byteBuffers;
}
Expand Down Expand Up @@ -111,19 +110,38 @@ private void splitRemainingInputByteBuffer(ByteBuffer inputByteBuffer, List<Byte
}
}

/**
* Retrieve the current buffered data.
*/
public Optional<ByteBuffer> getBufferedData() {
int remainingBytesInBuffer = currentBuffer.position();

if (remainingBytesInBuffer == 0) {
return Optional.empty();
}

ByteBuffer bufferedChunk = ByteBuffer.allocate(remainingBytesInBuffer);
currentBuffer.flip();
bufferedChunk.put(currentBuffer);
bufferedChunk.flip();
return Optional.of(bufferedChunk);
}

private boolean isLastChunk() {
if (totalBytes == null) {
return false;
}
long remainingBytes = totalBytes - transferredBytes.get();
return remainingBytes != 0 && remainingBytes == currentBuffer.position();
}

private void addCurrentBufferToIterable(List<ByteBuffer> byteBuffers, int capacity) {
ByteBuffer bufferedChunk = ByteBuffer.allocate(capacity);
currentBuffer.flip();
bufferedChunk.put(currentBuffer);
bufferedChunk.flip();
byteBuffers.add(bufferedChunk);
transferredBytes.addAndGet(bufferedChunk.remaining());
currentBuffer.clear();
private void addCurrentBufferToIterable(List<ByteBuffer> byteBuffers) {
Optional<ByteBuffer> bufferedChunk = getBufferedData();
if (bufferedChunk.isPresent()) {
byteBuffers.add(bufferedChunk.get());
transferredBytes.addAndGet(bufferedChunk.get().remaining());
currentBuffer.clear();
}
}

private void fillCurrentBuffer(ByteBuffer inputByteBuffer) {
Expand Down Expand Up @@ -151,8 +169,6 @@ public interface Builder extends SdkBuilder<Builder, ChunkBuffer> {
Builder bufferSize(int bufferSize);

Builder totalBytes(long totalBytes);


}

private static final class DefaultBuilder implements Builder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import static software.amazon.awssdk.core.internal.io.AwsChunkedInputStream.DEFAULT_CHUNK_SIZE;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.internal.compression.Compressor;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
* Wrapper class to wrap an AsyncRequestBody.
* This will chunk and compress the payload with the provided {@link Compressor}.
*/
@SdkInternalApi
public class CompressionAsyncRequestBody implements AsyncRequestBody {

private final AsyncRequestBody wrapped;
private final Compressor compressor;
private final int chunkSize;

private CompressionAsyncRequestBody(DefaultBuilder builder) {
this.wrapped = Validate.paramNotNull(builder.asyncRequestBody, "asyncRequestBody");
this.compressor = Validate.paramNotNull(builder.compressor, "compressor");
this.chunkSize = builder.chunkSize != null ? builder.chunkSize : DEFAULT_CHUNK_SIZE;
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
Validate.notNull(s, "Subscription MUST NOT be null.");

SdkPublisher<Iterable<ByteBuffer>> split = split(wrapped);
SdkPublisher<ByteBuffer> flattening = flattening(split);
flattening.map(compressor::compress).subscribe(s);
}

@Override
public Optional<Long> contentLength() {
return wrapped.contentLength();
}

@Override
public String contentType() {
return wrapped.contentType();
}

private SdkPublisher<Iterable<ByteBuffer>> split(SdkPublisher<ByteBuffer> source) {
return subscriber -> source.subscribe(new SplittingSubscriber(subscriber, chunkSize));
}

private SdkPublisher<ByteBuffer> flattening(SdkPublisher<Iterable<ByteBuffer>> source) {
return subscriber -> source.subscribe(new FlatteningSubscriber<>(subscriber));
}

/**
* @return Builder instance to construct a {@link CompressionAsyncRequestBody}.
*/
public static Builder builder() {
return new DefaultBuilder();
}

public interface Builder extends SdkBuilder<CompressionAsyncRequestBody.Builder, CompressionAsyncRequestBody> {

/**
* Sets the AsyncRequestBody that will be wrapped.
* @param asyncRequestBody
* @return This builder for method chaining.
*/
Builder asyncRequestBody(AsyncRequestBody asyncRequestBody);

/**
* Sets the compressor to compress the request.
* @param compressor
* @return This builder for method chaining.
*/
Builder compressor(Compressor compressor);

/**
* Sets the chunk size. Default size is 128 * 1024.
* @param chunkSize
* @return This builder for method chaining.
*/
Builder chunkSize(Integer chunkSize);
}

private static final class DefaultBuilder implements Builder {

private AsyncRequestBody asyncRequestBody;
private Compressor compressor;
private Integer chunkSize;

@Override
public CompressionAsyncRequestBody build() {
return new CompressionAsyncRequestBody(this);
}

@Override
public Builder asyncRequestBody(AsyncRequestBody asyncRequestBody) {
this.asyncRequestBody = asyncRequestBody;
return this;
}

@Override
public Builder compressor(Compressor compressor) {
this.compressor = compressor;
return this;
}

@Override
public Builder chunkSize(Integer chunkSize) {
this.chunkSize = chunkSize;
return this;
}
}

private static final class SplittingSubscriber extends DelegatingSubscriber<ByteBuffer, Iterable<ByteBuffer>> {
private final ChunkBuffer chunkBuffer;
private final AtomicBoolean upstreamDone = new AtomicBoolean(false);
private final AtomicLong downstreamDemand = new AtomicLong();
private final Object lock = new Object();
private volatile boolean sentFinalChunk = false;

protected SplittingSubscriber(Subscriber<? super Iterable<ByteBuffer>> subscriber, int chunkSize) {
super(subscriber);
this.chunkBuffer = ChunkBuffer.builder()
.bufferSize(chunkSize)
.build();
}

@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
if (n <= 0) {
throw new IllegalArgumentException("n > 0 required but it was " + n);
}

downstreamDemand.getAndAdd(n);

if (upstreamDone.get()) {
sendFinalChunk();
} else {
s.request(n);
}
}

@Override
public void cancel() {
s.cancel();
}
});
}

@Override
public void onNext(ByteBuffer byteBuffer) {
downstreamDemand.decrementAndGet();
Iterable<ByteBuffer> buffers = chunkBuffer.split(byteBuffer);
subscriber.onNext(buffers);
}

@Override
public void onComplete() {
upstreamDone.compareAndSet(false, true);
if (downstreamDemand.get() > 0) {
sendFinalChunk();
}
}

@Override
public void onError(Throwable t) {
upstreamDone.compareAndSet(false, true);
super.onError(t);
}

private void sendFinalChunk() {
synchronized (lock) {
if (!sentFinalChunk) {
sentFinalChunk = true;
Optional<ByteBuffer> byteBuffer = chunkBuffer.getBufferedData();
byteBuffer.ifPresent(buffer -> subscriber.onNext(Collections.singletonList(buffer)));
subscriber.onComplete();
}
}
}
}
}
Loading