Skip to content

Commit 3f17a45

Browse files
fixes ReconnectMono behaviour when racing invalidate and subscribe #847
Co-authored-by: Rossen Stoyanchev <[email protected]>
1 parent bcee05b commit 3f17a45

File tree

2 files changed

+323
-25
lines changed

2 files changed

+323
-25
lines changed

rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,25 @@ public void subscribe(CoreSubscriber<? super T> actual) {
112112
final ReconnectInner<T> inner = new ReconnectInner<>(actual, this);
113113
actual.onSubscribe(inner);
114114

115-
final int state = this.add(inner);
115+
for (; ; ) {
116+
final int state = this.add(inner);
117+
118+
T value = this.value;
119+
120+
if (state == READY_STATE) {
121+
if (value != null) {
122+
inner.complete(value);
123+
return;
124+
}
125+
// value == null means racing between invalidate and this subscriber
126+
// thus, we have to loop again
127+
continue;
128+
} else if (state == TERMINATED_STATE) {
129+
inner.onError(this.t);
130+
return;
131+
}
116132

117-
if (state == READY_STATE) {
118-
inner.complete(this.value);
119-
} else if (state == TERMINATED_STATE) {
120-
inner.onError(this.t);
133+
return;
121134
}
122135
}
123136

@@ -150,7 +163,14 @@ public T block(@Nullable Duration timeout) {
150163
try {
151164
ReconnectInner<T>[] subscribers = this.subscribers;
152165
if (subscribers == READY) {
153-
return this.value;
166+
final T value = this.value;
167+
if (value != null) {
168+
return value;
169+
} else {
170+
// value == null means racing between invalidate and this block
171+
// thus, we have to update the state again and see what happened
172+
subscribers = this.subscribers;
173+
}
154174
}
155175

156176
if (subscribers == TERMINATED) {
@@ -175,7 +195,14 @@ public T block(@Nullable Duration timeout) {
175195
ReconnectInner<T>[] inners = this.subscribers;
176196

177197
if (inners == READY) {
178-
return this.value;
198+
final T value = this.value;
199+
if (value != null) {
200+
return value;
201+
} else {
202+
// value == null means racing between invalidate and this block
203+
// thus, we have to update the state again and see what happened
204+
inners = this.subscribers;
205+
}
179206
}
180207
if (inners == TERMINATED) {
181208
RuntimeException re = Exceptions.propagate(this.t);
@@ -282,13 +309,31 @@ public void invalidate() {
282309

283310
final ReconnectInner<T>[] subscribers = this.subscribers;
284311

285-
if (subscribers == READY && SUBSCRIBERS.compareAndSet(this, READY, EMPTY_UNSUBSCRIBED)) {
312+
if (subscribers == READY) {
313+
// guarded section to ensure we expire value exactly once if there is racing
314+
if (WIP.getAndIncrement(this) != 0) {
315+
return;
316+
}
317+
286318
final T value = this.value;
287319
this.value = null;
288-
289320
if (value != null) {
290321
this.onValueExpired.accept(value);
291322
}
323+
324+
int m = 1;
325+
for (; ; ) {
326+
if (isDisposed()) {
327+
return;
328+
}
329+
330+
m = WIP.addAndGet(this, -m);
331+
if (m == 0) {
332+
break;
333+
}
334+
}
335+
336+
SUBSCRIBERS.compareAndSet(this, READY, EMPTY_UNSUBSCRIBED);
292337
}
293338
}
294339

@@ -355,6 +400,11 @@ void remove(ReconnectInner<T> ps) {
355400
}
356401
}
357402

403+
/**
404+
* Subscriber that subscribes to the source {@link Mono} to receive its value. <br>
405+
* Note that the source is not expected to complete empty, and if this happens, execution will
406+
* terminate with an {@code IllegalStateException}.
407+
*/
358408
static final class ReconnectMainSubscriber<T> implements CoreSubscriber<T> {
359409

360410
final ReconnectMono<T> parent;
@@ -389,7 +439,7 @@ public void onComplete() {
389439
}
390440

391441
if (value == null) {
392-
p.terminate(new IllegalStateException("Unexpected Completion of the Upstream"));
442+
p.terminate(new IllegalStateException("Source completed empty"));
393443
} else {
394444
p.complete();
395445
}

0 commit comments

Comments
 (0)