Skip to content

Commit 0d2a725

Browse files
adds docs and descriptions onto resumability related internals
Co-authored-by: Rossen Stoyanchev <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent c8a0d99 commit 0d2a725

File tree

5 files changed

+28
-7
lines changed

5 files changed

+28
-7
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
592592
new ClientRSocketSession(
593593
resumeToken,
594594
resumableDuplexConnection,
595-
connectionMono, // supplies pure
595+
connectionMono,
596596
resumableClientSetup::init,
597597
resumableFramesStore,
598598
resume.getSessionDuration(),

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ void reconnect(int index) {
105105
if (this.s == Operators.cancelledSubscription()
106106
&& S.compareAndSet(this, Operators.cancelledSubscription(), null)) {
107107
keepAliveSupport.stop();
108-
logger.debug("Connection[" + index + "] is lost. Reconnecting...");
108+
logger.debug("Connection[" + index + "] is lost. Reconnecting to resume...");
109109
connectionFactory.retryWhen(retry).timeout(resumeSessionDuration).subscribe(this);
110110
}
111111
}

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,23 @@ public class InMemoryResumableFramesStore extends Flux<ByteBuf>
6161

6262
CoreSubscriber<? super ByteBuf> actual;
6363

64-
// indicates whether there is active connection or not
64+
/**
65+
* Indicates whether there is an active connection or not.
66+
*
67+
* <ul>
68+
* <li>0 - no active connection
69+
* <li>1 - active connection
70+
* <li>2 - disposed
71+
* </ul>
72+
*
73+
* <pre>
74+
* 0 <-----> 1
75+
* | |
76+
* +--> 2 <--+
77+
* </pre>
78+
*/
6579
volatile int state;
80+
6681
static final AtomicIntegerFieldUpdater<InMemoryResumableFramesStore> STATE =
6782
AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state");
6883

@@ -217,8 +232,6 @@ public void onComplete() {
217232

218233
@Override
219234
public void onNext(ByteBuf frame) {
220-
frame.touch("Tag : " + tag + ". InMemoryResumableFramesStore:onNext");
221-
222235
final int state;
223236
final boolean isResumable = isResumableFrame(frame);
224237
if (isResumable) {
@@ -289,7 +302,6 @@ public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
289302
actual.onSubscribe(this);
290303
synchronized (this) {
291304
for (final ByteBuf frame : cachedFrames) {
292-
frame.touch("Tag : " + tag + ". InMemoryResumableFramesStore:subscribe");
293305
actual.onNext(frame.retain());
294306
}
295307
}

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ public void sendFrame(int streamId, ByteBuf frame) {
135135
}
136136
}
137137

138+
/**
139+
* Publisher for a sequence of integers starting at 1, with each next number emitted when the
140+
* currently active connection is closed and should be resumed. The Publisher never emits an error
141+
* and completes when the connection is disposed and not resumed.
142+
*/
138143
Flux<Integer> onActiveConnectionClosed() {
139144
return onConnectionClosedSink.asFlux();
140145
}
@@ -306,7 +311,6 @@ public void onSubscribe(Subscription s) {
306311

307312
@Override
308313
public void onNext(ByteBuf frame) {
309-
frame.touch("Tag : " + tag + ". FrameReceivingSubscriber#onNext");
310314
if (cancelled || s == Operators.cancelledSubscription()) {
311315
return;
312316
}

rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public class ServerRSocketSession
4848
final ByteBufAllocator allocator;
4949
final boolean cleanupStoreOnKeepAlive;
5050

51+
/**
52+
* All incoming connections with the Resume intent are enqueued in this queue. Such an approach
53+
* ensure that the new connection will affect the resumption state anyhow until the previous
54+
* (active) connection is finally closed
55+
*/
5156
final Queue<Runnable> connectionsQueue;
5257

5358
volatile int wip;

0 commit comments

Comments
 (0)