Skip to content

Commit 0283373

Browse files
committed
reworks and improves Resumability impl
this includes: * rework of InMemoryResumableFramesStore and improvement in its tests coverage * improvements in Client/Server resume Session and ensuring that if connection is rejected for any reasons - it is fully closed on both outbound and inbound ends (This fix is needed for LocalDuplexConnection scenario which may be in unterminated state if it will not be subscribed on the inbound) * enabling resumability tests for LocalTransport * improvements in logging * general cleanups and polishing Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 894aa6d commit 0283373

File tree

15 files changed

+1500
-255
lines changed

15 files changed

+1500
-255
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
612612
final ResumableDuplexConnection resumableDuplexConnection =
613613
new ResumableDuplexConnection(
614614
CLIENT_TAG,
615+
resumeToken,
615616
clientServerConnection,
616617
resumableFramesStore);
617618
final ResumableClientSetup resumableClientSetup =

rsocket-core/src/main/java/io/rsocket/core/Resume.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ boolean isCleanupStoreOnKeepAlive() {
160160
Function<? super ByteBuf, ? extends ResumableFramesStore> getStoreFactory(String tag) {
161161
return storeFactory != null
162162
? storeFactory
163-
: token -> new InMemoryResumableFramesStore(tag, 100_000);
163+
: token -> new InMemoryResumableFramesStore(tag, token, 100_000);
164164
}
165165

166166
Duration getStreamTimeout() {

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ public Mono<Void> acceptRSocketSetup(
109109

110110
final ResumableFramesStore resumableFramesStore = resumeStoreFactory.apply(resumeToken);
111111
final ResumableDuplexConnection resumableDuplexConnection =
112-
new ResumableDuplexConnection("server", duplexConnection, resumableFramesStore);
112+
new ResumableDuplexConnection(
113+
"server", resumeToken, duplexConnection, resumableFramesStore);
113114
final ServerRSocketSession serverRSocketSession =
114115
new ServerRSocketSession(
115116
resumeToken,

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
21+
import io.netty.util.CharsetUtil;
2122
import io.rsocket.DuplexConnection;
2223
import io.rsocket.exceptions.ConnectionErrorException;
2324
import io.rsocket.exceptions.Exceptions;
@@ -54,6 +55,7 @@ public class ClientRSocketSession
5455
final Retry retry;
5556
final boolean cleanupStoreOnKeepAlive;
5657
final ByteBuf resumeToken;
58+
final String session;
5759

5860
volatile Subscription s;
5961
static final AtomicReferenceFieldUpdater<ClientRSocketSession, Subscription> S =
@@ -71,20 +73,30 @@ public ClientRSocketSession(
7173
Retry retry,
7274
boolean cleanupStoreOnKeepAlive) {
7375
this.resumeToken = resumeToken;
76+
this.session = resumeToken.toString(CharsetUtil.UTF_8);
7477
this.connectionFactory =
7578
connectionFactory.flatMap(
7679
dc -> {
80+
final long impliedPosition = resumableFramesStore.frameImpliedPosition();
81+
final long position = resumableFramesStore.framePosition();
7782
dc.sendFrame(
7883
0,
7984
ResumeFrameCodec.encode(
8085
dc.alloc(),
8186
resumeToken.retain(),
8287
// server uses this to release its cache
83-
resumableFramesStore.frameImpliedPosition(), // observed on the client side
88+
impliedPosition, // observed on the client side
8489
// server uses this to check whether there is no mismatch
85-
resumableFramesStore.framePosition() // sent from the client sent
90+
position // sent from the client sent
8691
));
87-
logger.debug("Resume Frame has been sent");
92+
93+
if (logger.isDebugEnabled()) {
94+
logger.debug(
95+
"Side[client]|Session[{}]. ResumeFrame[impliedPosition[{}], position[{}]] has been sent.",
96+
session,
97+
impliedPosition,
98+
position);
99+
}
88100

89101
return connectionTransformer.apply(dc);
90102
});
@@ -105,7 +117,12 @@ void reconnect(int index) {
105117
if (this.s == Operators.cancelledSubscription()
106118
&& S.compareAndSet(this, Operators.cancelledSubscription(), null)) {
107119
keepAliveSupport.stop();
108-
logger.debug("Connection[" + index + "] is lost. Reconnecting to resume...");
120+
if (logger.isDebugEnabled()) {
121+
logger.debug(
122+
"Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting to resume...",
123+
session,
124+
index);
125+
}
109126
connectionFactory.retryWhen(retry).timeout(resumeSessionDuration).subscribe(this);
110127
}
111128
}
@@ -155,21 +172,30 @@ public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
155172
DuplexConnection nextDuplexConnection = tuple2.getT2();
156173

157174
if (!Operators.terminate(S, this)) {
158-
logger.debug("Session has already been expired. Terminating received connection");
175+
if (logger.isDebugEnabled()) {
176+
logger.debug(
177+
"Side[client]|Session[{}]. Session has already been expired. Terminating received connection",
178+
session);
179+
}
159180
final ConnectionErrorException connectionErrorException =
160181
new ConnectionErrorException("resumption_server=[Session Expired]");
161182
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
183+
nextDuplexConnection.receive().subscribe().dispose();
162184
return;
163185
}
164186

165187
final int streamId = FrameHeaderCodec.streamId(shouldBeResumeOKFrame);
166188
if (streamId != 0) {
167-
logger.debug(
168-
"Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection");
169-
resumableConnection.dispose();
189+
if (logger.isDebugEnabled()) {
190+
logger.debug(
191+
"Side[client]|Session[{}]. Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection",
192+
session);
193+
}
170194
final ConnectionErrorException connectionErrorException =
171195
new ConnectionErrorException("RESUME_OK frame must be received before any others");
196+
resumableConnection.dispose(connectionErrorException);
172197
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
198+
nextDuplexConnection.receive().subscribe().dispose();
173199
return;
174200
}
175201

@@ -183,7 +209,8 @@ public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
183209
final long position = resumableFramesStore.framePosition();
184210
final long impliedPosition = resumableFramesStore.frameImpliedPosition();
185211
logger.debug(
186-
"ResumeOK FRAME received. ServerResumeState{observedFramesPosition[{}]}. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}",
212+
"Side[client]|Session[{}]. ResumeOK FRAME received. ServerResumeState[remoteImpliedPosition[{}]]. ClientResumeState[impliedPosition[{}], position[{}]]",
213+
session,
187214
remoteImpliedPos,
188215
impliedPosition,
189216
position);
@@ -194,42 +221,54 @@ public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
194221
}
195222
} catch (IllegalStateException e) {
196223
logger.debug("Exception occurred while releasing frames in the frameStore", e);
197-
resumableConnection.dispose();
224+
resumableConnection.dispose(e);
198225
final ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e);
199226
nextDuplexConnection.sendErrorAndClose(t);
227+
nextDuplexConnection.receive().subscribe().dispose();
200228
return;
201229
}
202230

203231
if (resumableConnection.connect(nextDuplexConnection)) {
204232
keepAliveSupport.start();
205-
logger.debug("Session has been resumed successfully");
233+
if (logger.isDebugEnabled()) {
234+
logger.debug(
235+
"Side[client]|Session[{}]. Session has been resumed successfully", session);
236+
}
206237
} else {
207-
logger.debug("Session has already been expired. Terminating received connection");
238+
if (logger.isDebugEnabled()) {
239+
logger.debug(
240+
"Side[client]|Session[{}]. Session has already been expired. Terminating received connection",
241+
session);
242+
}
208243
final ConnectionErrorException connectionErrorException =
209244
new ConnectionErrorException("resumption_server_pos=[Session Expired]");
210245
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
246+
nextDuplexConnection.receive().subscribe().dispose();
211247
}
212248
} else {
213249
logger.debug(
214-
"Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection",
250+
"Side[client]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection",
251+
session,
215252
remoteImpliedPos,
216253
position);
217-
resumableConnection.dispose();
218254
final ConnectionErrorException connectionErrorException =
219255
new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]");
256+
resumableConnection.dispose(connectionErrorException);
220257
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
258+
nextDuplexConnection.receive().subscribe().dispose();
221259
}
222260
} else if (frameType == FrameType.ERROR) {
223261
final RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame);
224262
logger.debug("Received error frame. Terminating received connection", exception);
225-
resumableConnection.dispose();
263+
resumableConnection.dispose(exception);
226264
} else {
227265
logger.debug(
228266
"Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection");
229-
resumableConnection.dispose();
230267
final ConnectionErrorException connectionErrorException =
231268
new ConnectionErrorException("RESUME_OK frame must be received before any others");
269+
resumableConnection.dispose(connectionErrorException);
232270
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
271+
nextDuplexConnection.receive().subscribe().dispose();
233272
}
234273
}
235274

@@ -239,7 +278,7 @@ public void onError(Throwable t) {
239278
Operators.onErrorDropped(t, currentContext());
240279
}
241280

242-
resumableConnection.dispose();
281+
resumableConnection.dispose(t);
243282
}
244283

245284
@Override

0 commit comments

Comments
 (0)