Skip to content

Commit 80ca353

Browse files
committed
resume: fix race between send / release of frames on received implied position from keepalives
Replace state enum with constants; remove unnecessary allocation; make cleanup of store on keep-alive optional; apply formatter. Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 7212b3b commit 80ca353

File tree

8 files changed

+88
-46
lines changed

8 files changed

+88
-46
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
104104
private String dataMimeType = "application/binary";
105105

106106
private boolean resumeEnabled;
107+
private boolean resumeCleanupStoreOnKeepAlive;
107108
private Supplier<ByteBuf> resumeTokenSupplier = ResumeFrameFlyweight::generateResumeToken;
108109
private Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory =
109110
token -> new InMemoryResumableFramesStore("client", 100_000);
@@ -216,6 +217,11 @@ public ClientRSocketFactory resumeStrategy(Supplier<ResumeStrategy> resumeStrate
216217
return this;
217218
}
218219

220+
public ClientRSocketFactory resumeCleanupOnKeepAlive() {
221+
resumeCleanupStoreOnKeepAlive = true;
222+
return this;
223+
}
224+
219225
@Override
220226
public Start<RSocket> transport(Supplier<ClientTransport> transportClient) {
221227
return new StartClient(transportClient);
@@ -326,7 +332,8 @@ private ClientSetup clientSetup() {
326332
resumeStoreFactory.apply(resumeToken),
327333
resumeSessionDuration,
328334
resumeStreamTimeout,
329-
resumeStrategySupplier);
335+
resumeStrategySupplier,
336+
resumeCleanupStoreOnKeepAlive);
330337
} else {
331338
return new ClientSetup.DefaultClientSetup();
332339
}
@@ -360,6 +367,7 @@ public static class ServerRSocketFactory {
360367
token -> new InMemoryResumableFramesStore("server", 100_000);
361368

362369
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
370+
private boolean resumeCleanupStoreOnKeepAlive;
363371

364372
private ServerRSocketFactory() {}
365373

@@ -425,6 +433,11 @@ public ServerRSocketFactory resumeStreamTimeout(Duration resumeStreamTimeout) {
425433
return this;
426434
}
427435

436+
public ServerRSocketFactory resumeCleanupOnKeepAlive() {
437+
resumeCleanupStoreOnKeepAlive = true;
438+
return this;
439+
}
440+
428441
private class ServerStart<T extends Closeable> implements Start<T> {
429442
private final Supplier<ServerTransport<T>> transportServer;
430443

@@ -543,7 +556,8 @@ private ServerSetup serverSetup() {
543556
new SessionManager(),
544557
resumeSessionDuration,
545558
resumeStreamTimeout,
546-
resumeStoreFactory)
559+
resumeStoreFactory,
560+
resumeCleanupStoreOnKeepAlive)
547561
: new ServerSetup.DefaultServerSetup(allocator);
548562
}
549563

