Skip to content

Commit e5dc7e0

Browse files
authored
Resuable Payload (#209)
__Problem__ `PayloadImpl` caches `data` and `metadata` buffers. This means once those buffers are read, they can not be reused as the pointer of `ByteBuffer` has moved to the end of buffer. This restricts the usage of `PayloadImpl` instance with something like: ```java socket.requestResponse(new PayloadImpl("Hello")).retry(2); ``` For any retry in the above code, data read from the payload instance will be empty. __Modification__ - store buffer `position()` on creation and reset them on every `get()`. - Additional constructor to override this behavior and create a payload for single use. - `PayloadImpl` defaults to reusable payload. __Result__ Possible to retry a request without having a custom `Payload` implementation.
1 parent f419554 commit e5dc7e0

File tree

2 files changed

+94
-1
lines changed

2 files changed

+94
-1
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/util/PayloadImpl.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,23 @@
2323
import java.nio.charset.Charset;
2424

2525
/**
26-
* An implementation of {@link Payload}
26+
* An implementation of {@link Payload}. This implementation is <b>not</b> thread-safe, and hence any method can not be
27+
* invoked concurrently.
28+
*
29+
* <h2>Reusability</h2>
30+
*
31+
* By default, an instance is reusable, i.e. everytime {@link #getData()} or {@link #getMetadata()} is invoked, the
32+
* position of the returned buffer is reset to the start of data in the buffer. For creating an instance for single-use,
33+
* {@link #PayloadImpl(ByteBuffer, ByteBuffer, boolean)} must be used.
2734
*/
2835
public class PayloadImpl implements Payload {
2936

3037
public static final Payload EMPTY = new PayloadImpl(Frame.NULL_BYTEBUFFER, Frame.NULL_BYTEBUFFER);
3138

3239
private final ByteBuffer data;
40+
private final int dataStartPosition;
41+
private final int metadataStartPosition;
42+
private final boolean reusable;
3343
private final ByteBuffer metadata;
3444

3545
public PayloadImpl(String data) {
@@ -61,17 +71,39 @@ public PayloadImpl(ByteBuffer data) {
6171
}
6272

6373
public PayloadImpl(ByteBuffer data, ByteBuffer metadata) {
74+
this(data, metadata, true);
75+
}
76+
77+
/**
78+
* New instance where every invocation of {@link #getMetadata()} and {@link #getData()} will reset the position of
79+
* the buffer to the position when it is created, if and only if, {@code reusable} is set to {@code true}.
80+
*
81+
* @param data Buffer for data.
82+
* @param metadata Buffer for metadata.
83+
* @param reusable {@code true} if the buffer position is to be reset on every invocation of {@link #getData()} and
84+
* {@link #getMetadata()}.
85+
*/
86+
public PayloadImpl(ByteBuffer data, ByteBuffer metadata, boolean reusable) {
6487
this.data = data;
88+
this.reusable = reusable;
6589
this.metadata = null == metadata ? Frame.NULL_BYTEBUFFER : metadata;
90+
dataStartPosition = reusable ? this.data.position() : 0;
91+
metadataStartPosition = reusable ? this.metadata.position() : 0;
6692
}
6793

6894
@Override
6995
public ByteBuffer getData() {
96+
if (reusable) {
97+
data.position(dataStartPosition);
98+
}
7099
return data;
71100
}
72101

73102
@Override
74103
public ByteBuffer getMetadata() {
104+
if (reusable) {
105+
metadata.position(metadataStartPosition);
106+
}
75107
return metadata;
76108
}
77109

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.util;
15+
16+
import org.junit.Test;
17+
18+
import java.nio.ByteBuffer;
19+
20+
import static java.nio.ByteBuffer.wrap;
21+
import static org.hamcrest.MatcherAssert.*;
22+
import static org.hamcrest.Matchers.*;
23+
24+
public class PayloadImplTest {
25+
26+
public static final String DATA_VAL = "data";
27+
public static final String METADATA_VAL = "Metadata";
28+
29+
@Test
30+
public void testReuse() throws Exception {
31+
PayloadImpl p = new PayloadImpl(DATA_VAL, METADATA_VAL);
32+
assertDataAndMetadata(p);
33+
assertDataAndMetadata(p);
34+
}
35+
@Test
36+
public void testSingleUse() throws Exception {
37+
PayloadImpl p = new PayloadImpl(wrap(DATA_VAL.getBytes()), wrap(METADATA_VAL.getBytes()), false);
38+
assertDataAndMetadata(p);
39+
assertThat("Unexpected data length", p.getData().remaining(), is(0));
40+
assertThat("Unexpected metadata length", p.getMetadata().remaining(), is(0));
41+
}
42+
43+
@Test
44+
public void testReuseWithExternalMark() throws Exception {
45+
PayloadImpl p = new PayloadImpl(DATA_VAL, METADATA_VAL);
46+
assertDataAndMetadata(p);
47+
p.getData().position(2).mark();
48+
assertDataAndMetadata(p);
49+
}
50+
51+
public void assertDataAndMetadata(PayloadImpl p) {
52+
assertThat("Unexpected data.", readValue(p.getData()), equalTo(DATA_VAL));
53+
assertThat("Unexpected metadata.", readValue(p.getMetadata()), equalTo(METADATA_VAL));
54+
}
55+
56+
public String readValue(ByteBuffer buffer) {
57+
byte[] bytes = new byte[buffer.remaining()];
58+
buffer.get(bytes);
59+
return new String(bytes);
60+
}
61+
}

0 commit comments

Comments
 (0)