Skip to content

Commit 20e6b5f

Browse files
committed
Merge #1.0.x into master
Signed-off-by: Oleh Dokuka <[email protected]>
2 parents c337cba + a2a3540 commit 20e6b5f

File tree

8 files changed

+387
-352
lines changed

8 files changed

+387
-352
lines changed

rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,19 +170,19 @@ public T block(@Nullable Duration timeout) {
170170
delay = System.nanoTime() + timeout.toNanos();
171171
}
172172
for (; ; ) {
173-
BiConsumer<T, Throwable>[] inners = this.subscribers;
173+
subscribers = this.subscribers;
174174

175-
if (inners == READY) {
175+
if (subscribers == READY) {
176176
final T value = this.value;
177177
if (value != null) {
178178
return value;
179179
} else {
180180
// value == null means racing between invalidate and this block
181181
// thus, we have to update the state again and see what happened
182-
inners = this.subscribers;
182+
subscribers = this.subscribers;
183183
}
184184
}
185-
if (inners == TERMINATED) {
185+
if (subscribers == TERMINATED) {
186186
RuntimeException re = Exceptions.propagate(this.t);
187187
re = Exceptions.addSuppressed(re, new Exception("Terminated with an error"));
188188
throw re;
@@ -191,6 +191,12 @@ public T block(@Nullable Duration timeout) {
191191
throw new IllegalStateException("Timeout on Mono blocking read");
192192
}
193193

194+
// connect again since invalidate() has happened in between
195+
if (subscribers == EMPTY_UNSUBSCRIBED
196+
&& SUBSCRIBERS.compareAndSet(this, EMPTY_UNSUBSCRIBED, EMPTY_SUBSCRIBED)) {
197+
this.doSubscribe();
198+
}
199+
194200
Thread.sleep(1);
195201
}
196202
} catch (InterruptedException ie) {
@@ -203,6 +209,7 @@ public T block(@Nullable Duration timeout) {
203209
@SuppressWarnings("unchecked")
204210
final void terminate(Throwable t) {
205211
if (isDisposed()) {
212+
Operators.onErrorDropped(t, Context.empty());
206213
return;
207214
}
208215

rsocket-core/src/main/java/io/rsocket/loadbalance/ResolvingOperator.java

Lines changed: 11 additions & 179 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,10 @@
1818
import java.time.Duration;
1919
import java.util.concurrent.CancellationException;
2020
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
21-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2221
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2322
import java.util.function.BiConsumer;
24-
import org.reactivestreams.Subscription;
25-
import reactor.core.CoreSubscriber;
2623
import reactor.core.Disposable;
2724
import reactor.core.Exceptions;
28-
import reactor.core.Scannable;
2925
import reactor.core.publisher.Operators;
3026
import reactor.util.annotation.Nullable;
3127
import reactor.util.context.Context;
@@ -170,19 +166,19 @@ public T block(@Nullable Duration timeout) {
170166
delay = System.nanoTime() + timeout.toNanos();
171167
}
172168
for (; ; ) {
173-
BiConsumer<T, Throwable>[] inners = this.subscribers;
169+
subscribers = this.subscribers;
174170

175-
if (inners == READY) {
171+
if (subscribers == READY) {
176172
final T value = this.value;
177173
if (value != null) {
178174
return value;
179175
} else {
180176
// value == null means racing between invalidate and this block
181177
// thus, we have to update the state again and see what happened
182-
inners = this.subscribers;
178+
subscribers = this.subscribers;
183179
}
184180
}
185-
if (inners == TERMINATED) {
181+
if (subscribers == TERMINATED) {
186182
RuntimeException re = Exceptions.propagate(this.t);
187183
re = Exceptions.addSuppressed(re, new Exception("Terminated with an error"));
188184
throw re;
@@ -191,6 +187,12 @@ public T block(@Nullable Duration timeout) {
191187
throw new IllegalStateException("Timeout on Mono blocking read");
192188
}
193189

190+
// connect again since invalidate() has happened in between
191+
if (subscribers == EMPTY_UNSUBSCRIBED
192+
&& SUBSCRIBERS.compareAndSet(this, EMPTY_UNSUBSCRIBED, EMPTY_SUBSCRIBED)) {
193+
this.doSubscribe();
194+
}
195+
194196
Thread.sleep(1);
195197
}
196198
} catch (InterruptedException ie) {
@@ -203,6 +205,7 @@ public T block(@Nullable Duration timeout) {
203205
@SuppressWarnings("unchecked")
204206
final void terminate(Throwable t) {
205207
if (isDisposed()) {
208+
Operators.onErrorDropped(t, Context.empty());
206209
return;
207210
}
208211

@@ -390,175 +393,4 @@ final void remove(BiConsumer<T, Throwable> ps) {
390393
}
391394
}
392395
}
393-
394-
abstract static class DeferredResolution<T, R>
395-
implements CoreSubscriber<T>, Subscription, Scannable, BiConsumer<R, Throwable> {
396-
397-
final ResolvingOperator<R> parent;
398-
final CoreSubscriber<? super T> actual;
399-
400-
volatile long requested;
401-
402-
@SuppressWarnings("rawtypes")
403-
static final AtomicLongFieldUpdater<DeferredResolution> REQUESTED =
404-
AtomicLongFieldUpdater.newUpdater(DeferredResolution.class, "requested");
405-
406-
static final long STATE_SUBSCRIBED = -1;
407-
static final long STATE_CANCELLED = Long.MIN_VALUE;
408-
409-
Subscription s;
410-
boolean done;
411-
412-
DeferredResolution(ResolvingOperator<R> parent, CoreSubscriber<? super T> actual) {
413-
this.parent = parent;
414-
this.actual = actual;
415-
}
416-
417-
@Override
418-
public final Context currentContext() {
419-
return this.actual.currentContext();
420-
}
421-
422-
@Nullable
423-
@Override
424-
public Object scanUnsafe(Attr key) {
425-
long state = this.requested;
426-
427-
if (key == Attr.PARENT) {
428-
return this.s;
429-
}
430-
if (key == Attr.ACTUAL) {
431-
return this.parent;
432-
}
433-
if (key == Attr.TERMINATED) {
434-
return this.done;
435-
}
436-
if (key == Attr.CANCELLED) {
437-
return state == STATE_CANCELLED;
438-
}
439-
440-
return null;
441-
}
442-
443-
@Override
444-
public final void onSubscribe(Subscription s) {
445-
final long state = this.requested;
446-
Subscription a = this.s;
447-
if (state == STATE_CANCELLED) {
448-
s.cancel();
449-
return;
450-
}
451-
if (a != null) {
452-
s.cancel();
453-
return;
454-
}
455-
456-
long r;
457-
long accumulated = 0;
458-
for (; ; ) {
459-
r = this.requested;
460-
461-
if (r == STATE_CANCELLED || r == STATE_SUBSCRIBED) {
462-
s.cancel();
463-
return;
464-
}
465-
466-
this.s = s;
467-
468-
long toRequest = r - accumulated;
469-
if (toRequest > 0) { // if there is something,
470-
s.request(toRequest); // then we do a request on the given subscription
471-
}
472-
accumulated = r;
473-
474-
if (REQUESTED.compareAndSet(this, r, STATE_SUBSCRIBED)) {
475-
return;
476-
}
477-
}
478-
}
479-
480-
@Override
481-
public final void onNext(T payload) {
482-
this.actual.onNext(payload);
483-
}
484-
485-
@Override
486-
public final void onError(Throwable t) {
487-
if (this.done) {
488-
Operators.onErrorDropped(t, this.actual.currentContext());
489-
return;
490-
}
491-
492-
this.done = true;
493-
this.actual.onError(t);
494-
}
495-
496-
@Override
497-
public final void onComplete() {
498-
if (this.done) {
499-
return;
500-
}
501-
502-
this.done = true;
503-
this.actual.onComplete();
504-
}
505-
506-
@Override
507-
public void request(long n) {
508-
if (Operators.validate(n)) {
509-
long r = this.requested; // volatile read beforehand
510-
511-
if (r > STATE_SUBSCRIBED) { // works only in case onSubscribe has not happened
512-
long u;
513-
for (; ; ) { // normal CAS loop with overflow protection
514-
if (r == Long.MAX_VALUE) {
515-
// if r == Long.MAX_VALUE then we dont care and we can loose this
516-
// request just in case of racing
517-
return;
518-
}
519-
u = Operators.addCap(r, n);
520-
if (REQUESTED.compareAndSet(this, r, u)) {
521-
// Means increment happened before onSubscribe
522-
return;
523-
} else {
524-
// Means increment happened after onSubscribe
525-
526-
// update new state to see what exactly happened (onSubscribe |cancel | requestN)
527-
r = this.requested;
528-
529-
// check state (expect -1 | -2 to exit, otherwise repeat)
530-
if (r < 0) {
531-
break;
532-
}
533-
}
534-
}
535-
}
536-
537-
if (r == STATE_CANCELLED) { // if canceled, just exit
538-
return;
539-
}
540-
541-
// if onSubscribe -> subscription exists (and we sure of that because volatile read
542-
// after volatile write) so we can execute requestN on the subscription
543-
this.s.request(n);
544-
}
545-
}
546-
547-
public boolean isCancelled() {
548-
return this.requested == STATE_CANCELLED;
549-
}
550-
551-
public void cancel() {
552-
long state = REQUESTED.getAndSet(this, STATE_CANCELLED);
553-
if (state == STATE_CANCELLED) {
554-
return;
555-
}
556-
557-
if (state == STATE_SUBSCRIBED) {
558-
this.s.cancel();
559-
} else {
560-
this.parent.remove(this);
561-
}
562-
}
563-
}
564396
}

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@
105105
import reactor.core.publisher.Hooks;
106106
import reactor.core.publisher.Mono;
107107
import reactor.core.publisher.Sinks;
108-
import reactor.core.scheduler.Schedulers;
109108
import reactor.test.StepVerifier;
110109
import reactor.test.publisher.TestPublisher;
111110
import reactor.test.util.RaceTestUtils;
@@ -1171,12 +1170,8 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests(
11711170
Publisher<?> publisher2 = interaction2.apply(rule, payload2);
11721171
RaceTestUtils.race(
11731172
() -> rule.socket.dispose(),
1174-
() ->
1175-
RaceTestUtils.race(
1176-
() -> publisher1.subscribe(assertSubscriber1),
1177-
() -> publisher2.subscribe(assertSubscriber2),
1178-
Schedulers.parallel()),
1179-
Schedulers.parallel());
1173+
() -> publisher1.subscribe(assertSubscriber1),
1174+
() -> publisher2.subscribe(assertSubscriber2));
11801175

11811176
assertSubscriber1.await().assertTerminated();
11821177
if (interactionType1 != REQUEST_FNF && interactionType1 != METADATA_PUSH) {

0 commit comments

Comments
 (0)