Skip to content

Add Message#encodeHeaders #753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions flow/src/main/java/software/amazon/eventstream/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static Message decode(ByteBuffer buf) {
* @param buf Data of message (including prelude which will be skipped over).
* @return Decoded message
*/
public static Message decode(Prelude prelude, ByteBuffer buf) {
static Message decode(Prelude prelude, ByteBuffer buf) {
int totalLength = prelude.getTotalLength();
validateMessageCrc(buf, totalLength);
buf.position(buf.position() + Prelude.LENGTH_WITH_CRC);
Expand Down Expand Up @@ -135,16 +135,30 @@ public void encode(OutputStream os) {
}
}

private void encodeOrThrow(OutputStream os) throws IOException {
ByteArrayOutputStream headersAndPayload = new ByteArrayOutputStream();
{
DataOutputStream dos = new DataOutputStream(headersAndPayload);
for (Entry<String, HeaderValue> entry : headers.entrySet()) {
/**
* Encode the given {@code headers}, without any leading or trailing metadata such as checksums or lengths.
*
* @param headers a sequence of zero or more headers, which will be encoded in iteration order
* @return a byte array corresponding to the {@code headers} section of a {@code Message}
*/
public static byte[] encodeHeaders(Iterable<Entry<String, HeaderValue>> headers) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
for (Entry<String, HeaderValue> entry : headers) {
Header.encode(entry, dos);
}
dos.write(payload);
dos.flush();
dos.close();
return baos.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void encodeOrThrow(OutputStream os) throws IOException {
ByteArrayOutputStream headersAndPayload = new ByteArrayOutputStream();
headersAndPayload.write(encodeHeaders(headers.entrySet()));
headersAndPayload.write(payload);

int totalLength = Prelude.LENGTH_WITH_CRC + headersAndPayload.size() + 4;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package software.amazon.eventstream;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/**
Expand All @@ -15,9 +18,27 @@ public final class MessageDecoder {
private static final int INITIAL_BUFFER_SIZE = 2048 * 1024;

private final Consumer<Message> messageConsumer;
private List<Message> bufferedOutput;
private ByteBuffer buf;
private Prelude currentPrelude;

/**
* Creates a {@code MessageDecoder} instance that will buffer messages internally as they are decoded. Decoded
* messages can be obtained by calling {@link #getDecodedMessages()}.
*/
public MessageDecoder() {
this.messageConsumer = message -> this.bufferedOutput.add(message);
this.bufferedOutput = new ArrayList<>();
this.buf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
}

/**
* Creates a {@code MessageDecoder} instance that will publish messages incrementally to the supplied {@code
* messageConsumer} as they are decoded. The resulting instance does not support the {@link #getDecodedMessages()}
* operation, and will throw an exception if it is invoked.
*
* @param messageConsumer a function that consumes {@link Message} instances
*/
public MessageDecoder(Consumer<Message> messageConsumer) {
this(messageConsumer, INITIAL_BUFFER_SIZE);
}
Expand All @@ -28,23 +49,51 @@ public MessageDecoder(Consumer<Message> messageConsumer) {
MessageDecoder(Consumer<Message> messageConsumer, int initialBufferSize) {
this.messageConsumer = messageConsumer;
this.buf = ByteBuffer.allocate(initialBufferSize);
this.bufferedOutput = null;
}

/**
* Returns {@link Message} instances that have been decoded since this method was last invoked. Note that this
* method is only supported if this decoder was not configured to use a custom message consumer.
*
* @return all messages decoded since the last invocation of this method
*/
public List<Message> getDecodedMessages() {
if (bufferedOutput == null) {
throw new IllegalStateException("");
}
List<Message> ret = bufferedOutput;
bufferedOutput = new ArrayList<>();
return Collections.unmodifiableList(ret);
}

public void feed(byte[] bytes) {
feed(bytes, 0, bytes.length);
feed(ByteBuffer.wrap(bytes));
}

public void feed(byte[] bytes, int offset, int length) {
int bytesToRead = Math.min(bytes.length, length + offset);
int bytesConsumed = offset;
feed(ByteBuffer.wrap(bytes, offset, length));
}

/**
* Feed the contents of the given {@link ByteBuffer} into this decoder. Messages will be incrementally decoded and
* buffered or published to the message consumer (depending on configuration).
*
* @param byteBuffer a {@link ByteBuffer} whose entire contents will be read into the decoder's internal buffer
* @return this {@code MessageDecoder} instance
*/
public MessageDecoder feed(ByteBuffer byteBuffer) {
int bytesToRead = byteBuffer.remaining();
int bytesConsumed = 0;
while (bytesConsumed < bytesToRead) {
ByteBuffer readView = updateReadView();
if (currentPrelude == null) {
// Put only 15 bytes into buffer and compute prelude.
int numBytesToWrite = Math.min(15 - readView.remaining(),
bytesToRead - bytesConsumed);
bytesToRead - bytesConsumed);

feedBuf(byteBuffer, numBytesToWrite);

buf.put(bytes, bytesConsumed, numBytesToWrite);
bytesConsumed += numBytesToWrite;
readView = updateReadView();

Expand All @@ -63,9 +112,9 @@ public void feed(byte[] bytes, int offset, int length) {
if (currentPrelude != null) {
// Only write up to what we need to decode the next message
int numBytesToWrite = Math.min(currentPrelude.getTotalLength() - readView.remaining(),
bytesToRead - bytesConsumed);
bytesToRead - bytesConsumed);

buf.put(bytes, bytesConsumed, numBytesToWrite);
feedBuf(byteBuffer, numBytesToWrite);
bytesConsumed += numBytesToWrite;
readView = updateReadView();

Expand All @@ -77,6 +126,13 @@ public void feed(byte[] bytes, int offset, int length) {
}
}
}

return this;
}

private void feedBuf(ByteBuffer byteBuffer, int numBytesToWrite) {
buf.put((ByteBuffer) byteBuffer.duplicate().limit(byteBuffer.position() + numBytesToWrite));
byteBuffer.position(byteBuffer.position() + numBytesToWrite);
}

private ByteBuffer updateReadView() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
import java.util.stream.IntStream;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class MessageDecoderTest {
long SEED = 8912374098123423L;

Expand Down Expand Up @@ -48,7 +46,7 @@ public void testDecoder() throws Exception {
public void testDecoder_WithOffset() throws Exception {
TestUtils utils = new TestUtils(SEED);
Random rand = new Random(SEED);
List<Message> expected = IntStream.range(0, 100_000)
List<Message> expected = IntStream.range(0, 10_000)
.mapToObj(x -> utils.randomMessage())
.collect(Collectors.toList());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand All @@ -61,6 +59,9 @@ public void testDecoder_WithOffset() throws Exception {
MessageDecoder decoder = new MessageDecoder(actual::add);
while (toRead > 0) {
int length = rand.nextInt(100);
if (read + length > data.length) {
length = data.length - read;
}
decoder.feed(data, read, length);
read += length;
toRead -= length;
Expand Down Expand Up @@ -207,4 +208,4 @@ public void multipleLargeMessages_GrowsBufferAsNeeded() {
assertThat(decoder.currentBufferSize(), greaterThan(9001));
}

}
}