Skip to content

Commit fa5f179

Browse files
authored
Tuple Frames (#658)
* added logic to handle different websocket frames Signed-off-by: Robert Roeser <[email protected]> * tests and formatting Signed-off-by: Robert Roeser <[email protected]> * disable debug logging Signed-off-by: Robert Roeser <[email protected]> * fixes tuple frames to properly span internal byte arrays Signed-off-by: Robert Roeser <[email protected]> * fixes tuple frames to properly span internal byte arrays Signed-off-by: Robert Roeser <[email protected]> * update tuple buffer Signed-off-by: Robert Roeser <[email protected]> * tests and formatting Signed-off-by: Robert Roeser <[email protected]> * disable debug logging Signed-off-by: Robert Roeser <[email protected]> * fixes tuple frames to properly span internal byte arrays Signed-off-by: Robert Roeser <[email protected]> * fixes tuple frames to properly span internal byte arrays Signed-off-by: Robert Roeser <[email protected]> * update tuple buffer Signed-off-by: Robert Roeser <[email protected]>
1 parent d88767c commit fa5f179

File tree

8 files changed

+131
-46
lines changed

8 files changed

+131
-46
lines changed

rsocket-core/src/main/java/io/rsocket/buffer/AbstractTupleByteBuf.java

Lines changed: 84 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,93 +54,152 @@ public ByteBuffer nioBuffer(int index, int length) {
5454
}
5555

5656
@Override
57-
protected byte _getByte(int index) {
57+
public ByteBuffer[] nioBuffers(int index, int length) {
58+
checkIndex(index, length);
59+
if (length == 0) {
60+
return new ByteBuffer[] {EMPTY_NIO_BUFFER};
61+
}
62+
return _nioBuffers(index, length);
63+
}
64+
65+
protected abstract ByteBuffer[] _nioBuffers(int index, int length);
66+
67+
@Override
68+
protected byte _getByte(final int index) {
5869
long ri = calculateRelativeIndex(index);
5970
ByteBuf byteBuf = getPart(index);
6071

61-
index = (int) (ri & Integer.MAX_VALUE);
72+
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);
6273

63-
return byteBuf.getByte(index);
74+
return byteBuf.getByte(calculatedIndex);
6475
}
6576

6677
@Override
67-
protected short _getShort(int index) {
78+
protected short _getShort(final int index) {
6879
long ri = calculateRelativeIndex(index);
6980
ByteBuf byteBuf = getPart(index);
7081

71-
index = (int) (ri & Integer.MAX_VALUE);
82+
final int calculatedIndex = (int) (ri & Integer.MAX_VALUE);
7283

73-
return byteBuf.getShort(index);
84+
if (calculatedIndex + Short.BYTES <= byteBuf.writerIndex()) {
85+
return byteBuf.getShort(calculatedIndex);
86+
} else if (order() == ByteOrder.BIG_ENDIAN) {
87+
return (short) ((_getByte(index) & 0xff) << 8 | _getByte(index + 1) & 0xff);
88+
} else {
89+
return (short) (_getByte(index) & 0xff | (_getByte(index + 1) & 0xff) << 8);
90+
}
7491
}
7592

7693
@Override
7794
protected short _getShortLE(int index) {
7895
long ri = calculateRelativeIndex(index);
7996
ByteBuf byteBuf = getPart(index);
8097

81-
index = (int) (ri & Integer.MAX_VALUE);
98+
final int calculatedIndex = (int) (ri & Integer.MAX_VALUE);
8299

83-
return byteBuf.getShortLE(index);
100+
if (calculatedIndex + Short.BYTES <= byteBuf.writerIndex()) {
101+
return byteBuf.getShortLE(calculatedIndex);
102+
} else if (order() == ByteOrder.BIG_ENDIAN) {
103+
return (short) (_getByte(index) & 0xff | (_getByte(index + 1) & 0xff) << 8);
104+
} else {
105+
return (short) ((_getByte(index) & 0xff) << 8 | _getByte(index + 1) & 0xff);
106+
}
84107
}
85108

86109
@Override
87-
protected int _getUnsignedMedium(int index) {
110+
protected int _getUnsignedMedium(final int index) {
88111
long ri = calculateRelativeIndex(index);
89112
ByteBuf byteBuf = getPart(index);
90113

91-
index = (int) (ri & Integer.MAX_VALUE);
114+
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);
92115

93-
return byteBuf.getUnsignedMedium(index);
116+
if (calculatedIndex + 3 <= byteBuf.writerIndex()) {
117+
return byteBuf.getUnsignedMedium(calculatedIndex);
118+
} else if (order() == ByteOrder.BIG_ENDIAN) {
119+
return (_getShort(index) & 0xffff) << 8 | _getByte(index + 2) & 0xff;
120+
} else {
121+
return _getShort(index) & 0xFFFF | (_getByte(index + 2) & 0xFF) << 16;
122+
}
94123
}
95124

96125
@Override
97126
protected int _getUnsignedMediumLE(int index) {
98127
long ri = calculateRelativeIndex(index);
99128
ByteBuf byteBuf = getPart(index);
100129

101-
index = (int) (ri & Integer.MAX_VALUE);
130+
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);
102131

103-
return byteBuf.getUnsignedMediumLE(index);
132+
if (calculatedIndex + 3 <= byteBuf.writerIndex()) {
133+
return byteBuf.getUnsignedMediumLE(calculatedIndex);
134+
} else if (order() == ByteOrder.BIG_ENDIAN) {
135+
return _getShortLE(index) & 0xffff | (_getByte(index + 2) & 0xff) << 16;
136+
} else {
137+
return (_getShortLE(index) & 0xffff) << 8 | _getByte(index + 2) & 0xff;
138+
}
104139
}
105140

106141
@Override
107-
protected int _getInt(int index) {
142+
protected int _getInt(final int index) {
108143
long ri = calculateRelativeIndex(index);
109144
ByteBuf byteBuf = getPart(index);
110145

111-
index = (int) (ri & Integer.MAX_VALUE);
146+
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);
112147

113-
return byteBuf.getInt(index);
148+
if (calculatedIndex + Integer.BYTES <= byteBuf.writerIndex()) {
149+
return byteBuf.getInt(calculatedIndex);
150+
} else if (order() == ByteOrder.BIG_ENDIAN) {
151+
return (_getShort(index) & 0xffff) << 16 | _getShort(index + 2) & 0xffff;
152+
} else {
153+
return _getShort(index) & 0xFFFF | (_getShort(index + 2) & 0xFFFF) << 16;
154+
}
114155
}
115156

116157
@Override
117-
protected int _getIntLE(int index) {
158+
protected int _getIntLE(final int index) {
118159
long ri = calculateRelativeIndex(index);
119160
ByteBuf byteBuf = getPart(index);
120161

121-
index = (int) (ri & Integer.MAX_VALUE);
162+
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);
122163

123-
return byteBuf.getIntLE(index);
164+
if (calculatedIndex + Integer.BYTES <= byteBuf.writerIndex()) {
165+
return byteBuf.getIntLE(calculatedIndex);
166+
} else if (order() == ByteOrder.BIG_ENDIAN) {
167+
return _getShortLE(index) & 0xffff | (_getShortLE(index + 2) & 0xffff) << 16;
168+
} else {
169+
return (_getShortLE(index) & 0xffff) << 16 | _getShortLE(index + 2) & 0xffff;
170+
}
124171
}
125172

126173
@Override
127-
protected long _getLong(int index) {
174+
protected long _getLong(final int index) {
128175
long ri = calculateRelativeIndex(index);
129176
ByteBuf byteBuf = getPart(index);
130177

131-
index = (int) (ri & Integer.MAX_VALUE);
178+
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);
132179

133-
return byteBuf.getLong(index);
180+
if (calculatedIndex + Long.BYTES <= byteBuf.writerIndex()) {
181+
return byteBuf.getLong(calculatedIndex);
182+
} else if (order() == ByteOrder.BIG_ENDIAN) {
183+
return (_getInt(index) & 0xffffffffL) << 32 | _getInt(index + 4) & 0xffffffffL;
184+
} else {
185+
return _getInt(index) & 0xFFFFFFFFL | (_getInt(index + 4) & 0xFFFFFFFFL) << 32;
186+
}
134187
}
135188

136189
@Override
137-
protected long _getLongLE(int index) {
190+
protected long _getLongLE(final int index) {
138191
long ri = calculateRelativeIndex(index);
139192
ByteBuf byteBuf = getPart(index);
140193

141-
index = (int) (ri & Integer.MAX_VALUE);
194+
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);
142195

143-
return byteBuf.getLongLE(index);
196+
if (calculatedIndex + Long.BYTES <= byteBuf.writerIndex()) {
197+
return byteBuf.getLongLE(calculatedIndex);
198+
} else if (order() == ByteOrder.BIG_ENDIAN) {
199+
return (_getInt(index) & 0xffffffffL) << 32 | _getInt(index + 4) & 0xffffffffL;
200+
} else {
201+
return _getInt(index) & 0xFFFFFFFFL | (_getInt(index + 4) & 0xFFFFFFFFL) << 32;
202+
}
144203
}
145204

146205
@Override
@@ -518,11 +577,6 @@ public long memoryAddress() {
518577
throw new UnsupportedOperationException();
519578
}
520579

521-
@Override
522-
public int compareTo(ByteBuf buffer) {
523-
return 0;
524-
}
525-
526580
@Override
527581
protected void _setByte(int index, int value) {}
528582

rsocket-core/src/main/java/io/rsocket/buffer/Tuple2ByteBuf.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,7 @@ public ByteBuffer nioBuffer() {
104104
}
105105

106106
@Override
107-
public ByteBuffer[] nioBuffers(int index, int length) {
108-
if (length == 0) {
109-
return new ByteBuffer[] {EMPTY_NIO_BUFFER};
110-
}
111-
107+
public ByteBuffer[] _nioBuffers(int index, int length) {
112108
long ri = calculateRelativeIndex(index);
113109
index = (int) (ri & Integer.MAX_VALUE);
114110
switch ((int) ((ri & MASK) >>> 32L)) {

rsocket-core/src/main/java/io/rsocket/buffer/Tuple3ByteBuf.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,7 @@ public ByteBuffer nioBuffer() {
122122
}
123123

124124
@Override
125-
public ByteBuffer[] nioBuffers(int index, int length) {
126-
if (length == 0) {
127-
return new ByteBuffer[] {EMPTY_NIO_BUFFER};
128-
}
129-
125+
public ByteBuffer[] _nioBuffers(int index, int length) {
130126
long ri = calculateRelativeIndex(index);
131127
index = (int) (ri & Integer.MAX_VALUE);
132128
switch ((int) ((ri & MASK) >>> 32L)) {

rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufAllocator;
55
import io.netty.buffer.Unpooled;
6+
import io.rsocket.buffer.TupleByteBuf;
67

78
class DataAndMetadataFlyweight {
89
public static final int FRAME_LENGTH_MASK = 0xFFFFFF;
@@ -32,19 +33,19 @@ private static int decodeLength(final ByteBuf byteBuf) {
3233

3334
static ByteBuf encodeOnlyMetadata(
3435
ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata) {
35-
return allocator.compositeBuffer().addComponents(true, header, metadata);
36+
return TupleByteBuf.of(allocator, header, metadata);
3637
}
3738

3839
static ByteBuf encodeOnlyData(ByteBufAllocator allocator, final ByteBuf header, ByteBuf data) {
39-
return allocator.compositeBuffer().addComponents(true, header, data);
40+
return TupleByteBuf.of(allocator, header, data);
4041
}
4142

4243
static ByteBuf encode(
4344
ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata, ByteBuf data) {
4445

4546
int length = metadata.readableBytes();
4647
encodeLength(header, length);
47-
return allocator.compositeBuffer().addComponents(true, header, metadata, data);
48+
return TupleByteBuf.of(allocator, header, metadata, data);
4849
}
4950

5051
static ByteBuf metadataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {

rsocket-core/src/main/java/io/rsocket/frame/FrameLengthFlyweight.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufAllocator;
5+
import io.rsocket.buffer.TupleByteBuf;
56

67
/**
78
* Some transports like TCP aren't framed, and require a length. This is used by DuplexConnections
@@ -34,7 +35,7 @@ private static int decodeLength(final ByteBuf byteBuf) {
3435
public static ByteBuf encode(ByteBufAllocator allocator, int length, ByteBuf frame) {
3536
ByteBuf buffer = allocator.buffer();
3637
encodeLength(buffer, length);
37-
return allocator.compositeBuffer().addComponents(true, buffer, frame);
38+
return TupleByteBuf.of(allocator, buffer, frame);
3839
}
3940

4041
public static int length(ByteBuf byteBuf) {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.rsocket.buffer;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.buffer.Unpooled;
6+
import java.util.concurrent.ThreadLocalRandom;
7+
import org.junit.jupiter.api.Test;
8+
9+
class Tuple3ByteBufTest {
10+
@Test
11+
void testTupleBufferGet() {
12+
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
13+
ByteBuf one = allocator.directBuffer(9);
14+
15+
byte[] bytes = new byte[9];
16+
ThreadLocalRandom.current().nextBytes(bytes);
17+
one.writeBytes(bytes);
18+
19+
bytes = new byte[8];
20+
ThreadLocalRandom.current().nextBytes(bytes);
21+
ByteBuf two = Unpooled.wrappedBuffer(bytes);
22+
23+
bytes = new byte[9];
24+
ThreadLocalRandom.current().nextBytes(bytes);
25+
ByteBuf three = Unpooled.wrappedBuffer(bytes);
26+
27+
ByteBuf tuple = TupleByteBuf.of(one, two, three);
28+
29+
int anInt = tuple.getInt(16);
30+
31+
long aLong = tuple.getLong(15);
32+
33+
short aShort = tuple.getShort(8);
34+
35+
int medium = tuple.getMedium(8);
36+
}
37+
}

rsocket-transport-netty/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ if (osdetector.classifier in ["linux-x86_64"] || ["osx-x86_64"] || ["windows-x86
3030
dependencies {
3131
api project(':rsocket-core')
3232
api 'io.projectreactor.netty:reactor-netty'
33-
implementation 'org.slf4j:slf4j-api'
33+
api 'org.slf4j:slf4j-api'
3434

3535
compileOnly 'com.google.code.findbugs:jsr305'
3636

rsocket-transport-netty/src/test/resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
<logger name="io.rsocket.transport.netty" level="INFO"/>
2727
<logger name="io.rsocket.FrameLogger" level="INFO"/>
28-
<logger name="io.rsocket.fragmentation.FragmentationDuplexConnection" level="DEBUG"/>
28+
<logger name="io.rsocket.fragmentation.FragmentationDuplexConnection" level="INFO"/>
2929

3030
<root level="INFO">
3131
<appender-ref ref="STDOUT"/>

0 commit comments

Comments
 (0)