@@ -248,7 +248,7 @@ public ClientRSocketFactory singleSubscriberRequester() {
248
248
* RSocketFactory
249
249
* .connect()
250
250
* .singleSubscriberRequester()
251
- * .reconnect()
251
+ * .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)) )
252
252
* .transport(transport)
253
253
* .start();
254
254
*
@@ -260,7 +260,7 @@ public ClientRSocketFactory singleSubscriberRequester() {
260
260
* }</pre>
261
261
*
262
262
* Apart of the shared behavior, once the previous connection has been expired (e.g. connection
263
- * has been disposed), the same instance of {@code Mono<RSocket>} reestablish connection without
263
+ * has been disposed), the same instance of {@code Mono<RSocket>} reestablishes connection without
264
264
* any extra effort. <br>
265
265
* For example:
266
266
*
@@ -269,7 +269,7 @@ public ClientRSocketFactory singleSubscriberRequester() {
269
269
* RSocketFactory
270
270
* .connect()
271
271
* .singleSubscriberRequester()
272
- * .reconnect()
272
+ * .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)) )
273
273
* .transport(transport)
274
274
* .start();
275
275
*
@@ -291,67 +291,7 @@ public ClientRSocketFactory singleSubscriberRequester() {
291
291
*
292
292
* }</pre>
293
293
*
294
- * @return a shared instance of {@code Mono<RSocket>}.
295
- */
296
- public ClientRSocketFactory reconnect () {
297
- this .reconnectEnabled = true ;
298
- return this ;
299
- }
300
-
301
- /**
302
- * Enables a reconnectable, shared instance of {@code Mono<RSocket>} so every subscriber will
303
- * observe the same RSocket instance up on connection establishment. <br>
304
- * For example:
305
- *
306
- * <pre>{@code
307
- * Mono<RSocket> sharedRSocketMono =
308
- * RSocketFactory
309
- * .connect()
310
- * .singleSubscriberRequester()
311
- * .reconnect(fluxOfThrowables -> flux.concatMap(t -> Mono.delay(Duration.ofSeconds(1)).take(5))
312
- * .transport(transport)
313
- * .start();
314
- *
315
- * RSocket r1 = sharedRSocketMono.block();
316
- * RSocket r2 = sharedRSocketMono.block();
317
- *
318
- * assert r1 == r2;
319
- *
320
- * }</pre>
321
- *
322
- * Apart of the shared behavior, once the previous connection has been expired (e.g. connection
323
- * has been disposed), the same instance of {@code Mono<RSocket>} reestablish connection without
324
- * any extra effort. <br>
325
- * For example:
326
- *
327
- * <pre>{@code
328
- * Mono<RSocket> sharedRSocketMono =
329
- * RSocketFactory
330
- * .connect()
331
- * .singleSubscriberRequester()
332
- * .reconnect(fluxOfThrowables -> flux.concatMap(t -> Mono.delay(Duration.ofSeconds(1)).take(5))
333
- * .transport(transport)
334
- * .start();
335
- *
336
- * RSocket r1 = sharedRSocketMono.block();
337
- * RSocket r2 = sharedRSocketMono.block();
338
- *
339
- * assert r1 == r2;
340
- *
341
- * r1.dispose()
342
- *
343
- * assert r2.isDisposed()
344
- *
345
- * RSocket r3 = sharedRSocketMono.block();
346
- * RSocket r4 = sharedRSocketMono.block();
347
- *
348
- *
349
- * assert r1 != r3;
350
- * assert r4 == r3;
351
- *
352
- * }</pre>
353
- *
354
- * @param whenFactory a retry factory applied for {@link Mono.retryWhen(whenFactory)}
294
+ * @param retrySpec a retry factory applied for {@link Mono#retryWhen(Retry)}
355
295
* @return a shared instance of {@code Mono<RSocket>}.
356
296
*/
357
297
public ClientRSocketFactory reconnect (Retry retrySpec ) {
@@ -527,7 +467,7 @@ public Mono<RSocket> start() {
527
467
source -> {
528
468
if (reconnectEnabled ) {
529
469
return new ReconnectMono <>(
530
- retrySpec == null ? source : source .retryWhen (retrySpec ),
470
+ source .retryWhen (retrySpec ),
531
471
Disposable ::dispose ,
532
472
INVALIDATE_FUNCTION );
533
473
} else {
0 commit comments