Skip to content

Commit c744240

Browse files
committed
improves Resumability implementation
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 9a4d5ab commit c744240

File tree

12 files changed

+287
-92
lines changed

12 files changed

+287
-92
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -583,15 +583,16 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
583583
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken);
584584
final ResumableDuplexConnection resumableDuplexConnection =
585585
new ResumableDuplexConnection(
586-
clientServerConnection, resumableFramesStore);
586+
CLIENT_TAG,
587+
clientServerConnection,
588+
resumableFramesStore);
587589
final ResumableClientSetup resumableClientSetup =
588590
new ResumableClientSetup();
589591
final ClientRSocketSession session =
590592
new ClientRSocketSession(
591593
resumeToken,
592-
clientServerConnection,
593594
resumableDuplexConnection,
594-
connectionMono,
595+
connectionMono, // supplies pure
595596
resumableClientSetup::init,
596597
resumableFramesStore,
597598
resume.getSessionDuration(),

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public Mono<Void> acceptRSocketSetup(
109109

110110
final ResumableFramesStore resumableFramesStore = resumeStoreFactory.apply(resumeToken);
111111
final ResumableDuplexConnection resumableDuplexConnection =
112-
new ResumableDuplexConnection(duplexConnection, resumableFramesStore);
112+
new ResumableDuplexConnection("server", duplexConnection, resumableFramesStore);
113113
final ServerRSocketSession serverRSocketSession =
114114
new ServerRSocketSession(
115115
resumeToken,
@@ -134,7 +134,8 @@ public Mono<Void> acceptRSocketSetup(
134134
public Mono<Void> acceptRSocketResume(ByteBuf frame, DuplexConnection duplexConnection) {
135135
ServerRSocketSession session = sessionManager.get(ResumeFrameCodec.token(frame));
136136
if (session != null) {
137-
return session.resumeWith(frame, duplexConnection);
137+
session.resumeWith(frame, duplexConnection);
138+
return duplexConnection.onClose();
138139
} else {
139140
sendError(duplexConnection, new RejectedResumeException("unknown resume token"));
140141
return duplexConnection.onClose();

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ public class ClientRSocketSession
6363

6464
public ClientRSocketSession(
6565
ByteBuf resumeToken,
66-
DuplexConnection initialDuplexConnection,
6766
ResumableDuplexConnection resumableDuplexConnection,
6867
Mono<DuplexConnection> connectionFactory,
6968
Function<DuplexConnection, Mono<Tuple2<ByteBuf, DuplexConnection>>> connectionTransformer,
@@ -80,7 +79,9 @@ public ClientRSocketSession(
8079
ResumeFrameCodec.encode(
8180
dc.alloc(),
8281
resumeToken.retain(),
82+
// server uses this to release its cache
8383
resumableFramesStore.frameImpliedPosition(), // observed on the client side
84+
// server uses this to check whether there is no mismatch
8485
resumableFramesStore.framePosition() // sent from the client sent
8586
));
8687
logger.debug("Resume Frame has been sent");
@@ -95,25 +96,20 @@ public ClientRSocketSession(
9596
this.resumableConnection = resumableDuplexConnection;
9697

9798
resumableDuplexConnection.onClose().doFinally(__ -> dispose()).subscribe();
98-
99-
observeDisconnection(initialDuplexConnection);
99+
resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect);
100100

101101
S.lazySet(this, Operators.cancelledSubscription());
102102
}
103103

104-
void reconnect() {
104+
void reconnect(int index) {
105105
if (this.s == Operators.cancelledSubscription()
106106
&& S.compareAndSet(this, Operators.cancelledSubscription(), null)) {
107107
keepAliveSupport.stop();
108+
logger.debug("Connection[" + index + "] is lost. Reconnecting...");
108109
connectionFactory.retryWhen(retry).timeout(resumeSessionDuration).subscribe(this);
109-
logger.debug("Connection is lost. Reconnecting...");
110110
}
111111
}
112112

113-
void observeDisconnection(DuplexConnection activeConnection) {
114-
activeConnection.onClose().subscribe(null, e -> reconnect(), () -> reconnect());
115-
}
116-
117113
@Override
118114
public long impliedPosition() {
119115
return resumableFramesStore.frameImpliedPosition();
@@ -132,9 +128,11 @@ public void onImpliedPosition(long remoteImpliedPos) {
132128

133129
@Override
134130
public void dispose() {
135-
if (Operators.terminate(S, this)) {
136-
resumableFramesStore.dispose();
137-
resumableConnection.dispose();
131+
Operators.terminate(S, this);
132+
resumableConnection.dispose();
133+
resumableFramesStore.dispose();
134+
135+
if (resumeToken.refCnt() > 0) {
138136
resumeToken.release();
139137
}
140138
}
@@ -152,8 +150,8 @@ public void onSubscribe(Subscription s) {
152150
}
153151

154152
@Override
155-
public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
156-
ByteBuf frame = tuple2.getT1();
153+
public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
154+
ByteBuf shouldBeResumeOKFrame = tuple2.getT1();
157155
DuplexConnection nextDuplexConnection = tuple2.getT2();
158156

159157
if (!Operators.terminate(S, this)) {
@@ -164,9 +162,7 @@ public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
164162
return;
165163
}
166164

167-
final FrameType frameType = FrameHeaderCodec.nativeFrameType(frame);
168-
final int streamId = FrameHeaderCodec.streamId(frame);
169-
165+
final int streamId = FrameHeaderCodec.streamId(shouldBeResumeOKFrame);
170166
if (streamId != 0) {
171167
logger.debug(
172168
"Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection");
@@ -177,8 +173,13 @@ public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
177173
return;
178174
}
179175

176+
final FrameType frameType = FrameHeaderCodec.nativeFrameType(shouldBeResumeOKFrame);
180177
if (frameType == FrameType.RESUME_OK) {
181-
long remoteImpliedPos = ResumeOkFrameCodec.lastReceivedClientPos(frame);
178+
// how many frames the server has received from the client
179+
// so the client can release cached frames by this point
180+
long remoteImpliedPos = ResumeOkFrameCodec.lastReceivedClientPos(shouldBeResumeOKFrame);
181+
// what was the last notification from the server about number of frames being
182+
// observed
182183
final long position = resumableFramesStore.framePosition();
183184
final long impliedPosition = resumableFramesStore.frameImpliedPosition();
184185
logger.debug(
@@ -200,7 +201,6 @@ public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
200201
}
201202

202203
if (resumableConnection.connect(nextDuplexConnection)) {
203-
observeDisconnection(nextDuplexConnection);
204204
keepAliveSupport.start();
205205
logger.debug("Session has been resumed successfully");
206206
} else {
@@ -220,7 +220,7 @@ public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
220220
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
221221
}
222222
} else if (frameType == FrameType.ERROR) {
223-
final RuntimeException exception = Exceptions.from(0, frame);
223+
final RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame);
224224
logger.debug("Received error frame. Terminating received connection", exception);
225225
resumableConnection.dispose();
226226
} else {

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class InMemoryResumableFramesStore extends Flux<ByteBuf>
6161

6262
CoreSubscriber<? super ByteBuf> actual;
6363

64+
// indicates whether there is active connection or not
6465
volatile int state;
6566
static final AtomicIntegerFieldUpdater<InMemoryResumableFramesStore> STATE =
6667
AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state");
@@ -94,9 +95,6 @@ public void releaseFrames(long remoteImpliedPos) {
9495
while (toRemoveBytes > removedBytes && frames.size() > 0) {
9596
ByteBuf cachedFrame = frames.remove(0);
9697
int frameSize = cachedFrame.readableBytes();
97-
// logger.debug(
98-
// "{} Removing frame {}", tag,
99-
// cachedFrame.toString(CharsetUtil.UTF_8));
10098
cachedFrame.release();
10199
removedBytes += frameSize;
102100
}
@@ -110,7 +108,7 @@ public void releaseFrames(long remoteImpliedPos) {
110108
toRemoveBytes));
111109
} else if (toRemoveBytes < removedBytes) {
112110
throw new IllegalStateException(
113-
"Local and remote state disagreement: " + "local and remote frame sizes are not equal");
111+
"Local and remote state disagreement: local and remote frame sizes are not equal");
114112
} else {
115113
POSITION.addAndGet(this, removedBytes);
116114
if (cacheLimit != Integer.MAX_VALUE) {
@@ -184,6 +182,7 @@ public void dispose() {
184182
if (STATE.getAndSet(this, 2) != 2) {
185183
cacheSize = 0;
186184
synchronized (this) {
185+
logger.debug("Tag {}.Disposing InMemoryFrameStore", tag);
187186
for (ByteBuf frame : cachedFrames) {
188187
if (frame != null) {
189188
frame.release();
@@ -197,7 +196,7 @@ public void dispose() {
197196

198197
@Override
199198
public boolean isDisposed() {
200-
return disposed.isTerminated();
199+
return state == 2;
201200
}
202201

203202
@Override
@@ -218,6 +217,9 @@ public void onComplete() {
218217

219218
@Override
220219
public void onNext(ByteBuf frame) {
220+
frame.touch("Tag : " + tag + ". InMemoryResumableFramesStore:onNext");
221+
222+
final int state;
221223
final boolean isResumable = isResumableFrame(frame);
222224
if (isResumable) {
223225
final ArrayList<ByteBuf> frames = cachedFrames;
@@ -244,20 +246,23 @@ public void onNext(ByteBuf frame) {
244246
POSITION.addAndGet(this, removedBytes);
245247
}
246248
}
247-
248249
synchronized (this) {
249-
frames.add(frame);
250+
state = this.state;
251+
if (state != 2) {
252+
frames.add(frame);
253+
}
250254
}
251255
if (cacheLimit != Integer.MAX_VALUE) {
252256
CACHE_SIZE.addAndGet(this, incomingFrameSize);
253257
}
258+
} else {
259+
state = this.state;
254260
}
255261

256-
final int state = this.state;
257262
final CoreSubscriber<? super ByteBuf> actual = this.actual;
258263
if (state == 1) {
259264
actual.onNext(frame.retain());
260-
} else if (!isResumable) {
265+
} else if (!isResumable || state == 2) {
261266
frame.release();
262267
}
263268
}
@@ -274,18 +279,25 @@ public void cancel() {
274279
@Override
275280
public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
276281
final int state = this.state;
277-
logger.debug("Tag: {}. Subscribed State[{}]", tag, state);
278-
actual.onSubscribe(this);
279282
if (state != 2) {
283+
resumeImplied();
284+
logger.debug(
285+
"Tag: {}. Subscribed at Position[{}] and ImpliedPosition[{}]",
286+
tag,
287+
position,
288+
impliedPosition);
289+
actual.onSubscribe(this);
280290
synchronized (this) {
281291
for (final ByteBuf frame : cachedFrames) {
292+
frame.touch("Tag : " + tag + ". InMemoryResumableFramesStore:subscribe");
282293
actual.onNext(frame.retain());
283294
}
284295
}
285296

286297
this.actual = actual;
287-
resumeImplied();
288298
STATE.compareAndSet(this, 0, 1);
299+
} else {
300+
Operators.complete(actual);
289301
}
290302
}
291303
}

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,29 @@
2626
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2727
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2828
import org.reactivestreams.Subscription;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2931
import reactor.core.CoreSubscriber;
3032
import reactor.core.Disposable;
3133
import reactor.core.publisher.Flux;
3234
import reactor.core.publisher.Mono;
3335
import reactor.core.publisher.MonoProcessor;
3436
import reactor.core.publisher.Operators;
37+
import reactor.core.publisher.Sinks;
3538

3639
public class ResumableDuplexConnection extends Flux<ByteBuf>
3740
implements DuplexConnection, Subscription {
3841

42+
static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class);
43+
44+
final String tag;
3945
final ResumableFramesStore resumableFramesStore;
4046

4147
final UnboundedProcessor<ByteBuf> savableFramesSender;
4248
final Disposable framesSaverDisposable;
4349
final MonoProcessor<Void> onClose;
4450
final SocketAddress remoteAddress;
51+
final Sinks.Many<Integer> onConnectionClosedSink;
4552

4653
CoreSubscriber<? super ByteBuf> receiveSubscriber;
4754
FrameReceivingSubscriber activeReceivingSubscriber;
@@ -56,8 +63,12 @@ public class ResumableDuplexConnection extends Flux<ByteBuf>
5663
AtomicReferenceFieldUpdater.newUpdater(
5764
ResumableDuplexConnection.class, DuplexConnection.class, "activeConnection");
5865

66+
int connectionIndex = 0;
67+
5968
public ResumableDuplexConnection(
60-
DuplexConnection initialConnection, ResumableFramesStore resumableFramesStore) {
69+
String tag, DuplexConnection initialConnection, ResumableFramesStore resumableFramesStore) {
70+
this.tag = tag;
71+
this.onConnectionClosedSink = Sinks.many().unsafe().unicast().onBackpressureBuffer();
6172
this.resumableFramesStore = resumableFramesStore;
6273
this.savableFramesSender = new UnboundedProcessor<>();
6374
this.framesSaverDisposable = resumableFramesStore.saveFrames(savableFramesSender).subscribe();
@@ -83,9 +94,15 @@ public boolean connect(DuplexConnection nextConnection) {
8394
}
8495

8596
void initConnection(DuplexConnection nextConnection) {
97+
logger.debug("Tag {}. Initializing connection {}", tag, nextConnection);
98+
99+
final int currentConnectionIndex = connectionIndex;
86100
final FrameReceivingSubscriber frameReceivingSubscriber =
87-
new FrameReceivingSubscriber(resumableFramesStore, receiveSubscriber);
101+
new FrameReceivingSubscriber(tag, resumableFramesStore, receiveSubscriber);
102+
103+
this.connectionIndex = currentConnectionIndex + 1;
88104
this.activeReceivingSubscriber = frameReceivingSubscriber;
105+
89106
final Disposable disposable =
90107
resumableFramesStore
91108
.resumeStream()
@@ -97,6 +114,7 @@ void initConnection(DuplexConnection nextConnection) {
97114
__ -> {
98115
frameReceivingSubscriber.dispose();
99116
disposable.dispose();
117+
onConnectionClosedSink.emitNext(currentConnectionIndex);
100118
})
101119
.subscribe();
102120
}
@@ -117,6 +135,10 @@ public void sendFrame(int streamId, ByteBuf frame) {
117135
}
118136
}
119137

138+
Flux<Integer> onActiveConnectionClosed() {
139+
return onConnectionClosedSink.asFlux();
140+
}
141+
120142
@Override
121143
public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
122144
final DuplexConnection activeConnection =
@@ -133,11 +155,13 @@ public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
133155
t -> {
134156
framesSaverDisposable.dispose();
135157
savableFramesSender.dispose();
158+
onConnectionClosedSink.emitComplete();
136159
onClose.onError(t);
137160
},
138161
() -> {
139162
framesSaverDisposable.dispose();
140163
savableFramesSender.dispose();
164+
onConnectionClosedSink.emitComplete();
141165
final Throwable cause = rSocketErrorException.getCause();
142166
if (cause == null) {
143167
onClose.onComplete();
@@ -177,6 +201,7 @@ public void dispose() {
177201
framesSaverDisposable.dispose();
178202
activeReceivingSubscriber.dispose();
179203
savableFramesSender.dispose();
204+
onConnectionClosedSink.emitComplete();
180205
onClose.onComplete();
181206
}
182207

@@ -256,6 +281,7 @@ private static final class FrameReceivingSubscriber
256281

257282
final ResumableFramesStore resumableFramesStore;
258283
final CoreSubscriber<? super ByteBuf> actual;
284+
final String tag;
259285

260286
volatile Subscription s;
261287
static final AtomicReferenceFieldUpdater<FrameReceivingSubscriber, Subscription> S =
@@ -265,7 +291,8 @@ private static final class FrameReceivingSubscriber
265291
boolean cancelled;
266292

267293
private FrameReceivingSubscriber(
268-
ResumableFramesStore store, CoreSubscriber<? super ByteBuf> actual) {
294+
String tag, ResumableFramesStore store, CoreSubscriber<? super ByteBuf> actual) {
295+
this.tag = tag;
269296
this.resumableFramesStore = store;
270297
this.actual = actual;
271298
}
@@ -279,6 +306,7 @@ public void onSubscribe(Subscription s) {
279306

280307
@Override
281308
public void onNext(ByteBuf frame) {
309+
frame.touch("Tag : " + tag + ". FrameReceivingSubscriber#onNext");
282310
if (cancelled || s == Operators.cancelledSubscription()) {
283311
return;
284312
}

0 commit comments

Comments
 (0)