Skip to content

Commit a0a2687

Browse files
authored
more performance improvements (#659)
* added logic to handle different websocket frames Signed-off-by: Robert Roeser <[email protected]> * tests and formatting Signed-off-by: Robert Roeser <[email protected]> * disable debug logging Signed-off-by: Robert Roeser <[email protected]> * fixes tuple frames to properly span internal byte arrays Signed-off-by: Robert Roeser <[email protected]> * fixes tuple frames to properly span internal byte arrays Signed-off-by: Robert Roeser <[email protected]> * update tuple buffer Signed-off-by: Robert Roeser <[email protected]> * tests and formatting Signed-off-by: Robert Roeser <[email protected]> * disable debug logging Signed-off-by: Robert Roeser <[email protected]> * fixes tuple frames to properly span internal byte arrays Signed-off-by: Robert Roeser <[email protected]> * fixes tuple frames to properly span internal byte arrays Signed-off-by: Robert Roeser <[email protected]> * update tuple buffer Signed-off-by: Robert Roeser <[email protected]> * performance improvements Signed-off-by: Robert Roeser <[email protected]> * delete file Signed-off-by: Robert Roeser <[email protected]> * fix javadoc Signed-off-by: Robert Roeser <[email protected]> * fixing javadoc Signed-off-by: Robert Roeser <[email protected]>
1 parent fa5f179 commit a0a2687

24 files changed

+2984
-24
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,21 @@
2222
import io.netty.buffer.ByteBuf;
2323
import io.netty.buffer.ByteBufAllocator;
2424
import io.netty.util.ReferenceCountUtil;
25-
import io.netty.util.collection.IntObjectHashMap;
25+
import io.netty.util.collection.IntObjectMap;
2626
import io.rsocket.exceptions.ConnectionErrorException;
2727
import io.rsocket.exceptions.Exceptions;
2828
import io.rsocket.frame.*;
2929
import io.rsocket.frame.decoder.PayloadDecoder;
30-
import io.rsocket.internal.*;
30+
import io.rsocket.internal.LimitableRequestPublisher;
31+
import io.rsocket.internal.SynchronizedIntObjectHashMap;
32+
import io.rsocket.internal.UnboundedProcessor;
33+
import io.rsocket.internal.UnicastMonoProcessor;
3134
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
3235
import io.rsocket.keepalive.KeepAliveHandler;
3336
import io.rsocket.keepalive.KeepAliveSupport;
3437
import io.rsocket.lease.RequesterLeaseHandler;
3538
import io.rsocket.util.OnceConsumer;
3639
import java.nio.channels.ClosedChannelException;
37-
import java.util.Collections;
38-
import java.util.Map;
3940
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4041
import java.util.function.Consumer;
4142
import java.util.function.LongConsumer;
@@ -59,8 +60,8 @@ class RSocketRequester implements RSocket {
5960
private final PayloadDecoder payloadDecoder;
6061
private final Consumer<Throwable> errorConsumer;
6162
private final StreamIdSupplier streamIdSupplier;
62-
private final Map<Integer, LimitableRequestPublisher> senders;
63-
private final Map<Integer, Processor<Payload, Payload>> receivers;
63+
private final IntObjectMap<LimitableRequestPublisher> senders;
64+
private final IntObjectMap<Processor<Payload, Payload>> receivers;
6465
private final UnboundedProcessor<ByteBuf> sendProcessor;
6566
private final RequesterLeaseHandler leaseHandler;
6667
private final ByteBufAllocator allocator;
@@ -83,8 +84,8 @@ class RSocketRequester implements RSocket {
8384
this.errorConsumer = errorConsumer;
8485
this.streamIdSupplier = streamIdSupplier;
8586
this.leaseHandler = leaseHandler;
86-
this.senders = Collections.synchronizedMap(new IntObjectHashMap<>());
87-
this.receivers = Collections.synchronizedMap(new IntObjectHashMap<>());
87+
this.senders = new SynchronizedIntObjectHashMap<>();
88+
this.receivers = new SynchronizedIntObjectHashMap<>();
8889

8990
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
9091
this.sendProcessor = new UnboundedProcessor<>();

rsocket-core/src/main/java/io/rsocket/RSocketResponder.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.netty.util.ReferenceCountUtil;
22-
import io.netty.util.collection.IntObjectHashMap;
22+
import io.netty.util.collection.IntObjectMap;
2323
import io.rsocket.exceptions.ApplicationErrorException;
2424
import io.rsocket.frame.*;
2525
import io.rsocket.frame.decoder.PayloadDecoder;
2626
import io.rsocket.internal.LimitableRequestPublisher;
27+
import io.rsocket.internal.SynchronizedIntObjectHashMap;
2728
import io.rsocket.internal.UnboundedProcessor;
2829
import io.rsocket.lease.ResponderLeaseHandler;
29-
import java.util.Collections;
30-
import java.util.Map;
3130
import java.util.function.Consumer;
3231
import org.reactivestreams.Processor;
3332
import org.reactivestreams.Publisher;
@@ -47,9 +46,9 @@ class RSocketResponder implements ResponderRSocket {
4746
private final Consumer<Throwable> errorConsumer;
4847
private final ResponderLeaseHandler leaseHandler;
4948

50-
private final Map<Integer, LimitableRequestPublisher> sendingLimitableSubscriptions;
51-
private final Map<Integer, Subscription> sendingSubscriptions;
52-
private final Map<Integer, Processor<Payload, Payload>> channelProcessors;
49+
private final IntObjectMap<LimitableRequestPublisher> sendingLimitableSubscriptions;
50+
private final IntObjectMap<Subscription> sendingSubscriptions;
51+
private final IntObjectMap<Processor<Payload, Payload>> channelProcessors;
5352

5453
private final UnboundedProcessor<ByteBuf> sendProcessor;
5554
private final ByteBufAllocator allocator;
@@ -71,9 +70,9 @@ class RSocketResponder implements ResponderRSocket {
7170
this.payloadDecoder = payloadDecoder;
7271
this.errorConsumer = errorConsumer;
7372
this.leaseHandler = leaseHandler;
74-
this.sendingLimitableSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
75-
this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
76-
this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>());
73+
this.sendingLimitableSubscriptions = new SynchronizedIntObjectHashMap<>();
74+
this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
75+
this.channelProcessors = new SynchronizedIntObjectHashMap<>();
7776

7877
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
7978
// connections
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
/*
2+
* Copyright 2014-2019 Real Logic Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.internal;
17+
18+
import static java.nio.charset.StandardCharsets.UTF_8;
19+
20+
import java.util.concurrent.ThreadLocalRandom;
21+
22+
/** Miscellaneous useful functions for dealing with low level bits and bytes. */
23+
public class BitUtil {
24+
/** Size of a byte in bytes */
25+
public static final int SIZE_OF_BYTE = 1;
26+
27+
/** Size of a boolean in bytes */
28+
public static final int SIZE_OF_BOOLEAN = 1;
29+
30+
/** Size of a char in bytes */
31+
public static final int SIZE_OF_CHAR = 2;
32+
33+
/** Size of a short in bytes */
34+
public static final int SIZE_OF_SHORT = 2;
35+
36+
/** Size of an int in bytes */
37+
public static final int SIZE_OF_INT = 4;
38+
39+
/** Size of a float in bytes */
40+
public static final int SIZE_OF_FLOAT = 4;
41+
42+
/** Size of a long in bytes */
43+
public static final int SIZE_OF_LONG = 8;
44+
45+
/** Size of a double in bytes */
46+
public static final int SIZE_OF_DOUBLE = 8;
47+
48+
/** Length of the data blocks used by the CPU cache sub-system in bytes. */
49+
public static final int CACHE_LINE_LENGTH = 64;
50+
51+
private static final byte[] HEX_DIGIT_TABLE = {
52+
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
53+
};
54+
55+
private static final byte[] FROM_HEX_DIGIT_TABLE;
56+
57+
static {
58+
FROM_HEX_DIGIT_TABLE = new byte[128];
59+
60+
FROM_HEX_DIGIT_TABLE['0'] = 0x00;
61+
FROM_HEX_DIGIT_TABLE['1'] = 0x01;
62+
FROM_HEX_DIGIT_TABLE['2'] = 0x02;
63+
FROM_HEX_DIGIT_TABLE['3'] = 0x03;
64+
FROM_HEX_DIGIT_TABLE['4'] = 0x04;
65+
FROM_HEX_DIGIT_TABLE['5'] = 0x05;
66+
FROM_HEX_DIGIT_TABLE['6'] = 0x06;
67+
FROM_HEX_DIGIT_TABLE['7'] = 0x07;
68+
FROM_HEX_DIGIT_TABLE['8'] = 0x08;
69+
FROM_HEX_DIGIT_TABLE['9'] = 0x09;
70+
FROM_HEX_DIGIT_TABLE['a'] = 0x0a;
71+
FROM_HEX_DIGIT_TABLE['A'] = 0x0a;
72+
FROM_HEX_DIGIT_TABLE['b'] = 0x0b;
73+
FROM_HEX_DIGIT_TABLE['B'] = 0x0b;
74+
FROM_HEX_DIGIT_TABLE['c'] = 0x0c;
75+
FROM_HEX_DIGIT_TABLE['C'] = 0x0c;
76+
FROM_HEX_DIGIT_TABLE['d'] = 0x0d;
77+
FROM_HEX_DIGIT_TABLE['D'] = 0x0d;
78+
FROM_HEX_DIGIT_TABLE['e'] = 0x0e;
79+
FROM_HEX_DIGIT_TABLE['E'] = 0x0e;
80+
FROM_HEX_DIGIT_TABLE['f'] = 0x0f;
81+
FROM_HEX_DIGIT_TABLE['F'] = 0x0f;
82+
}
83+
84+
private static final int LAST_DIGIT_MASK = 0b1;
85+
86+
/**
87+
* Fast method of finding the next power of 2 greater than or equal to the supplied value.
88+
*
89+
* <p>If the value is &lt;= 0 then 1 will be returned.
90+
*
91+
* <p>This method is not suitable for {@link Integer#MIN_VALUE} or numbers greater than 2^30.
92+
*
93+
* @param value from which to search for next power of 2
94+
* @return The next power of 2 or the value itself if it is a power of 2
95+
*/
96+
public static int findNextPositivePowerOfTwo(final int value) {
97+
return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(value - 1));
98+
}
99+
100+
/**
101+
* Align a value to the next multiple up of alignment. If the value equals an alignment multiple
102+
* then it is returned unchanged.
103+
*
104+
* <p>This method executes without branching. This code is designed to be use in the fast path and
105+
* should not be used with negative numbers. Negative numbers will result in undefined behaviour.
106+
*
107+
* @param value to be aligned up.
108+
* @param alignment to be used.
109+
* @return the value aligned to the next boundary.
110+
*/
111+
public static int align(final int value, final int alignment) {
112+
return (value + (alignment - 1)) & -alignment;
113+
}
114+
115+
/**
116+
* Generate a byte array from the hex representation of the given byte array.
117+
*
118+
* @param buffer to convert from a hex representation (in Big Endian).
119+
* @return new byte array that is decimal representation of the passed array.
120+
*/
121+
public static byte[] fromHexByteArray(final byte[] buffer) {
122+
final byte[] outputBuffer = new byte[buffer.length >> 1];
123+
124+
for (int i = 0; i < buffer.length; i += 2) {
125+
final int hi = FROM_HEX_DIGIT_TABLE[buffer[i]] << 4;
126+
final int lo = FROM_HEX_DIGIT_TABLE[buffer[i + 1]]; // lgtm [java/index-out-of-bounds]
127+
outputBuffer[i >> 1] = (byte) (hi | lo);
128+
}
129+
130+
return outputBuffer;
131+
}
132+
133+
/**
134+
* Generate a byte array that is a hex representation of a given byte array.
135+
*
136+
* @param buffer to convert to a hex representation.
137+
* @return new byte array that is hex representation (in Big Endian) of the passed array.
138+
*/
139+
public static byte[] toHexByteArray(final byte[] buffer) {
140+
return toHexByteArray(buffer, 0, buffer.length);
141+
}
142+
143+
/**
144+
* Generate a byte array that is a hex representation of a given byte array.
145+
*
146+
* @param buffer to convert to a hex representation.
147+
* @param offset the offset into the buffer.
148+
* @param length the number of bytes to convert.
149+
* @return new byte array that is hex representation (in Big Endian) of the passed array.
150+
*/
151+
public static byte[] toHexByteArray(final byte[] buffer, final int offset, final int length) {
152+
final byte[] outputBuffer = new byte[length << 1];
153+
154+
for (int i = 0; i < (length << 1); i += 2) {
155+
final byte b = buffer[offset + (i >> 1)];
156+
157+
outputBuffer[i] = HEX_DIGIT_TABLE[(b >> 4) & 0x0F];
158+
outputBuffer[i + 1] = HEX_DIGIT_TABLE[b & 0x0F];
159+
}
160+
161+
return outputBuffer;
162+
}
163+
164+
/**
165+
* Generate a byte array from a string that is the hex representation of the given byte array.
166+
*
167+
* @param string to convert from a hex representation (in Big Endian).
168+
* @return new byte array holding the decimal representation of the passed array.
169+
*/
170+
public static byte[] fromHex(final String string) {
171+
return fromHexByteArray(string.getBytes(UTF_8));
172+
}
173+
174+
/**
175+
* Generate a string that is the hex representation of a given byte array.
176+
*
177+
* @param buffer to convert to a hex representation.
178+
* @param offset the offset into the buffer.
179+
* @param length the number of bytes to convert.
180+
* @return new String holding the hex representation (in Big Endian) of the passed array.
181+
*/
182+
public static String toHex(final byte[] buffer, final int offset, final int length) {
183+
return new String(toHexByteArray(buffer, offset, length), UTF_8);
184+
}
185+
186+
/**
187+
* Generate a string that is the hex representation of a given byte array.
188+
*
189+
* @param buffer to convert to a hex representation.
190+
* @return new String holding the hex representation (in Big Endian) of the passed array.
191+
*/
192+
public static String toHex(final byte[] buffer) {
193+
return new String(toHexByteArray(buffer), UTF_8);
194+
}
195+
196+
/**
197+
* Is a number even.
198+
*
199+
* @param value to check.
200+
* @return true if the number is even otherwise false.
201+
*/
202+
public static boolean isEven(final int value) {
203+
return (value & LAST_DIGIT_MASK) == 0;
204+
}
205+
206+
/**
207+
* Is a value a positive power of 2.
208+
*
209+
* @param value to be checked.
210+
* @return true if the number is a positive power of 2, otherwise false.
211+
*/
212+
public static boolean isPowerOfTwo(final int value) {
213+
return value > 0 && ((value & (~value + 1)) == value);
214+
}
215+
216+
/**
217+
* Cycles indices of an array one at a time in a forward fashion
218+
*
219+
* @param current value to be incremented.
220+
* @param max value for the cycle.
221+
* @return the next value, or zero if max is reached.
222+
*/
223+
public static int next(final int current, final int max) {
224+
int next = current + 1;
225+
if (next == max) {
226+
next = 0;
227+
}
228+
229+
return next;
230+
}
231+
232+
/**
233+
* Cycles indices of an array one at a time in a backwards fashion
234+
*
235+
* @param current value to be decremented.
236+
* @param max value of the cycle.
237+
* @return the next value, or max - 1 if current is zero.
238+
*/
239+
public static int previous(final int current, final int max) {
240+
if (0 == current) {
241+
return max - 1;
242+
}
243+
244+
return current - 1;
245+
}
246+
247+
/**
248+
* Calculate the shift value to scale a number based on how refs are compressed or not.
249+
*
250+
* @param scale of the number reported by Unsafe.
251+
* @return how many times the number needs to be shifted to the left.
252+
*/
253+
public static int calculateShiftForScale(final int scale) {
254+
if (4 == scale) {
255+
return 2;
256+
} else if (8 == scale) {
257+
return 3;
258+
}
259+
260+
throw new IllegalArgumentException("unknown pointer size for scale=" + scale);
261+
}
262+
263+
/**
264+
* Generate a randomised integer over [{@link Integer#MIN_VALUE}, {@link Integer#MAX_VALUE}].
265+
*
266+
* @return randomised integer suitable as an Id.
267+
*/
268+
public static int generateRandomisedId() {
269+
return ThreadLocalRandom.current().nextInt();
270+
}
271+
272+
/**
273+
* Is an address aligned on a boundary.
274+
*
275+
* @param address to be tested.
276+
* @param alignment boundary the address is tested against.
277+
* @return true if the address is on the aligned boundary otherwise false.
278+
* @throws IllegalArgumentException if the alignment is not a power of 2.
279+
*/
280+
public static boolean isAligned(final long address, final int alignment) {
281+
if (!BitUtil.isPowerOfTwo(alignment)) {
282+
throw new IllegalArgumentException("alignment must be a power of 2: alignment=" + alignment);
283+
}
284+
285+
return (address & (alignment - 1)) == 0;
286+
}
287+
}

0 commit comments

Comments
 (0)