Skip to content

Commit 6e26b76

Browse files
Move Other Rx Classes to .internal.rx
1 parent c4a2762 commit 6e26b76

13 files changed

+97
-12
lines changed

src/main/java/io/reactivesocket/ReactiveSocket.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
import org.reactivestreams.Subscriber;
2727
import org.reactivestreams.Subscription;
2828

29-
import io.reactivesocket.internal.CompositeCompletable;
30-
import io.reactivesocket.internal.CompositeDisposable;
3129
import io.reactivesocket.internal.Requester;
3230
import io.reactivesocket.internal.Responder;
31+
import io.reactivesocket.internal.rx.CompositeCompletable;
32+
import io.reactivesocket.internal.rx.CompositeDisposable;
3333
import io.reactivesocket.observable.Disposable;
3434
import io.reactivesocket.observable.Observable;
3535
import io.reactivesocket.observable.Observer;

src/main/java/io/reactivesocket/internal/PublisherUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727

2828
import io.reactivesocket.Frame;
2929
import io.reactivesocket.Payload;
30+
import io.reactivesocket.internal.rx.BackpressureHelper;
31+
import io.reactivesocket.internal.rx.BackpressureUtils;
32+
import io.reactivesocket.internal.rx.EmptySubscription;
33+
import io.reactivesocket.internal.rx.SubscriptionHelper;
3034

3135
public class PublisherUtils {
3236

src/main/java/io/reactivesocket/internal/Requester.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import io.reactivesocket.exceptions.CancelException;
3939
import io.reactivesocket.exceptions.Exceptions;
4040
import io.reactivesocket.exceptions.Retryable;
41+
import io.reactivesocket.internal.rx.BackpressureUtils;
42+
import io.reactivesocket.internal.rx.EmptyDisposable;
43+
import io.reactivesocket.internal.rx.EmptySubscription;
4144
import io.reactivesocket.observable.Disposable;
4245
import io.reactivesocket.observable.Observer;
4346
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.reactivestreams.Subscription;
2929

3030
import io.reactivesocket.exceptions.SetupException;
31+
import io.reactivesocket.internal.rx.EmptyDisposable;
32+
import io.reactivesocket.internal.rx.EmptySubscription;
3133
import io.reactivesocket.observable.Disposable;
3234
import io.reactivesocket.observable.Observer;
3335
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;

src/main/java/io/reactivesocket/internal/BackpressureHelper.java renamed to src/main/java/io/reactivesocket/internal/rx/BackpressureHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
13-
package io.reactivesocket.internal;
13+
package io.reactivesocket.internal.rx;
1414

1515
import java.util.concurrent.atomic.*;
1616

src/main/java/io/reactivesocket/internal/BackpressureUtils.java renamed to src/main/java/io/reactivesocket/internal/rx/BackpressureUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.reactivesocket.internal;
1+
package io.reactivesocket.internal.rx;
22

33
/**
44
* Copyright 2015 Netflix, Inc.

src/main/java/io/reactivesocket/internal/BooleanDisposable.java renamed to src/main/java/io/reactivesocket/internal/rx/BooleanDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.reactivesocket.internal;
1+
package io.reactivesocket.internal.rx;
22

33
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
44

src/main/java/io/reactivesocket/internal/CompositeCompletable.java renamed to src/main/java/io/reactivesocket/internal/rx/CompositeCompletable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.reactivesocket.internal;
1+
package io.reactivesocket.internal.rx;
22

33
import java.util.HashSet;
44
import java.util.Set;

src/main/java/io/reactivesocket/internal/CompositeDisposable.java renamed to src/main/java/io/reactivesocket/internal/rx/CompositeDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.reactivesocket.internal;
1+
package io.reactivesocket.internal.rx;
22

33
import java.util.HashSet;
44
import java.util.Set;

src/main/java/io/reactivesocket/internal/EmptyDisposable.java renamed to src/main/java/io/reactivesocket/internal/rx/EmptyDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.internal;
16+
package io.reactivesocket.internal.rx;
1717

1818
import io.reactivesocket.observable.Disposable;
1919

src/main/java/io/reactivesocket/internal/EmptySubscription.java renamed to src/main/java/io/reactivesocket/internal/rx/EmptySubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivesocket.internal;
14+
package io.reactivesocket.internal.rx;
1515

1616
import org.reactivestreams.*;
1717

src/main/java/io/reactivesocket/internal/rx/SubscriptionArbiter.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@
3030

3131
import org.reactivestreams.Subscription;
3232

33-
import io.reactivesocket.internal.BackpressureHelper;
34-
import io.reactivesocket.internal.SubscriptionHelper;
35-
3633
/**
3734
* Arbitrates requests and cancellation between Subscriptions.
3835
*/
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.internal.rx;
15+
16+
import org.reactivestreams.*;
17+
18+
public enum SubscriptionHelper {
19+
;
20+
21+
public static boolean validateSubscription(Subscription current, Subscription next) {
22+
if (next == null) {
23+
return true;
24+
}
25+
if (current != null) {
26+
next.cancel();
27+
return true;
28+
}
29+
return false;
30+
}
31+
32+
/**
33+
* <p>
34+
* Make sure error reporting via s.onError is serialized.
35+
*
36+
* @param current
37+
* @param next
38+
* @param s
39+
* @return
40+
*/
41+
public static boolean validateSubscription(Subscription current, Subscription next, Subscriber<?> s) {
42+
if (next == null) {
43+
s.onError(new NullPointerException("next is null"));
44+
return true;
45+
}
46+
if (current != null) {
47+
next.cancel();
48+
return true;
49+
}
50+
return false;
51+
}
52+
53+
public static boolean validateRequest(long n) {
54+
if (n <= 0) {
55+
return true;
56+
}
57+
return false;
58+
}
59+
60+
/**
61+
* <p>
62+
* Make sure error reporting via s.onError is serialized.
63+
*
64+
* @param n
65+
* @param current
66+
* @param s
67+
* @return
68+
*/
69+
public static boolean validateRequest(long n, Subscription current, Subscriber<?> s) {
70+
if (n <= 0) {
71+
if (current != null) {
72+
current.cancel();
73+
}
74+
s.onError(new IllegalArgumentException("n > 0 required but it was " + n));
75+
return true;
76+
}
77+
return false;
78+
}
79+
}

0 commit comments

Comments
 (0)