Skip to content

more performance improvements #659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions rsocket-core/src/main/java/io/rsocket/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.frame.*;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.*;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.internal.UnicastMonoProcessor;
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.util.OnceConsumer;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
Expand All @@ -59,8 +60,8 @@ class RSocketRequester implements RSocket {
private final PayloadDecoder payloadDecoder;
private final Consumer<Throwable> errorConsumer;
private final StreamIdSupplier streamIdSupplier;
private final Map<Integer, LimitableRequestPublisher> senders;
private final Map<Integer, Processor<Payload, Payload>> receivers;
private final IntObjectMap<LimitableRequestPublisher> senders;
private final IntObjectMap<Processor<Payload, Payload>> receivers;
private final UnboundedProcessor<ByteBuf> sendProcessor;
private final RequesterLeaseHandler leaseHandler;
private final ByteBufAllocator allocator;
Expand All @@ -83,8 +84,8 @@ class RSocketRequester implements RSocket {
this.errorConsumer = errorConsumer;
this.streamIdSupplier = streamIdSupplier;
this.leaseHandler = leaseHandler;
this.senders = Collections.synchronizedMap(new IntObjectHashMap<>());
this.receivers = Collections.synchronizedMap(new IntObjectHashMap<>());
this.senders = new SynchronizedIntObjectHashMap<>();
this.receivers = new SynchronizedIntObjectHashMap<>();

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
this.sendProcessor = new UnboundedProcessor<>();
Expand Down
17 changes: 8 additions & 9 deletions rsocket-core/src/main/java/io/rsocket/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.frame.*;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.lease.ResponderLeaseHandler;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
Expand All @@ -47,9 +46,9 @@ class RSocketResponder implements ResponderRSocket {
private final Consumer<Throwable> errorConsumer;
private final ResponderLeaseHandler leaseHandler;

private final Map<Integer, LimitableRequestPublisher> sendingLimitableSubscriptions;
private final Map<Integer, Subscription> sendingSubscriptions;
private final Map<Integer, Processor<Payload, Payload>> channelProcessors;
private final IntObjectMap<LimitableRequestPublisher> sendingLimitableSubscriptions;
private final IntObjectMap<Subscription> sendingSubscriptions;
private final IntObjectMap<Processor<Payload, Payload>> channelProcessors;

private final UnboundedProcessor<ByteBuf> sendProcessor;
private final ByteBufAllocator allocator;
Expand All @@ -71,9 +70,9 @@ class RSocketResponder implements ResponderRSocket {
this.payloadDecoder = payloadDecoder;
this.errorConsumer = errorConsumer;
this.leaseHandler = leaseHandler;
this.sendingLimitableSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>());
this.sendingLimitableSubscriptions = new SynchronizedIntObjectHashMap<>();
this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
this.channelProcessors = new SynchronizedIntObjectHashMap<>();

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
// connections
Expand Down
114 changes: 84 additions & 30 deletions rsocket-core/src/main/java/io/rsocket/buffer/AbstractTupleByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,93 +54,152 @@ public ByteBuffer nioBuffer(int index, int length) {
}

@Override
protected byte _getByte(int index) {
public ByteBuffer[] nioBuffers(int index, int length) {
checkIndex(index, length);
if (length == 0) {
return new ByteBuffer[] {EMPTY_NIO_BUFFER};
}
return _nioBuffers(index, length);
}

protected abstract ByteBuffer[] _nioBuffers(int index, int length);

@Override
protected byte _getByte(final int index) {
long ri = calculateRelativeIndex(index);
ByteBuf byteBuf = getPart(index);

index = (int) (ri & Integer.MAX_VALUE);
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);

return byteBuf.getByte(index);
return byteBuf.getByte(calculatedIndex);
}

@Override
protected short _getShort(int index) {
protected short _getShort(final int index) {
long ri = calculateRelativeIndex(index);
ByteBuf byteBuf = getPart(index);

index = (int) (ri & Integer.MAX_VALUE);
final int calculatedIndex = (int) (ri & Integer.MAX_VALUE);

return byteBuf.getShort(index);
if (calculatedIndex + Short.BYTES <= byteBuf.writerIndex()) {
return byteBuf.getShort(calculatedIndex);
} else if (order() == ByteOrder.BIG_ENDIAN) {
return (short) ((_getByte(index) & 0xff) << 8 | _getByte(index + 1) & 0xff);
} else {
return (short) (_getByte(index) & 0xff | (_getByte(index + 1) & 0xff) << 8);
}
}

@Override
protected short _getShortLE(int index) {
long ri = calculateRelativeIndex(index);
ByteBuf byteBuf = getPart(index);

index = (int) (ri & Integer.MAX_VALUE);
final int calculatedIndex = (int) (ri & Integer.MAX_VALUE);

return byteBuf.getShortLE(index);
if (calculatedIndex + Short.BYTES <= byteBuf.writerIndex()) {
return byteBuf.getShortLE(calculatedIndex);
} else if (order() == ByteOrder.BIG_ENDIAN) {
return (short) (_getByte(index) & 0xff | (_getByte(index + 1) & 0xff) << 8);
} else {
return (short) ((_getByte(index) & 0xff) << 8 | _getByte(index + 1) & 0xff);
}
}

@Override
protected int _getUnsignedMedium(int index) {
protected int _getUnsignedMedium(final int index) {
long ri = calculateRelativeIndex(index);
ByteBuf byteBuf = getPart(index);

index = (int) (ri & Integer.MAX_VALUE);
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);

return byteBuf.getUnsignedMedium(index);
if (calculatedIndex + 3 <= byteBuf.writerIndex()) {
return byteBuf.getUnsignedMedium(calculatedIndex);
} else if (order() == ByteOrder.BIG_ENDIAN) {
return (_getShort(index) & 0xffff) << 8 | _getByte(index + 2) & 0xff;
} else {
return _getShort(index) & 0xFFFF | (_getByte(index + 2) & 0xFF) << 16;
}
}

@Override
protected int _getUnsignedMediumLE(int index) {
long ri = calculateRelativeIndex(index);
ByteBuf byteBuf = getPart(index);

index = (int) (ri & Integer.MAX_VALUE);
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);

return byteBuf.getUnsignedMediumLE(index);
if (calculatedIndex + 3 <= byteBuf.writerIndex()) {
return byteBuf.getUnsignedMediumLE(calculatedIndex);
} else if (order() == ByteOrder.BIG_ENDIAN) {
return _getShortLE(index) & 0xffff | (_getByte(index + 2) & 0xff) << 16;
} else {
return (_getShortLE(index) & 0xffff) << 8 | _getByte(index + 2) & 0xff;
}
}

@Override
protected int _getInt(int index) {
protected int _getInt(final int index) {
long ri = calculateRelativeIndex(index);
ByteBuf byteBuf = getPart(index);

index = (int) (ri & Integer.MAX_VALUE);
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);

return byteBuf.getInt(index);
if (calculatedIndex + Integer.BYTES <= byteBuf.writerIndex()) {
return byteBuf.getInt(calculatedIndex);
} else if (order() == ByteOrder.BIG_ENDIAN) {
return (_getShort(index) & 0xffff) << 16 | _getShort(index + 2) & 0xffff;
} else {
return _getShort(index) & 0xFFFF | (_getShort(index + 2) & 0xFFFF) << 16;
}
}

@Override
protected int _getIntLE(int index) {
protected int _getIntLE(final int index) {
long ri = calculateRelativeIndex(index);
ByteBuf byteBuf = getPart(index);

index = (int) (ri & Integer.MAX_VALUE);
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);

return byteBuf.getIntLE(index);
if (calculatedIndex + Integer.BYTES <= byteBuf.writerIndex()) {
return byteBuf.getIntLE(calculatedIndex);
} else if (order() == ByteOrder.BIG_ENDIAN) {
return _getShortLE(index) & 0xffff | (_getShortLE(index + 2) & 0xffff) << 16;
} else {
return (_getShortLE(index) & 0xffff) << 16 | _getShortLE(index + 2) & 0xffff;
}
}

@Override
protected long _getLong(int index) {
protected long _getLong(final int index) {
long ri = calculateRelativeIndex(index);
ByteBuf byteBuf = getPart(index);

index = (int) (ri & Integer.MAX_VALUE);
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);

return byteBuf.getLong(index);
if (calculatedIndex + Long.BYTES <= byteBuf.writerIndex()) {
return byteBuf.getLong(calculatedIndex);
} else if (order() == ByteOrder.BIG_ENDIAN) {
return (_getInt(index) & 0xffffffffL) << 32 | _getInt(index + 4) & 0xffffffffL;
} else {
return _getInt(index) & 0xFFFFFFFFL | (_getInt(index + 4) & 0xFFFFFFFFL) << 32;
}
}

@Override
protected long _getLongLE(int index) {
protected long _getLongLE(final int index) {
long ri = calculateRelativeIndex(index);
ByteBuf byteBuf = getPart(index);

index = (int) (ri & Integer.MAX_VALUE);
int calculatedIndex = (int) (ri & Integer.MAX_VALUE);

return byteBuf.getLongLE(index);
if (calculatedIndex + Long.BYTES <= byteBuf.writerIndex()) {
return byteBuf.getLongLE(calculatedIndex);
} else if (order() == ByteOrder.BIG_ENDIAN) {
return (_getInt(index) & 0xffffffffL) << 32 | _getInt(index + 4) & 0xffffffffL;
} else {
return _getInt(index) & 0xFFFFFFFFL | (_getInt(index + 4) & 0xFFFFFFFFL) << 32;
}
}

@Override
Expand Down Expand Up @@ -518,11 +577,6 @@ public long memoryAddress() {
throw new UnsupportedOperationException();
}

@Override
public int compareTo(ByteBuf buffer) {
return 0;
}

@Override
protected void _setByte(int index, int value) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,7 @@ public ByteBuffer nioBuffer() {
}

@Override
public ByteBuffer[] nioBuffers(int index, int length) {
if (length == 0) {
return new ByteBuffer[] {EMPTY_NIO_BUFFER};
}

public ByteBuffer[] _nioBuffers(int index, int length) {
long ri = calculateRelativeIndex(index);
index = (int) (ri & Integer.MAX_VALUE);
switch ((int) ((ri & MASK) >>> 32L)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,7 @@ public ByteBuffer nioBuffer() {
}

@Override
public ByteBuffer[] nioBuffers(int index, int length) {
if (length == 0) {
return new ByteBuffer[] {EMPTY_NIO_BUFFER};
}

public ByteBuffer[] _nioBuffers(int index, int length) {
long ri = calculateRelativeIndex(index);
index = (int) (ri & Integer.MAX_VALUE);
switch ((int) ((ri & MASK) >>> 32L)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.buffer.TupleByteBuf;

class DataAndMetadataFlyweight {
public static final int FRAME_LENGTH_MASK = 0xFFFFFF;
Expand Down Expand Up @@ -32,19 +33,19 @@ private static int decodeLength(final ByteBuf byteBuf) {

static ByteBuf encodeOnlyMetadata(
ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata) {
return allocator.compositeBuffer().addComponents(true, header, metadata);
return TupleByteBuf.of(allocator, header, metadata);
}

static ByteBuf encodeOnlyData(ByteBufAllocator allocator, final ByteBuf header, ByteBuf data) {
return allocator.compositeBuffer().addComponents(true, header, data);
return TupleByteBuf.of(allocator, header, data);
}

static ByteBuf encode(
ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata, ByteBuf data) {

int length = metadata.readableBytes();
encodeLength(header, length);
return allocator.compositeBuffer().addComponents(true, header, metadata, data);
return TupleByteBuf.of(allocator, header, metadata, data);
}

static ByteBuf metadataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.buffer.TupleByteBuf;

/**
* Some transports like TCP aren't framed, and require a length. This is used by DuplexConnections
Expand Down Expand Up @@ -34,7 +35,7 @@ private static int decodeLength(final ByteBuf byteBuf) {
public static ByteBuf encode(ByteBufAllocator allocator, int length, ByteBuf frame) {
ByteBuf buffer = allocator.buffer();
encodeLength(buffer, length);
return allocator.compositeBuffer().addComponents(true, buffer, frame);
return TupleByteBuf.of(allocator, buffer, frame);
}

public static int length(ByteBuf byteBuf) {
Expand Down
Loading