Skip to content

Commit e19117c

Browse files
committed
improves Resumability implementation
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 9a4d5ab commit e19117c

File tree

14 files changed

+303
-94
lines changed

14 files changed

+303
-94
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -583,15 +583,16 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
583583
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken);
584584
final ResumableDuplexConnection resumableDuplexConnection =
585585
new ResumableDuplexConnection(
586-
clientServerConnection, resumableFramesStore);
586+
CLIENT_TAG,
587+
clientServerConnection,
588+
resumableFramesStore);
587589
final ResumableClientSetup resumableClientSetup =
588590
new ResumableClientSetup();
589591
final ClientRSocketSession session =
590592
new ClientRSocketSession(
591593
resumeToken,
592-
clientServerConnection,
593594
resumableDuplexConnection,
594-
connectionMono,
595+
connectionMono, // supplies pure
595596
resumableClientSetup::init,
596597
resumableFramesStore,
597598
resume.getSessionDuration(),

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public Mono<Void> acceptRSocketSetup(
109109

110110
final ResumableFramesStore resumableFramesStore = resumeStoreFactory.apply(resumeToken);
111111
final ResumableDuplexConnection resumableDuplexConnection =
112-
new ResumableDuplexConnection(duplexConnection, resumableFramesStore);
112+
new ResumableDuplexConnection("server", duplexConnection, resumableFramesStore);
113113
final ServerRSocketSession serverRSocketSession =
114114
new ServerRSocketSession(
115115
resumeToken,
@@ -134,7 +134,8 @@ public Mono<Void> acceptRSocketSetup(
134134
public Mono<Void> acceptRSocketResume(ByteBuf frame, DuplexConnection duplexConnection) {
135135
ServerRSocketSession session = sessionManager.get(ResumeFrameCodec.token(frame));
136136
if (session != null) {
137-
return session.resumeWith(frame, duplexConnection);
137+
session.resumeWith(frame, duplexConnection);
138+
return duplexConnection.onClose();
138139
} else {
139140
sendError(duplexConnection, new RejectedResumeException("unknown resume token"));
140141
return duplexConnection.onClose();

rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
public abstract class BaseDuplexConnection implements DuplexConnection {
99
protected MonoProcessor<Void> onClose = MonoProcessor.create();
1010

11-
protected UnboundedProcessor<ByteBuf> sender = new UnboundedProcessor<>();
11+
protected UnboundedProcessor<ByteBuf> sender =
12+
new UnboundedProcessor<>(this.getClass().toGenericString());
1213

1314
public BaseDuplexConnection() {
1415
onClose.doFinally(s -> doOnClose()).subscribe();

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.internal;
1818

19+
import io.netty.util.ReferenceCountUtil;
1920
import io.netty.util.ReferenceCounted;
2021
import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue;
2122
import java.util.Objects;
@@ -45,6 +46,7 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
4546

4647
final Queue<T> queue;
4748
final Queue<T> priorityQueue;
49+
private final String tag;
4850

4951
volatile boolean done;
5052
Throwable error;
@@ -82,6 +84,11 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
8284
boolean outputFused;
8385

8486
public UnboundedProcessor() {
87+
this("");
88+
}
89+
90+
public UnboundedProcessor(String tag) {
91+
this.tag = tag;
8592
this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
8693
this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
8794
}
@@ -131,6 +138,8 @@ void drainRegular(Subscriber<? super T> a) {
131138
break;
132139
}
133140

141+
ReferenceCountUtil.touch(t, tag + ": drainRegular");
142+
134143
a.onNext(t);
135144

136145
e++;
@@ -263,6 +272,8 @@ public void onNextPrioritized(T t) {
263272
return;
264273
}
265274

275+
ReferenceCountUtil.touch(t, tag + ": onNextPrioritized");
276+
266277
if (!priorityQueue.offer(t)) {
267278
Throwable ex =
268279
Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext());
@@ -281,6 +292,8 @@ public void onNext(T t) {
281292
return;
282293
}
283294

295+
ReferenceCountUtil.touch(t, tag + ": onNext");
296+
284297
if (!queue.offer(t)) {
285298
Throwable ex =
286299
Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext());

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ public class ClientRSocketSession
6363

6464
public ClientRSocketSession(
6565
ByteBuf resumeToken,
66-
DuplexConnection initialDuplexConnection,
6766
ResumableDuplexConnection resumableDuplexConnection,
6867
Mono<DuplexConnection> connectionFactory,
6968
Function<DuplexConnection, Mono<Tuple2<ByteBuf, DuplexConnection>>> connectionTransformer,
@@ -80,7 +79,9 @@ public ClientRSocketSession(
8079
ResumeFrameCodec.encode(
8180
dc.alloc(),
8281
resumeToken.retain(),
82+
// server uses this to release its cache
8383
resumableFramesStore.frameImpliedPosition(), // observed on the client side
84+
// server uses this to check whether there is no mismatch
8485
resumableFramesStore.framePosition() // sent from the client sent
8586
));
8687
logger.debug("Resume Frame has been sent");
@@ -95,25 +96,20 @@ public ClientRSocketSession(
9596
this.resumableConnection = resumableDuplexConnection;
9697

9798
resumableDuplexConnection.onClose().doFinally(__ -> dispose()).subscribe();
98-
99-
observeDisconnection(initialDuplexConnection);
99+
resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect);
100100

101101
S.lazySet(this, Operators.cancelledSubscription());
102102
}
103103

104-
void reconnect() {
104+
void reconnect(int index) {
105105
if (this.s == Operators.cancelledSubscription()
106106
&& S.compareAndSet(this, Operators.cancelledSubscription(), null)) {
107107
keepAliveSupport.stop();
108+
logger.debug("Connection[" + index + "] is lost. Reconnecting...");
108109
connectionFactory.retryWhen(retry).timeout(resumeSessionDuration).subscribe(this);
109-
logger.debug("Connection is lost. Reconnecting...");
110110
}
111111
}
112112

113-
void observeDisconnection(DuplexConnection activeConnection) {
114-
activeConnection.onClose().subscribe(null, e -> reconnect(), () -> reconnect());
115-
}
116-
117113
@Override
118114
public long impliedPosition() {
119115
return resumableFramesStore.frameImpliedPosition();
@@ -132,9 +128,11 @@ public void onImpliedPosition(long remoteImpliedPos) {
132128

133129
@Override
134130
public void dispose() {
135-
if (Operators.terminate(S, this)) {
136-
resumableFramesStore.dispose();
137-
resumableConnection.dispose();
131+
Operators.terminate(S, this);
132+
resumableConnection.dispose();
133+
resumableFramesStore.dispose();
134+
135+
if (resumeToken.refCnt() > 0) {
138136
resumeToken.release();
139137
}
140138
}
@@ -152,8 +150,8 @@ public void onSubscribe(Subscription s) {
152150
}
153151

154152
@Override
155-
public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
156-
ByteBuf frame = tuple2.getT1();
153+
public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
154+
ByteBuf shouldBeResumeOKFrame = tuple2.getT1();
157155
DuplexConnection nextDuplexConnection = tuple2.getT2();
158156

159157
if (!Operators.terminate(S, this)) {
@@ -164,9 +162,7 @@ public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
164162
return;
165163
}
166164

167-
final FrameType frameType = FrameHeaderCodec.nativeFrameType(frame);
168-
final int streamId = FrameHeaderCodec.streamId(frame);
169-
165+
final int streamId = FrameHeaderCodec.streamId(shouldBeResumeOKFrame);
170166
if (streamId != 0) {
171167
logger.debug(
172168
"Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection");
@@ -177,8 +173,13 @@ public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
177173
return;
178174
}
179175

176+
final FrameType frameType = FrameHeaderCodec.nativeFrameType(shouldBeResumeOKFrame);
180177
if (frameType == FrameType.RESUME_OK) {
181-
long remoteImpliedPos = ResumeOkFrameCodec.lastReceivedClientPos(frame);
178+
// how many frames the server has received from the client
179+
// so the client can release cached frames by this point
180+
long remoteImpliedPos = ResumeOkFrameCodec.lastReceivedClientPos(shouldBeResumeOKFrame);
181+
// what was the last notification from the server about number of frames being
182+
// observed
182183
final long position = resumableFramesStore.framePosition();
183184
final long impliedPosition = resumableFramesStore.frameImpliedPosition();
184185
logger.debug(
@@ -200,7 +201,6 @@ public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
200201
}
201202

202203
if (resumableConnection.connect(nextDuplexConnection)) {
203-
observeDisconnection(nextDuplexConnection);
204204
keepAliveSupport.start();
205205
logger.debug("Session has been resumed successfully");
206206
} else {
@@ -220,7 +220,7 @@ public synchronized void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
220220
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
221221
}
222222
} else if (frameType == FrameType.ERROR) {
223-
final RuntimeException exception = Exceptions.from(0, frame);
223+
final RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame);
224224
logger.debug("Received error frame. Terminating received connection", exception);
225225
resumableConnection.dispose();
226226
} else {

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class InMemoryResumableFramesStore extends Flux<ByteBuf>
6161

6262
CoreSubscriber<? super ByteBuf> actual;
6363

64+
// indicates whether there is active connection or not
6465
volatile int state;
6566
static final AtomicIntegerFieldUpdater<InMemoryResumableFramesStore> STATE =
6667
AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state");
@@ -94,9 +95,6 @@ public void releaseFrames(long remoteImpliedPos) {
9495
while (toRemoveBytes > removedBytes && frames.size() > 0) {
9596
ByteBuf cachedFrame = frames.remove(0);
9697
int frameSize = cachedFrame.readableBytes();
97-
// logger.debug(
98-
// "{} Removing frame {}", tag,
99-
// cachedFrame.toString(CharsetUtil.UTF_8));
10098
cachedFrame.release();
10199
removedBytes += frameSize;
102100
}
@@ -110,7 +108,7 @@ public void releaseFrames(long remoteImpliedPos) {
110108
toRemoveBytes));
111109
} else if (toRemoveBytes < removedBytes) {
112110
throw new IllegalStateException(
113-
"Local and remote state disagreement: " + "local and remote frame sizes are not equal");
111+
"Local and remote state disagreement: local and remote frame sizes are not equal");
114112
} else {
115113
POSITION.addAndGet(this, removedBytes);
116114
if (cacheLimit != Integer.MAX_VALUE) {
@@ -184,6 +182,7 @@ public void dispose() {
184182
if (STATE.getAndSet(this, 2) != 2) {
185183
cacheSize = 0;
186184
synchronized (this) {
185+
logger.debug("Tag {}.Disposing InMemoryFrameStore", tag);
187186
for (ByteBuf frame : cachedFrames) {
188187
if (frame != null) {
189188
frame.release();
@@ -197,7 +196,7 @@ public void dispose() {
197196

198197
@Override
199198
public boolean isDisposed() {
200-
return disposed.isTerminated();
199+
return state == 2;
201200
}
202201

203202
@Override
@@ -218,6 +217,9 @@ public void onComplete() {
218217

219218
@Override
220219
public void onNext(ByteBuf frame) {
220+
frame.touch("Tag : " + tag + ". InMemoryResumableFramesStore:onNext");
221+
222+
final int state;
221223
final boolean isResumable = isResumableFrame(frame);
222224
if (isResumable) {
223225
final ArrayList<ByteBuf> frames = cachedFrames;
@@ -244,20 +246,23 @@ public void onNext(ByteBuf frame) {
244246
POSITION.addAndGet(this, removedBytes);
245247
}
246248
}
247-
248249
synchronized (this) {
249-
frames.add(frame);
250+
state = this.state;
251+
if (state != 2) {
252+
frames.add(frame);
253+
}
250254
}
251255
if (cacheLimit != Integer.MAX_VALUE) {
252256
CACHE_SIZE.addAndGet(this, incomingFrameSize);
253257
}
258+
} else {
259+
state = this.state;
254260
}
255261

256-
final int state = this.state;
257262
final CoreSubscriber<? super ByteBuf> actual = this.actual;
258263
if (state == 1) {
259264
actual.onNext(frame.retain());
260-
} else if (!isResumable) {
265+
} else if (!isResumable || state == 2) {
261266
frame.release();
262267
}
263268
}
@@ -274,18 +279,25 @@ public void cancel() {
274279
@Override
275280
public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
276281
final int state = this.state;
277-
logger.debug("Tag: {}. Subscribed State[{}]", tag, state);
278-
actual.onSubscribe(this);
279282
if (state != 2) {
283+
resumeImplied();
284+
logger.debug(
285+
"Tag: {}. Subscribed at Position[{}] and ImpliedPosition[{}]",
286+
tag,
287+
position,
288+
impliedPosition);
289+
actual.onSubscribe(this);
280290
synchronized (this) {
281291
for (final ByteBuf frame : cachedFrames) {
292+
frame.touch("Tag : " + tag + ". InMemoryResumableFramesStore:subscribe");
282293
actual.onNext(frame.retain());
283294
}
284295
}
285296

286297
this.actual = actual;
287-
resumeImplied();
288298
STATE.compareAndSet(this, 0, 1);
299+
} else {
300+
Operators.complete(actual);
289301
}
290302
}
291303
}

0 commit comments

Comments
 (0)