18
18
19
19
import java .io .IOException ;
20
20
import java .lang .reflect .Method ;
21
+ import java .nio .ByteBuffer ;
21
22
import java .util .ArrayList ;
22
23
import java .util .List ;
23
24
import java .util .Map ;
42
43
import org .springframework .util .MimeType ;
43
44
44
45
/**
45
- * A {@code Decoder} that reads {@link com.google.protobuf.Message}s
46
- * using <a href="https://developers.google.com/protocol-buffers/">Google Protocol Buffers</a>.
46
+ * A {@code Decoder} that reads {@link com.google.protobuf.Message}s using
47
+ * <a href="https://developers.google.com/protocol-buffers/">Google Protocol Buffers</a>.
47
48
*
48
49
* <p>Flux deserialized via
49
50
* {@link #decode(Publisher, ResolvableType, MimeType, Map)} are expected to use
50
- * <a href="https://developers.google.com/protocol-buffers/docs/techniques?hl=en#streaming">delimited Protobuf messages</a>
51
- * with the size of each message specified before the message itself. Single values deserialized
52
- * via {@link #decodeToMono(Publisher, ResolvableType, MimeType, Map)} are expected to use
53
- * regular Protobuf message format (without the size prepended before the message).
51
+ * <a href="https://developers.google.com/protocol-buffers/docs/techniques?hl=en#streaming">
52
+ * delimited Protobuf messages</a> with the size of each message specified before
53
+ * the message itself. Single values deserialized via
54
+ * {@link #decodeToMono(Publisher, ResolvableType, MimeType, Map)} are expected
55
+ * to use regular Protobuf message format (without the size prepended before
56
+ * the message).
54
57
*
55
- * <p>Notice that default instance of Protobuf message produces empty byte array, so
56
- * {@code Mono.just(Msg.getDefaultInstance())} sent over the network will be deserialized
57
- * as an empty {@link Mono}.
58
+ * <p>Notice that default instance of Protobuf message produces empty byte
59
+ * array, so {@code Mono.just(Msg.getDefaultInstance())} sent over the network
60
+ * will be deserialized as an empty {@link Mono}.
58
61
*
59
- * <p>To generate {@code Message} Java classes, you need to install the {@code protoc} binary.
62
+ * <p>To generate {@code Message} Java classes, you need to install the
63
+ * {@code protoc} binary.
60
64
*
61
65
* <p>This decoder requires Protobuf 3 or higher, and supports
62
- * {@code "application/x-protobuf"} and {@code "application/octet-stream"} with the official
63
- * {@code "com.google.protobuf:protobuf-java"} library.
66
+ * {@code "application/x-protobuf"} and {@code "application/octet-stream"} with
67
+ * the official {@code "com.google.protobuf:protobuf-java"} library.
64
68
*
65
69
* @author Sébastien Deleuze
66
70
* @since 5.1
67
71
* @see ProtobufEncoder
68
72
*/
69
73
public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder <Message > {
70
74
71
- /**
72
- * The default max size for aggregating messages.
73
- */
75
+ /** The default max size for aggregating messages. */
74
76
protected static final int DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024 ;
75
77
76
78
private static final ConcurrentMap <Class <?>, Method > methodCache = new ConcurrentReferenceHashMap <>();
@@ -123,15 +125,17 @@ public Mono<Message> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableT
123
125
return DataBufferUtils .join (inputStream ).map (dataBuffer -> {
124
126
try {
125
127
Message .Builder builder = getMessageBuilder (elementType .toClass ());
126
- builder .mergeFrom (CodedInputStream .newInstance (dataBuffer .asByteBuffer ()), this .extensionRegistry );
128
+ ByteBuffer buffer = dataBuffer .asByteBuffer ();
129
+ builder .mergeFrom (CodedInputStream .newInstance (buffer ), this .extensionRegistry );
127
130
return builder .build ();
128
131
}
129
132
catch (IOException ex ) {
130
133
throw new DecodingException ("I/O error while parsing input stream" , ex );
131
134
}
132
135
catch (Exception ex ) {
133
136
throw new DecodingException ("Could not read Protobuf message: " + ex .getMessage (), ex );
134
- } finally {
137
+ }
138
+ finally {
135
139
DataBufferUtils .release (dataBuffer );
136
140
}
137
141
}
@@ -168,11 +172,13 @@ private class MessageDecoderFunction implements Function<DataBuffer, Iterable<?
168
172
169
173
private int messageBytesToRead ;
170
174
175
+
171
176
public MessageDecoderFunction (ResolvableType elementType , int maxMessageSize ) {
172
177
this .elementType = elementType ;
173
178
this .maxMessageSize = maxMessageSize ;
174
179
}
175
180
181
+
176
182
@ Override
177
183
public Iterable <? extends Message > apply (DataBuffer input ) {
178
184
try {
@@ -189,8 +195,9 @@ public Iterable<? extends Message> apply(DataBuffer input) {
189
195
this .messageBytesToRead = CodedInputStream .readRawVarint32 (firstByte , input .asInputStream ());
190
196
if (this .messageBytesToRead > this .maxMessageSize ) {
191
197
throw new DecodingException (
192
- "The number of bytes to read parsed in the incoming stream (" +
193
- this .messageBytesToRead + ") exceeds the configured limit (" + this .maxMessageSize + ")" );
198
+ "The number of bytes to read from the incoming stream " +
199
+ "(" + this .messageBytesToRead + ") exceeds " +
200
+ "the configured limit (" + this .maxMessageSize + ")" );
194
201
}
195
202
this .output = input .factory ().allocateBuffer (this .messageBytesToRead );
196
203
}
@@ -206,7 +213,8 @@ public Iterable<? extends Message> apply(DataBuffer input) {
206
213
207
214
if (this .messageBytesToRead == 0 ) {
208
215
Message .Builder builder = getMessageBuilder (this .elementType .toClass ());
209
- builder .mergeFrom (CodedInputStream .newInstance (this .output .asByteBuffer ()), extensionRegistry );
216
+ ByteBuffer buffer = this .output .asByteBuffer ();
217
+ builder .mergeFrom (CodedInputStream .newInstance (buffer ), extensionRegistry );
210
218
messages .add (builder .build ());
211
219
DataBufferUtils .release (this .output );
212
220
this .output = null ;
0 commit comments