rsocket-core/src/main/java/io/rsocket/internal/ClientSetup.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class ResumableClientSetup implements ClientSetup {
5656
private final Supplier<ResumeStrategy> resumeStrategySupplier;
5757
private final ResumableFramesStore resumableFramesStore;
5858
private final Duration resumeStreamTimeout;
59+
private final boolean cleanupStoreOnKeepAlive;
5960

6061
public ResumableClientSetup(
6162
ByteBufAllocator allocator,
@@ -64,14 +65,16 @@ public ResumableClientSetup(
6465
ResumableFramesStore resumableFramesStore,
6566
Duration resumeSessionDuration,
6667
Duration resumeStreamTimeout,
67-
Supplier<ResumeStrategy> resumeStrategySupplier) {
68+
Supplier<ResumeStrategy> resumeStrategySupplier,
69+
boolean cleanupStoreOnKeepAlive) {
6870
this.allocator = allocator;
6971
this.newConnectionFactory = newConnectionFactory;
7072
this.resumeToken = resumeToken;
7173
this.resumeSessionDuration = resumeSessionDuration;
7274
this.resumeStrategySupplier = resumeStrategySupplier;
7375
this.resumableFramesStore = resumableFramesStore;
7476
this.resumeStreamTimeout = resumeStreamTimeout;
77+
this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive;
7578
}
7679

7780
@Override
@@ -83,7 +86,8 @@ public DuplexConnection wrappedConnection(KeepAliveConnection connection) {
8386
resumeSessionDuration,
8487
resumeStrategySupplier,
8588
resumableFramesStore,
86-
resumeStreamTimeout)
89+
resumeStreamTimeout,
90+
cleanupStoreOnKeepAlive)
8791
.continueWith(newConnectionFactory)
8892
.resumeToken(resumeToken);
8993

rsocket-core/src/main/java/io/rsocket/internal/ServerSetup.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,18 +106,21 @@ class ResumableServerSetup implements ServerSetup {
106106
private final Duration resumeSessionDuration;
107107
private final Duration resumeStreamTimeout;
108108
private final Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory;
109+
private final boolean cleanupStoreOnKeepAlive;
109110

110111
public ResumableServerSetup(
111112
ByteBufAllocator allocator,
112113
SessionManager sessionManager,
113114
Duration resumeSessionDuration,
114115
Duration resumeStreamTimeout,
115-
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory) {
116+
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory,
117+
boolean cleanupStoreOnKeepAlive) {
116118
this.allocator = allocator;
117119
this.sessionManager = sessionManager;
118120
this.resumeSessionDuration = resumeSessionDuration;
119121
this.resumeStreamTimeout = resumeStreamTimeout;
120122
this.resumeStoreFactory = resumeStoreFactory;
123+
this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive;
121124
}
122125

123126
@Override
@@ -144,7 +147,8 @@ public Mono<Void> acceptRSocketSetup(
144147
resumeStreamTimeout,
145148
resumeStoreFactory,
146149
resumeToken,
147-
keepAliveData))
150+
keepAliveData,
151+
cleanupStoreOnKeepAlive))
148152
.resumableConnection();
149153
return then.apply(new ClientServerInputMultiplexer(resumableConnection));
150154
} else {

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,16 @@ public ClientRSocketSession(
4646
Duration resumeSessionDuration,
4747
Supplier<ResumeStrategy> resumeStrategy,
4848
ResumableFramesStore resumableFramesStore,
49-
Duration resumeStreamTimeout) {
49+
Duration resumeStreamTimeout,
50+
boolean cleanupStoreOnKeepAlive) {
5051
this.allocator = allocator;
5152
this.resumableConnection =
5253
new ResumableDuplexConnection(
53-
"client", duplexConnection, resumableFramesStore, resumeStreamTimeout);
54+
"client",
55+
duplexConnection,
56+
resumableFramesStore,
57+
resumeStreamTimeout,
58+
cleanupStoreOnKeepAlive);
5459

5560
/*session completed: release token initially retained in resumeToken(ByteBuf)*/
5661
onClose().doFinally(s -> resumeToken.release()).subscribe();

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
4040
private final String tag;
4141
private final ResumableFramesStore resumableFramesStore;
4242
private final Duration resumeStreamTimeout;
43+
private final boolean cleanupOnKeepAlive;
4344

4445
private final ReplayProcessor<DuplexConnection> connections = ReplayProcessor.create(1);
4546
private final EmitterProcessor<Throwable> connectionErrors = EmitterProcessor.create();
@@ -57,23 +58,25 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
5758
private final UnicastProcessor<Flux<ByteBuf>> upstreams = UnicastProcessor.create();
5859
private final UpstreamFramesSubscriber upstreamSubscriber =
5960
new UpstreamFramesSubscriber(
60-
128,
61+
Queues.SMALL_BUFFER_SIZE,
6162
downStreamRequestListener.requests(),
6263
resumeSaveStreamRequestListener.requests(),
6364
this::dispatch);
6465

65-
private volatile State state;
66+
private volatile int state;
6667
private volatile Disposable resumedStreamDisposable = Disposables.disposed();
6768
private final AtomicBoolean disposed = new AtomicBoolean();
6869

6970
ResumableDuplexConnection(
7071
String tag,
7172
ResumePositionsConnection duplexConnection,
7273
ResumableFramesStore resumableFramesStore,
73-
Duration resumeStreamTimeout) {
74+
Duration resumeStreamTimeout,
75+
boolean cleanupOnKeepAlive) {
7476
this.tag = tag;
7577
this.resumableFramesStore = resumableFramesStore;
7678
this.resumeStreamTimeout = resumeStreamTimeout;
79+
this.cleanupOnKeepAlive = cleanupOnKeepAlive;
7780

7881
resumableFramesStore
7982
.saveFrames(resumeSaveStreamRequestListener.apply(resumeSaveFrames))
@@ -131,11 +134,8 @@ public Mono<Void> sendOne(ByteBuf frame) {
131134

132135
@Override
133136
public Mono<Void> send(Publisher<ByteBuf> frames) {
134-
return Mono.defer(
135-
() -> {
136-
upstreams.onNext(Flux.from(frames));
137-
return framesSent;
138-
});
137+
upstreams.onNext(Flux.from(frames));
138+
return framesSent;
139139
}
140140

141141
@Override
@@ -164,7 +164,9 @@ public long impliedPosition() {
164164
@Override
165165
public void onImpliedPosition(long remoteImpliedPos) {
166166
logger.debug("Got remote position from keep-alive: {}", remoteImpliedPos);
167-
releaseFramesToPosition(remoteImpliedPos);
167+
if (cleanupOnKeepAlive) {
168+
dispatch(new ReleaseFrames(remoteImpliedPos));
169+
}
168170
}
169171

170172
@Override
@@ -268,7 +270,7 @@ private void doResume(
268270
Mono.error(
269271
new ResumeStateException(
270272
localPosition, localImpliedPosition,
271-
remotePosition, remoteImpliedPos));
273+
remotePosition, remoteImpliedPosition));
272274
}
273275

274276
sendResumeFrame
@@ -320,7 +322,7 @@ private void onNewConnection(ResumePositionsConnection connection) {
320322

321323
private void disconnect(DuplexConnection connection) {
322324
/*do not report late disconnects on old connection if new one is available*/
323-
if (curConnection == connection && state.isActive()) {
325+
if (curConnection == connection && state != State.DISCONNECTED) {
324326
Throwable err = new ClosedChannelException();
325327
state = State.DISCONNECTED;
326328
logger.debug("{} Inner connection disconnected: {}", tag, err.getClass().getSimpleName());
@@ -330,7 +332,7 @@ private void disconnect(DuplexConnection connection) {
330332

331333
/*remove frames confirmed by implied pos,
332334
set current pos accordingly*/
333-
private void releaseFramesToPosition(Long remoteImpliedPos) {
335+
private void releaseFramesToPosition(long remoteImpliedPos) {
334336
resumableFramesStore.releaseFrames(remoteImpliedPos);
335337
}
336338

@@ -351,22 +353,12 @@ static boolean isResumableFrame(ByteBuf frame) {
351353
}
352354
}
353355

354-
private enum State {
355-
CONNECTED(true),
356-
RESUME_STARTED(true),
357-
RESUME(true),
358-
RESUME_COMPLETED(true),
359-
DISCONNECTED(false);
360-
361-
private final boolean active;
362-
363-
State(boolean active) {
364-
this.active = active;
365-
}
366-
367-
public boolean isActive() {
368-
return active;
369-
}
356+
static class State {
357+
static int CONNECTED = 0;
358+
static int RESUME_STARTED = 1;
359+
static int RESUME = 2;
360+
static int RESUME_COMPLETED = 3;
361+
static int DISCONNECTED = 4;
370362
}
371363

372364
class ResumeStart implements Runnable {
@@ -407,4 +399,17 @@ public void run() {
407399
doResumeComplete();
408400
}
409401
}
402+
403+
private class ReleaseFrames implements Runnable {
404+
private final long remoteImpliedPos;
405+
406+
public ReleaseFrames(long remoteImpliedPos) {
407+
this.remoteImpliedPos = remoteImpliedPos;
408+
}
409+
410+
@Override
411+
public void run() {
412+
releaseFramesToPosition(remoteImpliedPos);
413+
}
414+
}
410415
}

rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,18 @@ public ServerRSocketSession(
5050
Duration resumeStreamTimeout,
5151
Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory,
5252
ByteBuf resumeToken,
53-
KeepAliveData keepAliveData) {
53+
KeepAliveData keepAliveData,
54+
boolean cleanupStoreOnKeepAlive) {
5455
this.allocator = allocator;
5556
this.keepAliveData = keepAliveData;
5657
this.resumeToken = resumeToken;
5758
this.resumableConnection =
5859
new ResumableDuplexConnection(
59-
"server", duplexConnection, resumeStoreFactory.apply(resumeToken), resumeStreamTimeout);
60+
"server",
61+
duplexConnection,
62+
resumeStoreFactory.apply(resumeToken),
63+
resumeStreamTimeout,
64+
cleanupStoreOnKeepAlive);
6065

6166
Mono<ResumePositionsConnection> timeout =
6267
resumableConnection

rsocket-core/src/main/java/io/rsocket/resume/SessionManager.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import javax.annotation.Nullable;
2323

2424
public class SessionManager {
25-
private boolean isDisposed;
25+
private volatile boolean isDisposed;
2626
private final Map<ByteBuf, ServerRSocketSession> sessions = new ConcurrentHashMap<>();
2727

2828
public ServerRSocketSession save(ServerRSocketSession session) {
@@ -34,14 +34,17 @@ public ServerRSocketSession save(ServerRSocketSession session) {
3434
.onClose()
3535
.doOnSuccess(
3636
v -> {
37-
sessions.remove(token);
37+
if (isDisposed || sessions.get(token) == session) {
38+
sessions.remove(token);
39+
}
3840
token.release();
3941
})
4042
.subscribe();
41-
ServerRSocketSession prev = sessions.put(token, session);
42-
if (prev != null) {
43-
prev.dispose();
43+
ServerRSocketSession prevSession = sessions.remove(token);
44+
if (prevSession != null) {
45+
prevSession.dispose();
4446
}
47+
sessions.put(token, session);
4548
}
4649
return session;
4750
}

rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,9 @@ private static Mono<RSocket> newClientRSocket(
211211
return RSocketFactory.connect()
212212
.resume()
213213
.resumeSessionDuration(Duration.ofSeconds(sessionDurationSeconds))
214-
.resumeStore(t -> new InMemoryResumableFramesStore("client", 10_000_000))
215-
.keepAliveTickPeriod(Duration.ofSeconds(30))
214+
.resumeStore(t -> new InMemoryResumableFramesStore("client", 500_000))
215+
.resumeCleanupOnKeepAlive()
216+
.keepAliveTickPeriod(Duration.ofSeconds(5))
216217
.keepAliveAckTimeout(Duration.ofMinutes(5))
217218
.errorConsumer(errConsumer)
218219
.resumeStrategy(() -> new PeriodicResumeStrategy(Duration.ofSeconds(1)))
@@ -227,8 +228,9 @@ private static Mono<CloseableChannel> newServerRSocket() {
227228
private static Mono<CloseableChannel> newServerRSocket(int sessionDurationSeconds) {
228229
return RSocketFactory.receive()
229230
.resume()
230-
.resumeStore(t -> new InMemoryResumableFramesStore("server", 10_000_000))
231+
.resumeStore(t -> new InMemoryResumableFramesStore("server", 500_000))
231232
.resumeSessionDuration(Duration.ofSeconds(sessionDurationSeconds))
233+
.resumeCleanupOnKeepAlive()
232234
.acceptor((setupPayload, rSocket) -> Mono.just(new TestResponderRSocket()))
233235
.transport(serverTransport(SERVER_HOST, SERVER_PORT))
234236
.start();

0 commit comments

Comments
 (0)