Skip to content

move to remaining instead of capacity for data and metadata #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/main/java/io/reactivesocket/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void wrap(final int streamId, final FrameType type, final ByteBuffer data
POOL.release(this.directBuffer);

this.directBuffer =
POOL.acquireMutableDirectBuffer(FrameHeaderFlyweight.computeFrameHeaderLength(type, 0, data.capacity()));
POOL.acquireMutableDirectBuffer(FrameHeaderFlyweight.computeFrameHeaderLength(type, 0, data.remaining()));

this.length = FrameHeaderFlyweight.encode(this.directBuffer, offset, streamId, 0, type, NULL_BYTEBUFFER, data);
}
Expand All @@ -261,7 +261,7 @@ public static Frame from(
final ByteBuffer data = payload.getData();

final Frame frame =
POOL.acquireFrame(SetupFrameFlyweight.computeFrameLength(metadataMimeType, dataMimeType, metadata.capacity(), data.capacity()));
POOL.acquireFrame(SetupFrameFlyweight.computeFrameLength(metadataMimeType, dataMimeType, metadata.remaining(), data.remaining()));

frame.length = SetupFrameFlyweight.encode(
frame.directBuffer, frame.offset, flags, keepaliveInterval, maxLifetime, metadataMimeType, dataMimeType, metadata, data);
Expand Down Expand Up @@ -317,7 +317,7 @@ public static Frame from(
) {
final int code = ErrorFrameFlyweight.errorCodeFromException(throwable);
final Frame frame = POOL.acquireFrame(
ErrorFrameFlyweight.computeFrameLength(metadata.capacity(), data.capacity()));
ErrorFrameFlyweight.computeFrameLength(metadata.remaining(), data.remaining()));

frame.length = ErrorFrameFlyweight.encode(
frame.directBuffer, frame.offset, streamId, code, metadata, data);
Expand Down Expand Up @@ -354,7 +354,7 @@ public static class Lease
{
public static Frame from(int ttl, int numberOfRequests, ByteBuffer metadata)
{
final Frame frame = POOL.acquireFrame(LeaseFrameFlyweight.computeFrameLength(metadata.capacity()));
final Frame frame = POOL.acquireFrame(LeaseFrameFlyweight.computeFrameLength(metadata.remaining()));

frame.length = LeaseFrameFlyweight.encode(frame.directBuffer, frame.offset, ttl, numberOfRequests, metadata);
return frame;
Expand Down Expand Up @@ -397,7 +397,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
final ByteBuffer d = payload.getData() != null ? payload.getData() : NULL_BYTEBUFFER;
final ByteBuffer md = payload.getMetadata() != null ? payload.getMetadata() : NULL_BYTEBUFFER;

final Frame frame = POOL.acquireFrame(RequestFrameFlyweight.computeFrameLength(type, md.capacity(), d.capacity()));
final Frame frame = POOL.acquireFrame(RequestFrameFlyweight.computeFrameLength(type, md.remaining(), d.remaining()));

if (type.hasInitialRequestN())
{
Expand All @@ -421,7 +421,7 @@ public static Frame from(int streamId, FrameType type, int flags)

public static Frame from(int streamId, FrameType type, ByteBuffer metadata, ByteBuffer data, int initialRequestN, int flags)
{
final Frame frame = POOL.acquireFrame(RequestFrameFlyweight.computeFrameLength(type, metadata.capacity(), data.capacity()));
final Frame frame = POOL.acquireFrame(RequestFrameFlyweight.computeFrameLength(type, metadata.remaining(), data.remaining()));

frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, flags, type, initialRequestN, metadata, data);
return frame;
Expand Down Expand Up @@ -471,7 +471,7 @@ public static Frame from(int streamId, FrameType type, Payload payload)
final ByteBuffer metadata = payload.getMetadata() != null ? payload.getMetadata() : NULL_BYTEBUFFER;

final Frame frame =
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(type, metadata.capacity(), data.capacity()));
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(type, metadata.remaining(), data.remaining()));

frame.length = FrameHeaderFlyweight.encode(frame.directBuffer, frame.offset, streamId, 0, type, metadata, data);
return frame;
Expand All @@ -480,7 +480,7 @@ public static Frame from(int streamId, FrameType type, Payload payload)
public static Frame from(int streamId, FrameType type, ByteBuffer metadata, ByteBuffer data, int flags)
{
final Frame frame =
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(type, metadata.capacity(), data.capacity()));
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(type, metadata.remaining(), data.remaining()));

frame.length = FrameHeaderFlyweight.encode(frame.directBuffer, frame.offset, streamId, flags, type, metadata, data);
return frame;
Expand Down Expand Up @@ -515,7 +515,7 @@ public static class Keepalive
public static Frame from(ByteBuffer data, boolean respond)
{
final Frame frame =
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.KEEPALIVE, 0, data.capacity()));
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.KEEPALIVE, 0, data.remaining()));

