Skip to content

Commit f1afcfb

Browse files
committed
cleanup of some frame pooling
1 parent 162c506 commit f1afcfb

File tree

4 files changed

+29
-17
lines changed

4 files changed

+29
-17
lines changed

src/main/java/io/reactivesocket/Frame.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ public static Frame from(
317317
) {
318318
final int code = ErrorFrameFlyweight.errorCodeFromException(throwable);
319319
final Frame frame = POOL.acquireFrame(
320-
ErrorFrameFlyweight.computeFrameLength(data.capacity(), metadata.capacity()));
320+
ErrorFrameFlyweight.computeFrameLength(metadata.capacity(), data.capacity()));
321321

322322
frame.length = ErrorFrameFlyweight.encode(
323323
frame.directBuffer, frame.offset, streamId, code, metadata, data);
@@ -576,6 +576,6 @@ public String toString() {
576576
} catch (Exception e) {
577577
e.printStackTrace();
578578
}
579-
return "Frame[" + offset + "] => Stream ID: " + streamId + " Type: " + type + " Payload Data: " + payload;
579+
return "Frame[" + offset + "] => Stream ID: " + streamId + " Type: " + type + " Payload: " + payload;
580580
}
581581
}

src/main/java/io/reactivesocket/internal/ErrorFrameFlyweight.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ public class ErrorFrameFlyweight {
4040
private static final int PAYLOAD_OFFSET = ERROR_CODE_FIELD_OFFSET + BitUtil.SIZE_OF_INT;
4141

4242
public static int computeFrameLength(
43-
final int dataLength,
44-
final int metadataLength
43+
final int metadataLength,
44+
final int dataLength
4545
) {
4646
int length = FrameHeaderFlyweight.computeFrameHeaderLength(
4747
FrameType.ERROR, metadataLength, dataLength);
@@ -56,7 +56,7 @@ public static int encode(
5656
final ByteBuffer metadata,
5757
final ByteBuffer data
5858
) {
59-
final int frameLength = computeFrameLength(data.capacity(), metadata.capacity());
59+
final int frameLength = computeFrameLength(metadata.capacity(), data.capacity());
6060

6161
int length = FrameHeaderFlyweight.encodeFrameHeader(
6262
mutableDirectBuffer, offset, frameLength, 0, FrameType.ERROR, streamId);
@@ -98,6 +98,6 @@ public static int errorCode(final DirectBuffer directBuffer, final int offset) {
9898
}
9999

100100
public static int payloadOffset(final DirectBuffer directBuffer, final int offset) {
101-
return offset + PAYLOAD_OFFSET;
101+
return offset + FrameHeaderFlyweight.FRAME_HEADER_LENGTH + BitUtil.SIZE_OF_INT;
102102
}
103103
}

src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer)
7575
public MutableDirectBuffer acquireMutableDirectBuffer(int size)
7676
{
7777
UnsafeBuffer directBuffer = (UnsafeBuffer)pollMutableDirectBuffer();
78-
if (null == directBuffer || directBuffer.byteBuffer().capacity() < size)
78+
if (null == directBuffer || directBuffer.capacity() < size)
7979
{
8080
directBuffer = new UnsafeBuffer(ByteBuffer.allocate(size));
8181
}
@@ -87,23 +87,35 @@ public MutableDirectBuffer acquireMutableDirectBuffer(int size)
8787
return directBuffer;
8888
}
8989

90-
public synchronized void release(Frame frame)
90+
public void release(Frame frame)
9191
{
92-
FRAME_QUEUE.offer(frame);
92+
synchronized (FRAME_QUEUE)
93+
{
94+
FRAME_QUEUE.offer(frame);
95+
}
9396
}
9497

95-
public synchronized void release(MutableDirectBuffer mutableDirectBuffer)
98+
public void release(MutableDirectBuffer mutableDirectBuffer)
9699
{
97-
DIRECTBUFFER_QUEUE.offer(mutableDirectBuffer);
100+
synchronized (DIRECTBUFFER_QUEUE)
101+
{
102+
DIRECTBUFFER_QUEUE.offer(mutableDirectBuffer);
103+
}
98104
}
99105

100-
private synchronized Frame pollFrame()
106+
private Frame pollFrame()
101107
{
102-
return FRAME_QUEUE.poll();
108+
synchronized (FRAME_QUEUE)
109+
{
110+
return FRAME_QUEUE.poll();
111+
}
103112
}
104113

105-
private synchronized MutableDirectBuffer pollMutableDirectBuffer()
114+
private MutableDirectBuffer pollMutableDirectBuffer()
106115
{
107-
return DIRECTBUFFER_QUEUE.poll();
116+
synchronized (DIRECTBUFFER_QUEUE)
117+
{
118+
return DIRECTBUFFER_QUEUE.poll();
119+
}
108120
}
109121
}

src/test/java/io/reactivesocket/TestConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,13 @@ public void connectToServerConnection(TestConnection serverConnection, boolean l
8484
// client to server
8585
write.add(f -> {
8686
serverThread.schedule(() -> {
87-
serverConnection.toInput.send(f);
87+
serverConnection.toInput.send(f);
8888
});
8989
});
9090
// server to client
9191
serverConnection.write.add(f -> {
9292
clientThread.schedule(() -> {
93-
toInput.send(f);
93+
toInput.send(f);
9494
});
9595
});
9696
}

0 commit comments

Comments
 (0)