Skip to content

Commit 1927bf4

Browse files
authored
bumps libs versions and provides few UnboundedProcessor fixes (#1028)
1 parent efd1269 commit 1927bf4

File tree

6 files changed

+53
-31
lines changed

6 files changed

+53
-31
lines changed

build.gradle

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
plugins {
1818
id 'com.github.sherter.google-java-format' version '0.9' apply false
19-
id 'me.champeau.jmh' version '0.6.4' apply false
19+
id 'me.champeau.jmh' version '0.6.6' apply false
2020
id 'io.spring.dependency-management' version '1.0.11.RELEASE' apply false
2121
id 'io.morethan.jmhreport' version '0.9.0' apply false
22-
id "io.github.reyerizo.gradle.jcstress" version "0.8.11" apply false
22+
id 'io.github.reyerizo.gradle.jcstress' version '0.8.11' apply false
23+
id 'com.github.vlsi.gradle-extensions' version '1.76' apply false
2324
}
2425

2526
boolean isCiServer = ["CI", "CONTINUOUS_INTEGRATION", "TRAVIS", "CIRCLECI", "bamboo_planKey", "GITHUB_ACTION"].with {
@@ -30,19 +31,20 @@ boolean isCiServer = ["CI", "CONTINUOUS_INTEGRATION", "TRAVIS", "CIRCLECI", "bam
3031
subprojects {
3132
apply plugin: 'io.spring.dependency-management'
3233
apply plugin: 'com.github.sherter.google-java-format'
34+
apply plugin: 'com.github.vlsi.gradle-extensions'
3335

34-
ext['reactor-bom.version'] = '2020.0.7'
36+
ext['reactor-bom.version'] = '2020.0.12'
3537
ext['logback.version'] = '1.2.3'
36-
ext['netty-bom.version'] = '4.1.64.Final'
37-
ext['netty-boringssl.version'] = '2.0.39.Final'
38+
ext['netty-bom.version'] = '4.1.70.Final'
39+
ext['netty-boringssl.version'] = '2.0.45.Final'
3840
ext['hdrhistogram.version'] = '2.1.12'
39-
ext['mockito.version'] = '3.10.0'
41+
ext['mockito.version'] = '3.12.4'
4042
ext['slf4j.version'] = '1.7.30'
41-
ext['jmh.version'] = '1.31'
42-
ext['junit.version'] = '5.7.2'
43+
ext['jmh.version'] = '1.33'
44+
ext['junit.version'] = '5.8.1'
4345
ext['hamcrest.version'] = '1.3'
44-
ext['micrometer.version'] = '1.6.7'
45-
ext['assertj.version'] = '3.19.0'
46+
ext['micrometer.version'] = '1.7.5'
47+
ext['assertj.version'] = '3.21.0'
4648
ext['netflix.limits.version'] = '0.3.6'
4749
ext['bouncycastle-bcpkix.version'] = '1.68'
4850

@@ -123,6 +125,7 @@ subprojects {
123125
}
124126

125127
plugins.withType(JavaPlugin) {
128+
126129
compileJava {
127130
sourceCompatibility = 1.8
128131

@@ -198,7 +201,7 @@ subprojects {
198201
if (JavaVersion.current().isJava9Compatible()) {
199202
println "Java 9+: lowering MaxGCPauseMillis to 20ms in ${project.name} ${name}"
200203
println "Java 9+: enabling leak detection [ADVANCED]"
201-
jvmArgs = ["-XX:MaxGCPauseMillis=20", "-Dio.netty.leakDetection.level=ADVANCED"]
204+
jvmArgs = ["-XX:MaxGCPauseMillis=20", "-Dio.netty.leakDetection.level=ADVANCED", "-Dio.netty.leakDetection.samplingInterval=32"]
202205
}
203206

204207
systemProperty("java.awt.headless", "true")
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-7.0.2-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-7.3-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,7 @@ public void onNextPrioritized(ByteBuf t) {
146146
return;
147147
}
148148

149-
if (isWorkInProgress(previousState)
150-
|| isCancelled(previousState)
151-
|| isDisposed(previousState)
152-
|| isTerminated(previousState)) {
149+
if (isWorkInProgress(previousState)) {
153150
return;
154151
}
155152

@@ -185,10 +182,7 @@ public void onNext(ByteBuf t) {
185182
return;
186183
}
187184

188-
if (isWorkInProgress(previousState)
189-
|| isCancelled(previousState)
190-
|| isDisposed(previousState)
191-
|| isTerminated(previousState)) {
185+
if (isWorkInProgress(previousState)) {
192186
return;
193187
}
194188

@@ -449,6 +443,7 @@ public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
449443

450444
if (isCancelled(previousState)) {
451445
clearAndFinalize(this);
446+
return;
452447
}
453448

454449
if (isDisposed(previousState)) {

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.slf4j.Logger;
3737
import org.slf4j.LoggerFactory;
3838
import reactor.core.CoreSubscriber;
39+
import reactor.core.Disposable;
3940
import reactor.core.publisher.Mono;
4041
import reactor.core.publisher.Operators;
4142
import reactor.util.function.Tuple2;
@@ -58,6 +59,7 @@ public class ClientRSocketSession
5859
final boolean cleanupStoreOnKeepAlive;
5960
final ByteBuf resumeToken;
6061
final String session;
62+
final Disposable reconnectDisposable;
6163

6264
volatile Subscription s;
6365
static final AtomicReferenceFieldUpdater<ClientRSocketSession, Subscription> S =
@@ -110,10 +112,22 @@ public ClientRSocketSession(
110112
this.resumableConnection = resumableDuplexConnection;
111113

112114
resumableDuplexConnection.onClose().doFinally(__ -> dispose()).subscribe();
113-
resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect);
115+
116+
this.reconnectDisposable =
117+
resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect);
114118
}
115119

116120
void reconnect(int index) {
121+
if (this.s == Operators.cancelledSubscription()) {
122+
if (logger.isDebugEnabled()) {
123+
logger.debug(
124+
"Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting rejected since session is closed",
125+
session,
126+
index);
127+
}
128+
return;
129+
}
130+
117131
keepAliveSupport.stop();
118132
if (logger.isDebugEnabled()) {
119133
logger.debug(
@@ -147,6 +161,8 @@ public void onImpliedPosition(long remoteImpliedPos) {
147161
@Override
148162
public void dispose() {
149163
Operators.terminate(S, this);
164+
165+
reconnectDisposable.dispose();
150166
resumableConnection.dispose();
151167
resumableFramesStore.dispose();
152168

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ void dispose(@Nullable Throwable e) {
253253

254254
framesSaverDisposable.dispose();
255255
activeReceivingSubscriber.dispose();
256-
savableFramesSender.dispose();
256+
savableFramesSender.onComplete();
257257
onConnectionClosedSink.tryEmitComplete();
258258

259259
if (e != null) {

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.reactivestreams.Subscription;
5858
import reactor.core.CoreSubscriber;
5959
import reactor.core.Disposable;
60+
import reactor.core.Disposables;
6061
import reactor.core.Exceptions;
6162
import reactor.core.Fuseable;
6263
import reactor.core.publisher.Flux;
@@ -641,7 +642,11 @@ public String expectedPayloadMetadata() {
641642
}
642643

643644
public void awaitClosed() {
644-
server.onClose().and(client.onClose()).block(Duration.ofMinutes(1));
645+
server
646+
.onClose()
647+
.onErrorResume(__ -> Mono.empty())
648+
.and(client.onClose().onErrorResume(__ -> Mono.empty()))
649+
.block(Duration.ofMinutes(1));
645650
}
646651

647652
private static class AsyncDuplexConnection implements DuplexConnection {
@@ -706,6 +711,7 @@ private static class DisconnectingDuplexConnection implements DuplexConnection {
706711
private final String tag;
707712
final DuplexConnection source;
708713
final Duration delay;
714+
final Disposable.Swap disposables = Disposables.swap();
709715

710716
DisconnectingDuplexConnection(String tag, DuplexConnection source, Duration delay) {
711717
this.tag = tag;
@@ -715,6 +721,7 @@ private static class DisconnectingDuplexConnection implements DuplexConnection {
715721

716722
@Override
717723
public void dispose() {
724+
disposables.dispose();
718725
source.dispose();
719726
}
720727

@@ -743,14 +750,15 @@ public Flux<ByteBuf> receive() {
743750
bb -> {
744751
if (!receivedFirst) {
745752
receivedFirst = true;
746-
Mono.delay(delay)
747-
.takeUntilOther(source.onClose())
748-
.subscribe(
749-
__ -> {
750-
logger.warn(
751-
"Tag {}. Disposing Connection[{}]", tag, source.hashCode());
752-
source.dispose();
753-
});
753+
disposables.replace(
754+
Mono.delay(delay)
755+
.takeUntilOther(source.onClose())
756+
.subscribe(
757+
__ -> {
758+
logger.warn(
759+
"Tag {}. Disposing Connection[{}]", tag, source.hashCode());
760+
source.dispose();
761+
}));
754762
}
755763
});
756764
}

0 commit comments

Comments
 (0)