final int flags = (respond ? FrameHeaderFlyweight.FLAGS_KEEPALIVE_R : 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static int encode(
final ByteBuffer metadata,
final ByteBuffer data
) {
final int frameLength = computeFrameLength(metadata.capacity(), data.capacity());
final int frameLength = computeFrameLength(metadata.remaining(), data.remaining());

int length = FrameHeaderFlyweight.encodeFrameHeader(
mutableDirectBuffer, offset, frameLength, 0, FrameType.ERROR, streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,17 @@ public static int encodeMetadata(
final ByteBuffer metadata)
{
int length = 0;
final int metadataLength = metadata.remaining();

if (0 < metadata.capacity())
if (0 < metadataLength)
{
int flags = mutableDirectBuffer.getShort(frameHeaderStartOffset + FLAGS_FIELD_OFFSET, ByteOrder.BIG_ENDIAN);
flags |= FLAGS_M;
mutableDirectBuffer.putShort(frameHeaderStartOffset + FLAGS_FIELD_OFFSET, (short)flags, ByteOrder.BIG_ENDIAN);
mutableDirectBuffer.putInt(metadataOffset, metadata.capacity() + BitUtil.SIZE_OF_INT, ByteOrder.BIG_ENDIAN);
length += BitUtil.SIZE_OF_INT;
mutableDirectBuffer.putBytes(metadataOffset + length, metadata, metadata.capacity());
length += metadata.capacity();
mutableDirectBuffer.putBytes(metadataOffset + length, metadata, metadataLength);
length += metadataLength;
}

return length;
Expand All @@ -131,11 +132,12 @@ public static int encodeData(
final ByteBuffer data)
{
int length = 0;
final int dataLength = data.remaining();

if (0 < data.capacity())
{
mutableDirectBuffer.putBytes(dataOffset, data, data.capacity());
length += data.capacity();
mutableDirectBuffer.putBytes(dataOffset, data, dataLength);
length += dataLength;
}

return length;
Expand All @@ -151,7 +153,7 @@ public static int encode(
final ByteBuffer metadata,
final ByteBuffer data)
{
final int frameLength = computeFrameHeaderLength(frameType, metadata.capacity(), data.capacity());
final int frameLength = computeFrameHeaderLength(frameType, metadata.remaining(), data.remaining());

final FrameType outFrameType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static int encode(
final int offset,
final ByteBuffer data)
{
final int frameLength = computeFrameLength(data.capacity());
final int frameLength = computeFrameLength(data.remaining());

int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, 0, FrameType.KEEPALIVE, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static int encode(
final int numRequests,
final ByteBuffer metadata)
{
final int frameLength = computeFrameLength(metadata.capacity());
final int frameLength = computeFrameLength(metadata.remaining());

int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, 0, FrameType.LEASE, 0);

Expand Down
10 changes: 6 additions & 4 deletions src/main/java/io/reactivesocket/internal/PayloadBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,16 @@ public void append(final Payload payload)
{
final ByteBuffer payloadData = payload.getData();
final ByteBuffer payloadMetadata = payload.getMetadata();
final int dataLength = payloadData.remaining();
final int metadataLength = payloadMetadata.remaining();

ensureDataCapacity(payloadData.capacity());
ensureMetadataCapacity(payloadMetadata.capacity());
ensureDataCapacity(dataLength);
ensureMetadataCapacity(metadataLength);

dataMutableDirectBuffer.putBytes(dataLimit, payloadData, payloadData.capacity());
dataLimit += payloadData.capacity();
dataLimit += dataLength;
metadataMutableDirectBuffer.putBytes(metadataLimit, payloadMetadata, payloadMetadata.capacity());
metadataLimit += payloadMetadata.capacity();
metadataLimit += metadataLength;
}

private void ensureDataCapacity(final int additionalCapacity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ public void resetForRequestChannel(final int streamId, final Payload payload, fi

public boolean hasNext()
{
return (dataOffset < data.capacity() || metadataOffset < metadata.capacity());
return (dataOffset < data.capacity() || metadataOffset < metadata.remaining());
}

public Frame next()
{
final int metadataLength = Math.min(metadataMtu, metadata.capacity() - metadataOffset);
final int dataLength = Math.min(dataMtu, data.capacity() - dataOffset);
final int metadataLength = Math.min(metadataMtu, metadata.remaining() - metadataOffset);
final int dataLength = Math.min(dataMtu, data.remaining() - dataOffset);

Frame result = null;

Expand All @@ -84,7 +84,7 @@ public Frame next()
metadataOffset += metadataLength;
dataOffset += dataLength;

final boolean isMoreFollowing = (metadataOffset < metadata.capacity() || dataOffset < data.capacity());
final boolean isMoreFollowing = (metadataOffset < metadata.remaining() || dataOffset < data.remaining());
int flags = 0;

if (Type.RESPONSE == type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static int encode(
final ByteBuffer metadata,
final ByteBuffer data)
{
final int frameLength = computeFrameLength(type, metadata.capacity(), data.capacity());
final int frameLength = computeFrameLength(type, metadata.remaining(), data.remaining());

flags |= FLAGS_REQUEST_CHANNEL_N;
int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, flags, type, streamId);
Expand All @@ -76,7 +76,7 @@ public static int encode(
final ByteBuffer metadata,
final ByteBuffer data)
{
final int frameLength = computeFrameLength(type, metadata.capacity(), data.capacity());
final int frameLength = computeFrameLength(type, metadata.remaining(), data.remaining());

int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, flags, type, streamId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static int encode(
final ByteBuffer metadata,
final ByteBuffer data)
{
final int frameLength = computeFrameLength(metadataMimeType, dataMimeType, metadata.capacity(), data.capacity());
final int frameLength = computeFrameLength(metadataMimeType, dataMimeType, metadata.remaining(), data.remaining());

int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, flags, FrameType.SETUP, 0);

Expand Down
32 changes: 20 additions & 12 deletions src/main/java/io/reactivesocket/internal/ThreadSafeFramePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,19 @@ public class ThreadSafeFramePool implements FramePool
{
private static final int MAX_CACHED_FRAMES = 16;

private static final OneToOneConcurrentArrayQueue<Frame> FRAME_QUEUE =
new OneToOneConcurrentArrayQueue<>(MAX_CACHED_FRAMES);
private final OneToOneConcurrentArrayQueue<Frame> frameQueue;
private final OneToOneConcurrentArrayQueue<MutableDirectBuffer> directBufferQueue;

private static final OneToOneConcurrentArrayQueue<MutableDirectBuffer> DIRECTBUFFER_QUEUE =
new OneToOneConcurrentArrayQueue<>(MAX_CACHED_FRAMES);
public ThreadSafeFramePool()
{
this(MAX_CACHED_FRAMES, MAX_CACHED_FRAMES);
}

public ThreadSafeFramePool(final int frameQueueLength, final int directBufferQueueLength)
{
frameQueue = new OneToOneConcurrentArrayQueue<>(frameQueueLength);
directBufferQueue = new OneToOneConcurrentArrayQueue<>(directBufferQueueLength);
}

public Frame acquireFrame(int size)
{
Expand Down Expand Up @@ -89,33 +97,33 @@ public MutableDirectBuffer acquireMutableDirectBuffer(int size)

public void release(Frame frame)
{
synchronized (FRAME_QUEUE)
synchronized (frameQueue)
{
FRAME_QUEUE.offer(frame);
frameQueue.offer(frame);
}
}

public void release(MutableDirectBuffer mutableDirectBuffer)
{
synchronized (DIRECTBUFFER_QUEUE)
synchronized (directBufferQueue)
{
DIRECTBUFFER_QUEUE.offer(mutableDirectBuffer);
directBufferQueue.offer(mutableDirectBuffer);
}
}

private Frame pollFrame()
{
synchronized (FRAME_QUEUE)
synchronized (frameQueue)
{
return FRAME_QUEUE.poll();
return frameQueue.poll();
}
}

private MutableDirectBuffer pollMutableDirectBuffer()
{
synchronized (DIRECTBUFFER_QUEUE)
synchronized (directBufferQueue)
{
return DIRECTBUFFER_QUEUE.poll();
return directBufferQueue.poll();
}
}
}