Skip to content

Commit 480ef2d

Browse files
Oleh DokukaOlegDokuka
Oleh Dokuka
authored andcommitted
migrates from deprecated RaceTestUtils.race; fixes observed issues
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 026ec4b commit 480ef2d

File tree

11 files changed

+426
-221
lines changed

11 files changed

+426
-221
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ subprojects {
128128
links 'https://projectreactor.io/docs/core/release/api/'
129129
links 'https://netty.io/4.1/api/'
130130
}
131+
failOnError = false
131132
}
132133

133134
tasks.named("javadoc").configure {

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public RSocketConnector metadataMimeType(String metadataMimeType) {
213213
* <li>For server-to-server connections, a reasonable time interval between client {@code
214214
* KEEPALIVE} frames is 500ms.
215215
* <li>For mobile-to-server connections, the time interval between client {@code KEEPALIVE}
216-
* frames is often > 30,000ms.
216+
* frames is often {@code >} 30,000ms.
217217
* </ul>
218218
*
219219
* <p>By default these are set to 20 seconds and 90 seconds respectively.

rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,19 +153,19 @@ public T block(@Nullable Duration timeout) {
153153
delay = System.nanoTime() + timeout.toNanos();
154154
}
155155
for (; ; ) {
156-
BiConsumer<T, Throwable>[] inners = this.subscribers;
156+
subscribers = this.subscribers;
157157

158-
if (inners == READY) {
158+
if (subscribers == READY) {
159159
final T value = this.value;
160160
if (value != null) {
161161
return value;
162162
} else {
163163
// value == null means racing between invalidate and this block
164164
// thus, we have to update the state again and see what happened
165-
inners = this.subscribers;
165+
subscribers = this.subscribers;
166166
}
167167
}
168-
if (inners == TERMINATED) {
168+
if (subscribers == TERMINATED) {
169169
RuntimeException re = Exceptions.propagate(this.t);
170170
re = Exceptions.addSuppressed(re, new Exception("Terminated with an error"));
171171
throw re;
@@ -174,6 +174,12 @@ public T block(@Nullable Duration timeout) {
174174
throw new IllegalStateException("Timeout on Mono blocking read");
175175
}
176176

177+
// connect again since invalidate() has happened in between
178+
if (subscribers == EMPTY_UNSUBSCRIBED
179+
&& SUBSCRIBERS.compareAndSet(this, EMPTY_UNSUBSCRIBED, EMPTY_SUBSCRIBED)) {
180+
this.doSubscribe();
181+
}
182+
177183
Thread.sleep(1);
178184
}
179185
} catch (InterruptedException ie) {
@@ -186,6 +192,7 @@ public T block(@Nullable Duration timeout) {
186192
@SuppressWarnings("unchecked")
187193
final void terminate(Throwable t) {
188194
if (isDisposed()) {
195+
Operators.onErrorDropped(t, Context.empty());
189196
return;
190197
}
191198

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,9 @@ void drainRegular(Subscriber<? super T> a) {
115115
while (r != e) {
116116
boolean d = done;
117117

118-
T t;
119-
boolean empty;
120-
121-
if (!pq.isEmpty()) {
122-
t = pq.poll();
123-
empty = false;
124-
} else {
118+
T t = pq.poll();
119+
boolean empty = t == null;
120+
if (empty) {
125121
t = q.poll();
126122
empty = t == null;
127123
}
@@ -196,8 +192,9 @@ void drainFused(Subscriber<? super T> a) {
196192
}
197193

198194
public void drain() {
199-
if (WIP.getAndIncrement(this) != 0) {
200-
if ((!outputFused && cancelled) || terminated) {
195+
final int previousWip = WIP.getAndIncrement(this);
196+
if (previousWip != 0) {
197+
if (previousWip < 0 || terminated) {
201198
this.clear();
202199
}
203200
return;
@@ -231,6 +228,7 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a) {
231228
return true;
232229
}
233230
if (d && empty) {
231+
this.clear();
234232
Throwable e = error;
235233
hasDownstream = false;
236234
if (e != null) {
@@ -330,11 +328,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
330328

331329
actual.onSubscribe(this);
332330
this.actual = actual;
333-
if (cancelled) {
334-
this.hasDownstream = false;
335-
} else {
336-
drain();
337-
}
331+
drain();
338332
} else {
339333
Operators.error(
340334
actual,
@@ -388,6 +382,18 @@ public boolean isEmpty() {
388382
@Override
389383
public void clear() {
390384
terminated = true;
385+
for (; ; ) {
386+
int wip = this.wip;
387+
388+
clearSafely();
389+
390+
if (WIP.compareAndSet(this, wip, Integer.MIN_VALUE)) {
391+
return;
392+
}
393+
}
394+
}
395+
396+
void clearSafely() {
391397
if (DISCARD_GUARD.getAndIncrement(this) != 0) {
392398
return;
393399
}
@@ -428,34 +434,20 @@ public void dispose() {
428434
error = new CancellationException("Disposed");
429435
done = true;
430436

431-
boolean once = true;
432437
if (WIP.getAndIncrement(this) == 0) {
433438
cancelled = true;
434-
int m = 1;
435-
for (; ; ) {
436-
final CoreSubscriber<? super T> a = this.actual;
437-
438-
if (!outputFused || terminated) {
439-
clear();
440-
}
441-
442-
if (a != null && once) {
443-
try {
444-
a.onError(error);
445-
} catch (Throwable ignored) {
446-
}
447-
}
439+
final CoreSubscriber<? super T> a = this.actual;
448440

449-
cancelled = true;
450-
once = false;
441+
if (!outputFused || terminated) {
442+
clear();
443+
}
451444

452-
int wip = this.wip;
453-
if (wip == m) {
454-
break;
445+
if (a != null) {
446+
try {
447+
a.onError(error);
448+
} catch (Throwable ignored) {
455449
}
456-
m = wip;
457450
}
458-
459451
hasDownstream = false;
460452
}
461453
}

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@
9191
import reactor.core.publisher.Mono;
9292
import reactor.core.publisher.MonoProcessor;
9393
import reactor.core.publisher.UnicastProcessor;
94-
import reactor.core.scheduler.Schedulers;
9594
import reactor.test.StepVerifier;
9695
import reactor.test.publisher.TestPublisher;
9796
import reactor.test.util.RaceTestUtils;
@@ -1082,15 +1081,11 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests(
10821081
Publisher<?> publisher2 = interaction2.apply(rule, payload2);
10831082
RaceTestUtils.race(
10841083
() -> rule.socket.dispose(),
1085-
() ->
1086-
RaceTestUtils.race(
1087-
() -> publisher1.subscribe(assertSubscriber1),
1088-
() -> publisher2.subscribe(assertSubscriber2),
1089-
Schedulers.parallel()),
1090-
Schedulers.parallel());
1084+
() -> publisher1.subscribe(assertSubscriber1),
1085+
() -> publisher2.subscribe(assertSubscriber2));
10911086

10921087
assertSubscriber1.await().assertTerminated();
1093-
if (interactionType1 != REQUEST_FNF) {
1088+
if (interactionType1 != REQUEST_FNF && interactionType1 != METADATA_PUSH) {
10941089
assertSubscriber1.assertError(ClosedChannelException.class);
10951090
} else {
10961091
try {
@@ -1101,7 +1096,7 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests(
11011096
}
11021097
}
11031098
assertSubscriber2.await().assertTerminated();
1104-
if (interactionType2 != REQUEST_FNF) {
1099+
if (interactionType2 != REQUEST_FNF && interactionType2 != METADATA_PUSH) {
11051100
assertSubscriber2.assertError(ClosedChannelException.class);
11061101
} else {
11071102
try {

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@
8484
import reactor.core.publisher.Hooks;
8585
import reactor.core.publisher.Mono;
8686
import reactor.core.publisher.Operators;
87-
import reactor.core.scheduler.Scheduler;
88-
import reactor.core.scheduler.Schedulers;
8987
import reactor.test.publisher.TestPublisher;
9088
import reactor.test.util.RaceTestUtils;
9189

@@ -340,7 +338,6 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
340338

341339
@Test
342340
public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestChannelTest1() {
343-
Scheduler parallel = Schedulers.parallel();
344341
Hooks.onErrorDropped((e) -> {});
345342
ByteBufAllocator allocator = rule.alloc();
346343
for (int i = 0; i < 10000; i++) {
@@ -366,17 +363,13 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
366363
ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, 1, Integer.MAX_VALUE);
367364
FluxSink<Payload> sink = sinks[0];
368365
RaceTestUtils.race(
369-
() ->
370-
RaceTestUtils.race(
371-
() -> rule.connection.addToReceivedBuffer(requestNFrame),
372-
() -> rule.connection.addToReceivedBuffer(cancelFrame),
373-
parallel),
366+
() -> rule.connection.addToReceivedBuffer(requestNFrame),
367+
() -> rule.connection.addToReceivedBuffer(cancelFrame),
374368
() -> {
375369
sink.next(ByteBufPayload.create("d1", "m1"));
376370
sink.next(ByteBufPayload.create("d2", "m2"));
377371
sink.next(ByteBufPayload.create("d3", "m3"));
378-
},
379-
parallel);
372+
});
380373

381374
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
382375

@@ -387,7 +380,6 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
387380
@Test
388381
public void
389382
checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromUpstreamOnErrorFromRequestChannelTest1() {
390-
Scheduler parallel = Schedulers.parallel();
391383
Hooks.onErrorDropped((e) -> {});
392384
ByteBufAllocator allocator = rule.alloc();
393385
for (int i = 0; i < 10000; i++) {
@@ -453,18 +445,14 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
453445

454446
FluxSink<Payload> sink = sinks[0];
455447
RaceTestUtils.race(
456-
() ->
457-
RaceTestUtils.race(
458-
() -> rule.connection.addToReceivedBuffer(requestNFrame),
459-
() -> rule.connection.addToReceivedBuffer(nextFrame1, nextFrame2, nextFrame3),
460-
parallel),
448+
() -> rule.connection.addToReceivedBuffer(requestNFrame),
449+
() -> rule.connection.addToReceivedBuffer(nextFrame1, nextFrame2, nextFrame3),
461450
() -> {
462451
sink.next(np1);
463452
sink.next(np2);
464453
sink.next(np3);
465454
sink.error(new RuntimeException());
466-
},
467-
parallel);
455+
});
468456

469457
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
470458

@@ -484,7 +472,6 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
484472

485473
@Test
486474
public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestStreamTest1() {
487-
Scheduler parallel = Schedulers.parallel();
488475
Hooks.onErrorDropped((e) -> {});
489476
ByteBufAllocator allocator = rule.alloc();
490477
for (int i = 0; i < 10000; i++) {
@@ -510,8 +497,7 @@ public Flux<Payload> requestStream(Payload payload) {
510497
sink.next(ByteBufPayload.create("d1", "m1"));
511498
sink.next(ByteBufPayload.create("d2", "m2"));
512499
sink.next(ByteBufPayload.create("d3", "m3"));
513-
},
514-
parallel);
500+
});
515501

516502
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
517503

@@ -521,7 +507,6 @@ public Flux<Payload> requestStream(Payload payload) {
521507

522508
@Test
523509
public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestResponseTest1() {
524-
Scheduler parallel = Schedulers.parallel();
525510
Hooks.onErrorDropped((e) -> {});
526511
ByteBufAllocator allocator = rule.alloc();
527512
for (int i = 0; i < 10000; i++) {
@@ -550,8 +535,7 @@ public void subscribe(CoreSubscriber<? super Payload> actual) {
550535
() -> rule.connection.addToReceivedBuffer(cancelFrame),
551536
() -> {
552537
sources[0].complete(ByteBufPayload.create("d1", "m1"));
553-
},
554-
parallel);
538+
});
555539

556540
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
557541

0 commit comments

Comments
 (0)