Skip to content

Commit 694d5fb

Browse files
committed
adds jcstress support
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent eab6754 commit 694d5fb

File tree

5 files changed

+305
-3
lines changed

5 files changed

+305
-3
lines changed

build.gradle

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
*/
1616

1717
plugins {
18-
id 'com.github.sherter.google-java-format' version '0.8' apply false
18+
id 'com.github.sherter.google-java-format' version '0.9' apply false
1919
id 'com.jfrog.artifactory' version '4.15.2' apply false
2020
id 'com.jfrog.bintray' version '1.8.5' apply false
21-
id 'me.champeau.gradle.jmh' version '0.5.0' apply false
22-
id 'io.spring.dependency-management' version '1.0.9.RELEASE' apply false
21+
id 'me.champeau.gradle.jmh' version '0.5.3' apply false
22+
id 'io.spring.dependency-management' version '1.0.11.RELEASE' apply false
2323
id 'io.morethan.jmhreport' version '0.9.0' apply false
24+
id 'com.github.erizo.gradle.jcstress' version '0.8.8' apply false
2425
}
2526

2627
boolean isCiServer = ["CI", "CONTINUOUS_INTEGRATION", "TRAVIS", "CIRCLECI", "bamboo_planKey", "GITHUB_ACTION"].with {

rsocket-core/build.gradle

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ plugins {
2121
id 'com.jfrog.bintray'
2222
id 'io.morethan.jmhreport'
2323
id 'me.champeau.gradle.jmh'
24+
id 'com.github.erizo.gradle.jcstress'
2425
}
2526

2627
dependencies {
@@ -43,6 +44,15 @@ dependencies {
4344
testCompileOnly 'junit:junit'
4445
testImplementation 'org.hamcrest:hamcrest-library'
4546
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
47+
48+
// jcstressImplementation(project(":rsocket-core"))
49+
jcstressImplementation "ch.qos.logback:logback-classic"
50+
}
51+
52+
jcstress {
53+
mode = 'default' //quick, default, tough
54+
jcstressDependency = "org.openjdk.jcstress:jcstress-core:0.8"
4655
}
56+
tasks.check.dependsOn(tasks.jcstress)
4757

4858
description = "Core functionality for the RSocket library"
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Copyright (c) 2020-Present Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.concurrent.CopyOnWriteArrayList;
21+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
22+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
23+
import java.util.function.Consumer;
24+
import org.reactivestreams.Subscription;
25+
import reactor.core.CoreSubscriber;
26+
import reactor.core.publisher.Operators;
27+
import reactor.util.context.Context;
28+
29+
public class StressSubscriber<T> implements CoreSubscriber<T> {
30+
31+
enum Operation {
32+
ON_NEXT,
33+
ON_ERROR,
34+
ON_COMPLETE,
35+
ON_SUBSCRIBE
36+
}
37+
38+
final long initRequest;
39+
40+
final Context context;
41+
42+
volatile Subscription subscription;
43+
44+
@SuppressWarnings("rawtypes")
45+
static final AtomicReferenceFieldUpdater<StressSubscriber, Subscription> S =
46+
AtomicReferenceFieldUpdater.newUpdater(
47+
StressSubscriber.class, Subscription.class, "subscription");
48+
49+
public Throwable error;
50+
51+
public List<Throwable> droppedErrors = new CopyOnWriteArrayList<>();
52+
53+
public List<T> values = new ArrayList<>();
54+
55+
public volatile Operation guard;
56+
57+
@SuppressWarnings("rawtypes")
58+
static final AtomicReferenceFieldUpdater<StressSubscriber, Operation> GUARD =
59+
AtomicReferenceFieldUpdater.newUpdater(StressSubscriber.class, Operation.class, "guard");
60+
61+
public volatile boolean concurrentOnNext;
62+
63+
public volatile boolean concurrentOnError;
64+
65+
public volatile boolean concurrentOnComplete;
66+
67+
public volatile boolean concurrentOnSubscribe;
68+
69+
public volatile int onNextCalls;
70+
71+
@SuppressWarnings("rawtypes")
72+
static final AtomicIntegerFieldUpdater<StressSubscriber> ON_NEXT_CALLS =
73+
AtomicIntegerFieldUpdater.newUpdater(StressSubscriber.class, "onNextCalls");
74+
75+
public volatile int onNextDiscarded;
76+
77+
@SuppressWarnings("rawtypes")
78+
static final AtomicIntegerFieldUpdater<StressSubscriber> ON_NEXT_DISCARDED =
79+
AtomicIntegerFieldUpdater.newUpdater(StressSubscriber.class, "onNextDiscarded");
80+
81+
public volatile int onErrorCalls;
82+
83+
@SuppressWarnings("rawtypes")
84+
static final AtomicIntegerFieldUpdater<StressSubscriber> ON_ERROR_CALLS =
85+
AtomicIntegerFieldUpdater.newUpdater(StressSubscriber.class, "onErrorCalls");
86+
87+
public volatile int onCompleteCalls;
88+
89+
@SuppressWarnings("rawtypes")
90+
static final AtomicIntegerFieldUpdater<StressSubscriber> ON_COMPLETE_CALLS =
91+
AtomicIntegerFieldUpdater.newUpdater(StressSubscriber.class, "onCompleteCalls");
92+
93+
public volatile int onSubscribeCalls;
94+
95+
@SuppressWarnings("rawtypes")
96+
static final AtomicIntegerFieldUpdater<StressSubscriber> ON_SUBSCRIBE_CALLS =
97+
AtomicIntegerFieldUpdater.newUpdater(StressSubscriber.class, "onSubscribeCalls");
98+
99+
/** Build a {@link StressSubscriber} that makes an unbounded request upon subscription. */
100+
public StressSubscriber() {
101+
this(Long.MAX_VALUE);
102+
}
103+
104+
/**
105+
* Build a {@link StressSubscriber} that requests the provided amount in {@link
106+
* #onSubscribe(Subscription)}. Use {@code 0} to avoid any initial request upon subscription.
107+
*
108+
* @param initRequest the requested amount upon subscription, or zero to disable initial request
109+
*/
110+
public StressSubscriber(long initRequest) {
111+
this.initRequest = initRequest;
112+
this.context =
113+
Operators.enableOnDiscard(
114+
Context.of(
115+
"reactor.onErrorDropped.local",
116+
(Consumer<Throwable>)
117+
throwable -> {
118+
droppedErrors.add(throwable);
119+
}),
120+
(__) -> ON_NEXT_DISCARDED.incrementAndGet(this));
121+
}
122+
123+
@Override
124+
public Context currentContext() {
125+
return this.context;
126+
}
127+
128+
@Override
129+
public void onSubscribe(Subscription subscription) {
130+
if (!GUARD.compareAndSet(this, null, Operation.ON_SUBSCRIBE)) {
131+
concurrentOnSubscribe = true;
132+
} else {
133+
boolean wasSet = Operators.setOnce(S, this, subscription);
134+
GUARD.compareAndSet(this, Operation.ON_SUBSCRIBE, null);
135+
if (wasSet) {
136+
if (initRequest > 0) {
137+
subscription.request(initRequest);
138+
}
139+
}
140+
}
141+
ON_SUBSCRIBE_CALLS.incrementAndGet(this);
142+
}
143+
144+
@Override
145+
public void onNext(T value) {
146+
if (!GUARD.compareAndSet(this, null, Operation.ON_NEXT)) {
147+
concurrentOnNext = true;
148+
} else {
149+
values.add(value);
150+
GUARD.compareAndSet(this, Operation.ON_NEXT, null);
151+
}
152+
ON_NEXT_CALLS.incrementAndGet(this);
153+
}
154+
155+
@Override
156+
public void onError(Throwable throwable) {
157+
if (!GUARD.compareAndSet(this, null, Operation.ON_ERROR)) {
158+
concurrentOnError = true;
159+
} else {
160+
GUARD.compareAndSet(this, Operation.ON_ERROR, null);
161+
}
162+
error = throwable;
163+
ON_ERROR_CALLS.incrementAndGet(this);
164+
}
165+
166+
@Override
167+
public void onComplete() {
168+
if (!GUARD.compareAndSet(this, null, Operation.ON_COMPLETE)) {
169+
concurrentOnComplete = true;
170+
} else {
171+
GUARD.compareAndSet(this, Operation.ON_COMPLETE, null);
172+
}
173+
ON_COMPLETE_CALLS.incrementAndGet(this);
174+
}
175+
176+
public void request(long n) {
177+
if (Operators.validate(n)) {
178+
Subscription s = this.subscription;
179+
if (s != null) {
180+
s.request(n);
181+
}
182+
}
183+
}
184+
185+
public void cancel() {
186+
Operators.terminate(S, this);
187+
}
188+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2020-Present Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.core;
17+
18+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
20+
import org.reactivestreams.Subscription;
21+
import reactor.core.CoreSubscriber;
22+
import reactor.core.publisher.Operators;
23+
24+
public class StressSubscription<T> implements Subscription {
25+
26+
CoreSubscriber<? super T> actual;
27+
28+
public volatile int subscribes;
29+
30+
@SuppressWarnings("rawtypes")
31+
static final AtomicIntegerFieldUpdater<StressSubscription> SUBSCRIBES =
32+
AtomicIntegerFieldUpdater.newUpdater(StressSubscription.class, "subscribes");
33+
34+
public volatile long requested;
35+
36+
@SuppressWarnings("rawtypes")
37+
static final AtomicLongFieldUpdater<StressSubscription> REQUESTED =
38+
AtomicLongFieldUpdater.newUpdater(StressSubscription.class, "requested");
39+
40+
public volatile int requestsCount;
41+
42+
@SuppressWarnings("rawtypes")
43+
static final AtomicIntegerFieldUpdater<StressSubscription> REQUESTS_COUNT =
44+
AtomicIntegerFieldUpdater.newUpdater(StressSubscription.class, "requestsCount");
45+
46+
public volatile boolean cancelled;
47+
48+
void subscribe(CoreSubscriber<? super T> actual) {
49+
this.actual = actual;
50+
actual.onSubscribe(this);
51+
SUBSCRIBES.getAndIncrement(this);
52+
}
53+
54+
@Override
55+
public void request(long n) {
56+
REQUESTS_COUNT.incrementAndGet(this);
57+
Operators.addCap(REQUESTED, this, n);
58+
}
59+
60+
@Override
61+
public void cancel() {
62+
cancelled = true;
63+
}
64+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<!--
2+
~ Copyright (c) 2011-present Pivotal Software Inc, All Rights Reserved.
3+
~
4+
~ Licensed under the Apache License, Version 2.0 (the "License");
5+
~ you may not use this file except in compliance with the License.
6+
~ You may obtain a copy of the License at
7+
~
8+
~ https://www.apache.org/licenses/LICENSE-2.0
9+
~
10+
~ Unless required by applicable law or agreed to in writing, software
11+
~ distributed under the License is distributed on an "AS IS" BASIS,
12+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
~ See the License for the specific language governing permissions and
14+
~ limitations under the License.
15+
-->
16+
17+
<configuration>
18+
19+
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
20+
<encoder>
21+
<pattern>
22+
%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
23+
</pattern>
24+
</encoder>
25+
</appender>
26+
27+
<!--<logger name="reactor.core.publisher.tck" level="debug"/>-->
28+
<logger name="reactor" level="info"/>
29+
<logger name="wqp" level="off"/>
30+
<logger name="after-flatmap" level="debug"/>
31+
<logger name="logError.default" level="trace"/>
32+
<logger name="logError.fine" level="trace"/>
33+
<logger name="logError.finest" level="trace"/>
34+
35+
<root level="info">
36+
<appender-ref ref="stdout"/>
37+
</root>
38+
39+
</configuration>

0 commit comments

Comments
 (0)