Skip to content

cleanup of some frame pooling #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/io/reactivesocket/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public static Frame from(
) {
final int code = ErrorFrameFlyweight.errorCodeFromException(throwable);
final Frame frame = POOL.acquireFrame(
ErrorFrameFlyweight.computeFrameLength(data.capacity(), metadata.capacity()));
ErrorFrameFlyweight.computeFrameLength(metadata.capacity(), data.capacity()));

frame.length = ErrorFrameFlyweight.encode(
frame.directBuffer, frame.offset, streamId, code, metadata, data);
Expand Down Expand Up @@ -576,6 +576,6 @@ public String toString() {
} catch (Exception e) {
e.printStackTrace();
}
return "Frame[" + offset + "] => Stream ID: " + streamId + " Type: " + type + " Payload Data: " + payload;
return "Frame[" + offset + "] => Stream ID: " + streamId + " Type: " + type + " Payload: " + payload;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public class ErrorFrameFlyweight {
private static final int PAYLOAD_OFFSET = ERROR_CODE_FIELD_OFFSET + BitUtil.SIZE_OF_INT;

public static int computeFrameLength(
final int dataLength,
final int metadataLength
final int metadataLength,
final int dataLength
) {
int length = FrameHeaderFlyweight.computeFrameHeaderLength(
FrameType.ERROR, metadataLength, dataLength);
Expand All @@ -56,7 +56,7 @@ public static int encode(
final ByteBuffer metadata,
final ByteBuffer data
) {
final int frameLength = computeFrameLength(data.capacity(), metadata.capacity());
final int frameLength = computeFrameLength(metadata.capacity(), data.capacity());

int length = FrameHeaderFlyweight.encodeFrameHeader(
mutableDirectBuffer, offset, frameLength, 0, FrameType.ERROR, streamId);
Expand Down Expand Up @@ -98,6 +98,6 @@ public static int errorCode(final DirectBuffer directBuffer, final int offset) {
}

public static int payloadOffset(final DirectBuffer directBuffer, final int offset) {
return offset + PAYLOAD_OFFSET;
return offset + FrameHeaderFlyweight.FRAME_HEADER_LENGTH + BitUtil.SIZE_OF_INT;
}
}
30 changes: 21 additions & 9 deletions src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer)
public MutableDirectBuffer acquireMutableDirectBuffer(int size)
{
UnsafeBuffer directBuffer = (UnsafeBuffer)pollMutableDirectBuffer();
if (null == directBuffer || directBuffer.byteBuffer().capacity() < size)
if (null == directBuffer || directBuffer.capacity() < size)
{
directBuffer = new UnsafeBuffer(ByteBuffer.allocate(size));
}
Expand All @@ -87,23 +87,35 @@ public MutableDirectBuffer acquireMutableDirectBuffer(int size)
return directBuffer;
}

public synchronized void release(Frame frame)
public void release(Frame frame)
{
FRAME_QUEUE.offer(frame);
synchronized (FRAME_QUEUE)
{
FRAME_QUEUE.offer(frame);
}
}

public synchronized void release(MutableDirectBuffer mutableDirectBuffer)
public void release(MutableDirectBuffer mutableDirectBuffer)
{
DIRECTBUFFER_QUEUE.offer(mutableDirectBuffer);
synchronized (DIRECTBUFFER_QUEUE)
{
DIRECTBUFFER_QUEUE.offer(mutableDirectBuffer);
}
}

private synchronized Frame pollFrame()
private Frame pollFrame()
{
return FRAME_QUEUE.poll();
synchronized (FRAME_QUEUE)
{
return FRAME_QUEUE.poll();
}
}

private synchronized MutableDirectBuffer pollMutableDirectBuffer()
private MutableDirectBuffer pollMutableDirectBuffer()
{
return DIRECTBUFFER_QUEUE.poll();
synchronized (DIRECTBUFFER_QUEUE)
{
return DIRECTBUFFER_QUEUE.poll();
}
}
}
4 changes: 2 additions & 2 deletions src/test/java/io/reactivesocket/TestConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ public void connectToServerConnection(TestConnection serverConnection, boolean l
// client to server
write.add(f -> {
serverThread.schedule(() -> {
serverConnection.toInput.send(f);
serverConnection.toInput.send(f);
});
});
// server to client
serverConnection.write.add(f -> {
clientThread.schedule(() -> {
toInput.send(f);
toInput.send(f);
});
});
}
Expand Down