1
1
package software .amazon .eventstream ;
2
2
3
3
import java .nio .ByteBuffer ;
4
+ import java .util .ArrayList ;
5
+ import java .util .Collections ;
6
+ import java .util .List ;
4
7
import java .util .function .Consumer ;
5
8
6
9
/**
@@ -15,9 +18,27 @@ public final class MessageDecoder {
15
18
private static final int INITIAL_BUFFER_SIZE = 2048 * 1024 ;
16
19
17
20
private final Consumer <Message > messageConsumer ;
21
+ private List <Message > bufferedOutput ;
18
22
private ByteBuffer buf ;
19
23
private Prelude currentPrelude ;
20
24
25
+ /**
26
+ * Creates a {@code MessageDecoder} instance that will buffer messages internally as they are decoded. Decoded
27
+ * messages can be obtained by calling {@link #getDecodedMessages()}.
28
+ */
29
+ public MessageDecoder () {
30
+ this .messageConsumer = message -> this .bufferedOutput .add (message );
31
+ this .bufferedOutput = new ArrayList <>();
32
+ this .buf = ByteBuffer .allocate (INITIAL_BUFFER_SIZE );
33
+ }
34
+
35
+ /**
36
+ * Creates a {@code MessageDecoder} instance that will publish messages incrementally to the supplied {@code
37
+ * messageConsumer} as they are decoded. The resulting instance does not support the {@link #getDecodedMessages()}
38
+ * operation, and will throw an exception if it is invoked.
39
+ *
40
+ * @param messageConsumer a function that consumes {@link Message} instances
41
+ */
21
42
public MessageDecoder (Consumer <Message > messageConsumer ) {
22
43
this (messageConsumer , INITIAL_BUFFER_SIZE );
23
44
}
@@ -28,23 +49,51 @@ public MessageDecoder(Consumer<Message> messageConsumer) {
28
49
MessageDecoder (Consumer <Message > messageConsumer , int initialBufferSize ) {
29
50
this .messageConsumer = messageConsumer ;
30
51
this .buf = ByteBuffer .allocate (initialBufferSize );
52
+ this .bufferedOutput = null ;
53
+ }
54
+
55
+ /**
56
+ * Returns {@link Message} instances that have been decoded since this method was last invoked. Note that this
57
+ * method is only supported if this decoder was not configured to use a custom message consumer.
58
+ *
59
+ * @return all messages decoded since the last invocation of this method
60
+ */
61
+ public List <Message > getDecodedMessages () {
62
+ if (bufferedOutput == null ) {
63
+ throw new IllegalStateException ("" );
64
+ }
65
+ List <Message > ret = bufferedOutput ;
66
+ bufferedOutput = new ArrayList <>();
67
+ return Collections .unmodifiableList (ret );
31
68
}
32
69
33
70
public void feed (byte [] bytes ) {
34
- feed (bytes , 0 , bytes . length );
71
+ feed (ByteBuffer . wrap ( bytes ) );
35
72
}
36
73
37
74
public void feed (byte [] bytes , int offset , int length ) {
38
- int bytesToRead = Math .min (bytes .length , length + offset );
39
- int bytesConsumed = offset ;
75
+ feed (ByteBuffer .wrap (bytes , offset , length ));
76
+ }
77
+
78
+ /**
79
+ * Feed the contents of the given {@link ByteBuffer} into this decoder. Messages will be incrementally decoded and
80
+ * buffered or published to the message consumer (depending on configuration).
81
+ *
82
+ * @param byteBuffer a {@link ByteBuffer} whose entire contents will be read into the decoder's internal buffer
83
+ * @return this {@code MessageDecoder} instance
84
+ */
85
+ public MessageDecoder feed (ByteBuffer byteBuffer ) {
86
+ int bytesToRead = byteBuffer .remaining ();
87
+ int bytesConsumed = 0 ;
40
88
while (bytesConsumed < bytesToRead ) {
41
89
ByteBuffer readView = updateReadView ();
42
90
if (currentPrelude == null ) {
43
91
// Put only 15 bytes into buffer and compute prelude.
44
92
int numBytesToWrite = Math .min (15 - readView .remaining (),
45
- bytesToRead - bytesConsumed );
93
+ bytesToRead - bytesConsumed );
94
+
95
+ feedBuf (byteBuffer , numBytesToWrite );
46
96
47
- buf .put (bytes , bytesConsumed , numBytesToWrite );
48
97
bytesConsumed += numBytesToWrite ;
49
98
readView = updateReadView ();
50
99
@@ -63,9 +112,9 @@ public void feed(byte[] bytes, int offset, int length) {
63
112
if (currentPrelude != null ) {
64
113
// Only write up to what we need to decode the next message
65
114
int numBytesToWrite = Math .min (currentPrelude .getTotalLength () - readView .remaining (),
66
- bytesToRead - bytesConsumed );
115
+ bytesToRead - bytesConsumed );
67
116
68
- buf . put ( bytes , bytesConsumed , numBytesToWrite );
117
+ feedBuf ( byteBuffer , numBytesToWrite );
69
118
bytesConsumed += numBytesToWrite ;
70
119
readView = updateReadView ();
71
120
@@ -77,6 +126,13 @@ public void feed(byte[] bytes, int offset, int length) {
77
126
}
78
127
}
79
128
}
129
+
130
+ return this ;
131
+ }
132
+
133
+ private void feedBuf (ByteBuffer byteBuffer , int numBytesToWrite ) {
134
+ buf .put ((ByteBuffer ) byteBuffer .duplicate ().limit (byteBuffer .position () + numBytesToWrite ));
135
+ byteBuffer .position (byteBuffer .position () + numBytesToWrite );
80
136
}
81
137
82
138
private ByteBuffer updateReadView () {
0 commit comments