Skip to content

Switch resume position tracking to byte count #611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Apr 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.rsocket.exceptions.InvalidSetupException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.ResumeFrameFlyweight;
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.ClientServerInputMultiplexer;
Expand Down Expand Up @@ -106,9 +107,10 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
private String dataMimeType = "application/binary";

private boolean resumeEnabled;
private Supplier<ResumeToken> resumeTokenSupplier = ResumeToken::generate;
private Function<? super ResumeToken, ? extends ResumableFramesStore> resumeStoreFactory =
token -> new InMemoryResumableFramesStore("client", 1024);
private boolean resumeCleanupStoreOnKeepAlive;
private Supplier<ByteBuf> resumeTokenSupplier = ResumeFrameFlyweight::generateResumeToken;
private Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory =
token -> new InMemoryResumableFramesStore("client", 100_000);
private Duration resumeSessionDuration = Duration.ofMinutes(2);
private Duration resumeStreamTimeout = Duration.ofSeconds(10);
private Supplier<ResumeStrategy> resumeStrategySupplier =
Expand Down Expand Up @@ -192,13 +194,13 @@ public ClientRSocketFactory resume() {
return this;
}

public ClientRSocketFactory resumeToken(Supplier<ResumeToken> resumeTokenSupplier) {
public ClientRSocketFactory resumeToken(Supplier<ByteBuf> resumeTokenSupplier) {
this.resumeTokenSupplier = Objects.requireNonNull(resumeTokenSupplier);
return this;
}

public ClientRSocketFactory resumeStore(
Function<? super ResumeToken, ? extends ResumableFramesStore> resumeStoreFactory) {
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory) {
this.resumeStoreFactory = resumeStoreFactory;
return this;
}
Expand All @@ -218,6 +220,11 @@ public ClientRSocketFactory resumeStrategy(Supplier<ResumeStrategy> resumeStrate
return this;
}

public ClientRSocketFactory resumeCleanupOnKeepAlive() {
resumeCleanupStoreOnKeepAlive = true;
return this;
}

@Override
public Start<RSocket> transport(Supplier<ClientTransport> transportClient) {
return new StartClient(transportClient);
Expand Down Expand Up @@ -267,6 +274,7 @@ public Mono<RSocket> start() {
connection -> {
ClientSetup clientSetup = clientSetup();
DuplexConnection wrappedConnection = clientSetup.wrappedConnection(connection);
ByteBuf resumeToken = clientSetup.resumeToken();

ClientServerInputMultiplexer multiplexer =
new ClientServerInputMultiplexer(wrappedConnection, plugins);
Expand Down Expand Up @@ -299,7 +307,7 @@ public Mono<RSocket> start() {
false,
(int) keepAliveTickPeriod(),
(int) keepAliveTimeout(),
clientSetup.resumeToken().toByteBuf(),
resumeToken,
metadataMimeType,
dataMimeType,
setupPayload.sliceMetadata(),
Expand All @@ -319,15 +327,16 @@ private long keepAliveTimeout() {

private ClientSetup clientSetup() {
if (resumeEnabled) {
ResumeToken resumeToken = resumeTokenSupplier.get();
ByteBuf resumeToken = resumeTokenSupplier.get();
return new ClientSetup.ResumableClientSetup(
allocator,
newConnection(),
resumeToken,
resumeStoreFactory.apply(resumeToken),
resumeSessionDuration,
resumeStreamTimeout,
resumeStrategySupplier);
resumeStrategySupplier,
resumeCleanupStoreOnKeepAlive);
} else {
return new ClientSetup.DefaultClientSetup();
}
Expand All @@ -342,8 +351,7 @@ private Mono<KeepAliveConnection> newConnection() {
KeepAliveConnection.ofClient(
allocator,
connection,
notUsed ->
Mono.just(new KeepAliveData(keepAliveTickPeriod(), keepAliveTimeout())),
notUsed -> new KeepAliveData(keepAliveTickPeriod(), keepAliveTimeout()),
errorConsumer));
}
}
Expand All @@ -358,10 +366,11 @@ public static class ServerRSocketFactory {
private boolean resumeSupported;
private Duration resumeSessionDuration = Duration.ofSeconds(120);
private Duration resumeStreamTimeout = Duration.ofSeconds(10);
private Function<? super ResumeToken, ? extends ResumableFramesStore> resumeStoreFactory =
token -> new InMemoryResumableFramesStore("server", 1024);
private Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory =
token -> new InMemoryResumableFramesStore("server", 100_000);

private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
private boolean resumeCleanupStoreOnKeepAlive;

private ServerRSocketFactory() {}

Expand Down Expand Up @@ -412,7 +421,7 @@ public ServerRSocketFactory resume() {
}

public ServerRSocketFactory resumeStore(
Function<? super ResumeToken, ? extends ResumableFramesStore> resumeStoreFactory) {
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory) {
this.resumeStoreFactory = resumeStoreFactory;
return this;
}
Expand All @@ -427,6 +436,11 @@ public ServerRSocketFactory resumeStreamTimeout(Duration resumeStreamTimeout) {
return this;
}

public ServerRSocketFactory resumeCleanupOnKeepAlive() {
resumeCleanupStoreOnKeepAlive = true;
return this;
}

private class ServerStart<T extends Closeable> implements Start<T>, ServerTransportAcceptor {
private Supplier<ServerTransport<T>> transportServer;

Expand All @@ -452,7 +466,7 @@ public <T extends Closeable> Start<T> transport(Supplier<ServerTransport<T>> tra
private Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection connection) {
connection =
KeepAliveConnection.ofServer(
allocator, connection, serverSetup.keepAliveData(), errorConsumer);
allocator, connection, serverSetup::keepAliveData, errorConsumer);
ClientServerInputMultiplexer multiplexer =
new ClientServerInputMultiplexer(connection, plugins);

Expand Down Expand Up @@ -556,7 +570,8 @@ private ServerSetup serverSetup() {
new SessionManager(),
resumeSessionDuration,
resumeStreamTimeout,
resumeStoreFactory)
resumeStoreFactory,
resumeCleanupStoreOnKeepAlive)
: new ServerSetup.DefaultServerSetup(allocator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ public static boolean hasMetadata(ByteBuf byteBuf) {
return (flags(byteBuf) & FLAGS_M) == FLAGS_M;
}

/**
* faster version of {@link #frameType(ByteBuf)} which does not replace PAYLOAD with synthetic
* type
*/
public static FrameType nativeFrameType(ByteBuf byteBuf) {
byteBuf.markReaderIndex();
byteBuf.skipBytes(Integer.BYTES);
int typeAndFlags = byteBuf.readShort() & 0xFFFF;
FrameType result = FrameType.fromEncodedType(typeAndFlags >> FRAME_TYPE_SHIFT);
byteBuf.resetReaderIndex();
return result;
}

public static FrameType frameType(ByteBuf byteBuf) {
byteBuf.markReaderIndex();
byteBuf.skipBytes(Integer.BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.util.UUID;

public class ResumeFrameFlyweight {
static final int CURRENT_VERSION = SetupFrameFlyweight.CURRENT_VERSION;

public static ByteBuf encode(
final ByteBufAllocator allocator,
byte[] token,
ByteBufAllocator allocator,
ByteBuf token,
long lastReceivedServerPos,
long firstAvailableClientPos) {

ByteBuf byteBuf = FrameHeaderFlyweight.encodeStreamZero(allocator, FrameType.RESUME, 0);
byteBuf.writeInt(CURRENT_VERSION);
byteBuf.writeShort(token.length);
token.markReaderIndex();
byteBuf.writeShort(token.readableBytes());
byteBuf.writeBytes(token);
token.resetReaderIndex();
byteBuf.writeLong(lastReceivedServerPos);
byteBuf.writeLong(firstAvailableClientPos);

Expand All @@ -49,7 +53,7 @@ public static int version(ByteBuf byteBuf) {
return version;
}

public static byte[] token(ByteBuf byteBuf) {
public static ByteBuf token(ByteBuf byteBuf) {
FrameHeaderFlyweight.ensureFrameType(FrameType.RESUME, byteBuf);

byteBuf.markReaderIndex();
Expand All @@ -58,8 +62,7 @@ public static byte[] token(ByteBuf byteBuf) {
byteBuf.skipBytes(tokenPos);
// token
int tokenLength = byteBuf.readShort() & 0xFFFF;
byte[] token = new byte[tokenLength];
byteBuf.readBytes(token);
ByteBuf token = byteBuf.readSlice(tokenLength);
byteBuf.resetReaderIndex();

return token;
Expand Down Expand Up @@ -98,4 +101,12 @@ public static long firstAvailableClientPos(ByteBuf byteBuf) {

return firstAvailableClientPos;
}

public static ByteBuf generateResumeToken() {
UUID uuid = UUID.randomUUID();
ByteBuf bb = Unpooled.buffer(16);
bb.writeLong(uuid.getMostSignificantBits());
bb.writeLong(uuid.getLeastSignificantBits());
return bb;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ public static ByteBuf encode(
header.writeInt(CURRENT_VERSION).writeInt(keepaliveInterval).writeInt(maxLifetime);

if ((flags & FLAGS_RESUME_ENABLE) != 0) {
resumeToken.markReaderIndex();
header.writeShort(resumeToken.readableBytes()).writeBytes(resumeToken);
resumeToken.resetReaderIndex();
}

// Write metadata mime-type
Expand Down Expand Up @@ -141,7 +143,7 @@ public static boolean resumeEnabled(ByteBuf byteBuf) {
return (FLAGS_RESUME_ENABLE & FrameHeaderFlyweight.flags(byteBuf)) == FLAGS_RESUME_ENABLE;
}

public static byte[] resumeToken(ByteBuf byteBuf) {
public static ByteBuf resumeToken(ByteBuf byteBuf) {
if (resumeEnabled(byteBuf)) {
byteBuf.markReaderIndex();
// header
Expand All @@ -158,8 +160,7 @@ public static byte[] resumeToken(ByteBuf byteBuf) {
Integer.BYTES;

int tokenLength = byteBuf.skipBytes(resumePos).readShort() & 0xFFFF;
byte[] resumeToken = new byte[tokenLength];
byteBuf.readBytes(resumeToken);
ByteBuf resumeToken = byteBuf.readSlice(tokenLength);
byteBuf.resetReaderIndex();
return resumeToken;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package io.rsocket.internal;

import io.rsocket.DuplexConnection;
import io.rsocket.resume.ResumeAwareConnection;
import io.rsocket.resume.ResumePositionsConnection;
import io.rsocket.resume.ResumeStateHolder;
import io.rsocket.util.DuplexConnectionProxy;
import reactor.core.publisher.Flux;

class ClientServerConnection extends DuplexConnectionProxy implements ResumeAwareConnection {
class ClientServerConnection extends DuplexConnectionProxy implements ResumePositionsConnection {

private final DuplexConnection resumeAware;

Expand All @@ -32,9 +31,9 @@ public ClientServerConnection(DuplexConnection delegate, DuplexConnection resume
}

@Override
public Flux<Long> receiveResumePositions(ResumeStateHolder resumeStateHolder) {
return resumeAware instanceof ResumeAwareConnection
? ((ResumeAwareConnection) resumeAware).receiveResumePositions(resumeStateHolder)
: Flux.never();
public void acceptResumeState(ResumeStateHolder resumeStateHolder) {
if (resumeAware instanceof ResumePositionsConnection) {
((ResumePositionsConnection) resumeAware).acceptResumeState(resumeStateHolder);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.rsocket.frame.FrameUtil;
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
import io.rsocket.plugins.PluginRegistry;
import io.rsocket.resume.ResumeAwareConnection;
import io.rsocket.resume.ResumePositionsConnection;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -52,7 +52,7 @@ public class ClientServerInputMultiplexer implements Closeable {
private final DuplexConnection serverConnection;
private final DuplexConnection clientConnection;
private final DuplexConnection source;
private final ResumeAwareConnection clientServerConnection;
private final ResumePositionsConnection clientServerConnection;

public ClientServerInputMultiplexer(DuplexConnection source) {
this(source, emptyPluginRegistry);
Expand Down Expand Up @@ -115,7 +115,7 @@ public ClientServerInputMultiplexer(DuplexConnection source, PluginRegistry plug
});
}

public ResumeAwareConnection asClientServerConnection() {
public ResumePositionsConnection asClientServerConnection() {
return clientServerConnection;
}

Expand Down
Loading