Skip to content

Commit 2e6b899

Browse files
committed
Merge pull request #47 from ReactiveSocket/thread-safe-frame-pool
add thread safe frame pool
2 parents 0a5a666 + 61b14c8 commit 2e6b899

File tree

1 file changed

+109
-0
lines changed

1 file changed

+109
-0
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.internal;
17+
18+
import io.reactivesocket.Frame;
19+
import uk.co.real_logic.agrona.MutableDirectBuffer;
20+
import uk.co.real_logic.agrona.concurrent.OneToOneConcurrentArrayQueue;
21+
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
22+
23+
import java.nio.ByteBuffer;
24+
25+
public class ThreadSafeFramePool implements FramePool
26+
{
27+
private static final int MAX_CACHED_FRAMES = 16;
28+
29+
private static final OneToOneConcurrentArrayQueue<Frame> FRAME_QUEUE =
30+
new OneToOneConcurrentArrayQueue<>(MAX_CACHED_FRAMES);
31+
32+
private static final OneToOneConcurrentArrayQueue<MutableDirectBuffer> DIRECTBUFFER_QUEUE =
33+
new OneToOneConcurrentArrayQueue<>(MAX_CACHED_FRAMES);
34+
35+
public Frame acquireFrame(int size)
36+
{
37+
final MutableDirectBuffer directBuffer = acquireMutableDirectBuffer(size);
38+
39+
Frame frame = pollFrame();
40+
if (null == frame)
41+
{
42+
frame = Frame.allocate(directBuffer);
43+
}
44+
45+
return frame;
46+
}
47+
48+
public Frame acquireFrame(ByteBuffer byteBuffer)
49+
{
50+
return Frame.allocate(new UnsafeBuffer(byteBuffer));
51+
}
52+
53+
public Frame acquireFrame(MutableDirectBuffer mutableDirectBuffer)
54+
{
55+
Frame frame = pollFrame();
56+
if (null == frame)
57+
{
58+
frame = Frame.allocate(mutableDirectBuffer);
59+
}
60+
61+
return frame;
62+
}
63+
64+
public MutableDirectBuffer acquireMutableDirectBuffer(ByteBuffer byteBuffer)
65+
{
66+
MutableDirectBuffer directBuffer = pollMutableDirectBuffer();
67+
if (null == directBuffer)
68+
{
69+
directBuffer = new UnsafeBuffer(byteBuffer);
70+
}
71+
72+
return directBuffer;
73+
}
74+
75+
public MutableDirectBuffer acquireMutableDirectBuffer(int size)
76+
{
77+
UnsafeBuffer directBuffer = (UnsafeBuffer)pollMutableDirectBuffer();
78+
if (null == directBuffer || directBuffer.byteBuffer().capacity() < size)
79+
{
80+
directBuffer = new UnsafeBuffer(ByteBuffer.allocate(size));
81+
}
82+
else
83+
{
84+
directBuffer.byteBuffer().limit(size).position(0);
85+
}
86+
87+
return directBuffer;
88+
}
89+
90+
public synchronized void release(Frame frame)
91+
{
92+
FRAME_QUEUE.offer(frame);
93+
}
94+
95+
public synchronized void release(MutableDirectBuffer mutableDirectBuffer)
96+
{
97+
DIRECTBUFFER_QUEUE.offer(mutableDirectBuffer);
98+
}
99+
100+
private synchronized Frame pollFrame()
101+
{
102+
return FRAME_QUEUE.poll();
103+
}
104+
105+
private synchronized MutableDirectBuffer pollMutableDirectBuffer()
106+
{
107+
return DIRECTBUFFER_QUEUE.poll();
108+
}
109+
}

0 commit comments

Comments
 (0)