Skip to content

Commit dc44552

Browse files
committed
Merge branch 'master' into marshaller-refactor
2 parents 6e848bd + 4ff5bb5 commit dc44552

File tree

7 files changed

+337
-160
lines changed

7 files changed

+337
-160
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "Apache HTTP Client",
3+
"type": "feature",
4+
"description": "Add support for idle connection reaping."
5+
}

flow/src/main/java/software/amazon/eventstream/Message.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public static Message decode(ByteBuffer buf) {
6666
* @param buf Data of message (including prelude which will be skipped over).
6767
* @return Decoded message
6868
*/
69-
public static Message decode(Prelude prelude, ByteBuffer buf) {
69+
static Message decode(Prelude prelude, ByteBuffer buf) {
7070
int totalLength = prelude.getTotalLength();
7171
validateMessageCrc(buf, totalLength);
7272
buf.position(buf.position() + Prelude.LENGTH_WITH_CRC);
@@ -135,16 +135,30 @@ public void encode(OutputStream os) {
135135
}
136136
}
137137

138-
private void encodeOrThrow(OutputStream os) throws IOException {
139-
ByteArrayOutputStream headersAndPayload = new ByteArrayOutputStream();
140-
{
141-
DataOutputStream dos = new DataOutputStream(headersAndPayload);
142-
for (Entry<String, HeaderValue> entry : headers.entrySet()) {
138+
/**
139+
* Encode the given {@code headers}, without any leading or trailing metadata such as checksums or lengths.
140+
*
141+
* @param headers a sequence of zero or more headers, which will be encoded in iteration order
142+
* @return a byte array corresponding to the {@code headers} section of a {@code Message}
143+
*/
144+
public static byte[] encodeHeaders(Iterable<Entry<String, HeaderValue>> headers) {
145+
try {
146+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
147+
DataOutputStream dos = new DataOutputStream(baos);
148+
for (Entry<String, HeaderValue> entry : headers) {
143149
Header.encode(entry, dos);
144150
}
145-
dos.write(payload);
146-
dos.flush();
151+
dos.close();
152+
return baos.toByteArray();
153+
} catch (IOException e) {
154+
throw new RuntimeException(e);
147155
}
156+
}
157+
158+
private void encodeOrThrow(OutputStream os) throws IOException {
159+
ByteArrayOutputStream headersAndPayload = new ByteArrayOutputStream();
160+
headersAndPayload.write(encodeHeaders(headers.entrySet()));
161+
headersAndPayload.write(payload);
148162

149163
int totalLength = Prelude.LENGTH_WITH_CRC + headersAndPayload.size() + 4;
150164

flow/src/main/java/software/amazon/eventstream/MessageDecoder.java

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package software.amazon.eventstream;
22

33
import java.nio.ByteBuffer;
4+
import java.util.ArrayList;
5+
import java.util.Collections;
6+
import java.util.List;
47
import java.util.function.Consumer;
58

69
/**
@@ -15,9 +18,27 @@ public final class MessageDecoder {
1518
private static final int INITIAL_BUFFER_SIZE = 2048 * 1024;
1619

1720
private final Consumer<Message> messageConsumer;
21+
private List<Message> bufferedOutput;
1822
private ByteBuffer buf;
1923
private Prelude currentPrelude;
2024

25+
/**
26+
* Creates a {@code MessageDecoder} instance that will buffer messages internally as they are decoded. Decoded
27+
* messages can be obtained by calling {@link #getDecodedMessages()}.
28+
*/
29+
public MessageDecoder() {
30+
this.messageConsumer = message -> this.bufferedOutput.add(message);
31+
this.bufferedOutput = new ArrayList<>();
32+
this.buf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
33+
}
34+
35+
/**
36+
* Creates a {@code MessageDecoder} instance that will publish messages incrementally to the supplied {@code
37+
* messageConsumer} as they are decoded. The resulting instance does not support the {@link #getDecodedMessages()}
38+
* operation, and will throw an exception if it is invoked.
39+
*
40+
* @param messageConsumer a function that consumes {@link Message} instances
41+
*/
2142
public MessageDecoder(Consumer<Message> messageConsumer) {
2243
this(messageConsumer, INITIAL_BUFFER_SIZE);
2344
}
@@ -28,23 +49,51 @@ public MessageDecoder(Consumer<Message> messageConsumer) {
2849
MessageDecoder(Consumer<Message> messageConsumer, int initialBufferSize) {
2950
this.messageConsumer = messageConsumer;
3051
this.buf = ByteBuffer.allocate(initialBufferSize);
52+
this.bufferedOutput = null;
53+
}
54+
55+
/**
56+
* Returns {@link Message} instances that have been decoded since this method was last invoked. Note that this
57+
* method is only supported if this decoder was not configured to use a custom message consumer.
58+
*
59+
* @return all messages decoded since the last invocation of this method
60+
*/
61+
public List<Message> getDecodedMessages() {
62+
if (bufferedOutput == null) {
63+
throw new IllegalStateException("");
64+
}
65+
List<Message> ret = bufferedOutput;
66+
bufferedOutput = new ArrayList<>();
67+
return Collections.unmodifiableList(ret);
3168
}
3269

3370
public void feed(byte[] bytes) {
34-
feed(bytes, 0, bytes.length);
71+
feed(ByteBuffer.wrap(bytes));
3572
}
3673

3774
public void feed(byte[] bytes, int offset, int length) {
38-
int bytesToRead = Math.min(bytes.length, length + offset);
39-
int bytesConsumed = offset;
75+
feed(ByteBuffer.wrap(bytes, offset, length));
76+
}
77+
78+
/**
79+
* Feed the contents of the given {@link ByteBuffer} into this decoder. Messages will be incrementally decoded and
80+
* buffered or published to the message consumer (depending on configuration).
81+
*
82+
* @param byteBuffer a {@link ByteBuffer} whose entire contents will be read into the decoder's internal buffer
83+
* @return this {@code MessageDecoder} instance
84+
*/
85+
public MessageDecoder feed(ByteBuffer byteBuffer) {
86+
int bytesToRead = byteBuffer.remaining();
87+
int bytesConsumed = 0;
4088
while (bytesConsumed < bytesToRead) {
4189
ByteBuffer readView = updateReadView();
4290
if (currentPrelude == null) {
4391
// Put only 15 bytes into buffer and compute prelude.
4492
int numBytesToWrite = Math.min(15 - readView.remaining(),
45-
bytesToRead - bytesConsumed);
93+
bytesToRead - bytesConsumed);
94+
95+
feedBuf(byteBuffer, numBytesToWrite);
4696

47-
buf.put(bytes, bytesConsumed, numBytesToWrite);
4897
bytesConsumed += numBytesToWrite;
4998
readView = updateReadView();
5099

@@ -63,9 +112,9 @@ public void feed(byte[] bytes, int offset, int length) {
63112
if (currentPrelude != null) {
64113
// Only write up to what we need to decode the next message
65114
int numBytesToWrite = Math.min(currentPrelude.getTotalLength() - readView.remaining(),
66-
bytesToRead - bytesConsumed);
115+
bytesToRead - bytesConsumed);
67116

68-
buf.put(bytes, bytesConsumed, numBytesToWrite);
117+
feedBuf(byteBuffer, numBytesToWrite);
69118
bytesConsumed += numBytesToWrite;
70119
readView = updateReadView();
71120

@@ -77,6 +126,13 @@ public void feed(byte[] bytes, int offset, int length) {
77126
}
78127
}
79128
}
129+
130+
return this;
131+
}
132+
133+
private void feedBuf(ByteBuffer byteBuffer, int numBytesToWrite) {
134+
buf.put((ByteBuffer) byteBuffer.duplicate().limit(byteBuffer.position() + numBytesToWrite));
135+
byteBuffer.position(byteBuffer.position() + numBytesToWrite);
80136
}
81137

82138
private ByteBuffer updateReadView() {

flow/src/test/java/software/amazon/eventstream/MessageDecoderTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
import java.util.stream.IntStream;
1515
import org.hamcrest.Matchers;
1616
import org.junit.Assert;
17-
import org.junit.Ignore;
1817
import org.junit.Test;
1918

20-
@Ignore
2119
public class MessageDecoderTest {
2220
long SEED = 8912374098123423L;
2321

@@ -48,7 +46,7 @@ public void testDecoder() throws Exception {
4846
public void testDecoder_WithOffset() throws Exception {
4947
TestUtils utils = new TestUtils(SEED);
5048
Random rand = new Random(SEED);
51-
List<Message> expected = IntStream.range(0, 100_000)
49+
List<Message> expected = IntStream.range(0, 10_000)
5250
.mapToObj(x -> utils.randomMessage())
5351
.collect(Collectors.toList());
5452
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -61,6 +59,9 @@ public void testDecoder_WithOffset() throws Exception {
6159
MessageDecoder decoder = new MessageDecoder(actual::add);
6260
while (toRead > 0) {
6361
int length = rand.nextInt(100);
62+
if (read + length > data.length) {
63+
length = data.length - read;
64+
}
6465
decoder.feed(data, read, length);
6566
read += length;
6667
toRead -= length;
@@ -207,4 +208,4 @@ public void multipleLargeMessages_GrowsBufferAsNeeded() {
207208
assertThat(decoder.currentBufferSize(), greaterThan(9001));
208209
}
209210

210-
}
211+
}

http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/ApacheHttpClient.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import software.amazon.awssdk.http.apache.internal.DefaultConfiguration;
7272
import software.amazon.awssdk.http.apache.internal.SdkProxyRoutePlanner;
7373
import software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory;
74+
import software.amazon.awssdk.http.apache.internal.conn.IdleConnectionReaper;
7475
import software.amazon.awssdk.http.apache.internal.conn.SdkConnectionKeepAliveStrategy;
7576
import software.amazon.awssdk.http.apache.internal.conn.SdkTlsSocketFactory;
7677
import software.amazon.awssdk.http.apache.internal.impl.ApacheHttpRequestFactory;
@@ -129,10 +130,10 @@ private ConnectionManagerAwareHttpClient createClient(ApacheHttpClient.DefaultBu
129130

130131
addProxyConfig(builder, configuration.proxyConfiguration);
131132

132-
// TODO idle connection reaper
133-
// if (.useReaper()) {
134-
// IdleConnectionReaper.registerConnectionManager(cm, settings.getMaxIdleConnectionTime());
135-
// }
133+
if (useIdleConnectionReaper(configuration)) {
134+
IdleConnectionReaper.getInstance().registerConnectionManager(
135+
cm, connectionMaxIdleTime(configuration).toMillis());
136+
}
136137

137138
return new software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient(builder.build(), cm);
138139
}
@@ -155,12 +156,19 @@ private void addProxyConfig(HttpClientBuilder builder,
155156
}
156157

157158
private ConnectionKeepAliveStrategy buildKeepAliveStrategy(ApacheHttpClient.DefaultBuilder configuration) {
158-
final long maxIdle = Optional.ofNullable(configuration.connectionMaxIdleTime)
159-
.orElse(DefaultConfiguration.MAX_IDLE_CONNECTION_TIME)
160-
.toMillis();
159+
final long maxIdle = connectionMaxIdleTime(configuration).toMillis();
161160
return maxIdle > 0 ? new SdkConnectionKeepAliveStrategy(maxIdle) : null;
162161
}
163162

163+
private Duration connectionMaxIdleTime(DefaultBuilder configuration) {
164+
return Optional.ofNullable(configuration.connectionMaxIdleTime)
165+
.orElse(DefaultConfiguration.MAX_IDLE_CONNECTION_TIME);
166+
}
167+
168+
private boolean useIdleConnectionReaper(DefaultBuilder configuration) {
169+
return Boolean.TRUE.equals(configuration.useIdleConnectionReaper);
170+
}
171+
164172
private boolean isAuthenticatedProxy(ProxyConfiguration proxyConfiguration) {
165173
return proxyConfiguration.username() != null && proxyConfiguration.password() != null;
166174
}
@@ -188,7 +196,9 @@ public void abort() {
188196

189197
@Override
190198
public void close() {
191-
httpClient.getHttpClientConnectionManager().shutdown();
199+
HttpClientConnectionManager cm = httpClient.getHttpClientConnectionManager();
200+
IdleConnectionReaper.getInstance().deregisterConnectionManager(cm);
201+
cm.shutdown();
192202
}
193203

194204
private SdkHttpFullResponse execute(HttpRequestBase apacheRequest) throws IOException {
@@ -307,6 +317,14 @@ public interface Builder extends SdkHttpClient.Builder<ApacheHttpClient.Builder>
307317
* Configure the maximum amount of time that a connection should be allowed to remain open while idle.
308318
*/
309319
Builder connectionMaxIdleTime(Duration maxIdleConnectionTimeout);
320+
321+
/**
322+
* Configure whether the idle connections in the connection pool should be closed asynchronously.
323+
* <p>
324+
* When enabled, connections left idling for longer than {@link #connectionMaxIdleTime(Duration)} will be
325+
* closed. If no value is set, the default value of {@link DefaultConfiguration#MAX_IDLE_CONNECTION_TIME} is used.
326+
*/
327+
Builder useIdleConnectionReaper(Boolean useConnectionReaper);
310328
}
311329

312330
private static final class DefaultBuilder implements Builder {
@@ -316,6 +334,7 @@ private static final class DefaultBuilder implements Builder {
316334
private Boolean expectContinueEnabled;
317335
private Duration connectionTimeToLive;
318336
private Duration connectionMaxIdleTime;
337+
private Boolean useIdleConnectionReaper;
319338

320339
private DefaultBuilder() {
321340
}
@@ -416,6 +435,16 @@ public void setConnectionMaxIdleTime(Duration connectionMaxIdleTime) {
416435
connectionMaxIdleTime(connectionMaxIdleTime);
417436
}
418437

438+
@Override
439+
public Builder useIdleConnectionReaper(Boolean useIdleConnectionReaper) {
440+
this.useIdleConnectionReaper = useIdleConnectionReaper;
441+
return this;
442+
}
443+
444+
public void setUseIdleConnectionReaper(Boolean useIdleConnectionReaper) {
445+
useIdleConnectionReaper(useIdleConnectionReaper);
446+
}
447+
419448
@Override
420449
public SdkHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
421450
AttributeMap resolvedOptions = standardOptions.build().merge(serviceDefaults).merge(GLOBAL_HTTP_DEFAULTS);

0 commit comments

Comments
 (0)