Skip to content

Commit a21399d

Browse files
committed
Make subjects/processors have public factory methods
1 parent b441481 commit a21399d

18 files changed

+75
-144
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ void drain() {
241241
@SuppressWarnings("unchecked")
242242
TLeft left = (TLeft)val;
243243

244-
UnicastProcessor<TRight> up = new UnicastProcessor<TRight>();
244+
UnicastProcessor<TRight> up = UnicastProcessor.<TRight>create();
245245
int idx = leftIndex++;
246246
lefts.put(idx, up);
247247

src/main/java/io/reactivex/internal/operators/flowable/FlowableRepeatWhen.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void subscribeActual(Subscriber<? super T> s) {
3939

4040
SerializedSubscriber<T> z = new SerializedSubscriber<T>(s);
4141

42-
FlowableProcessor<Object> processor = new UnicastProcessor<Object>(8).toSerialized();
42+
FlowableProcessor<Object> processor = UnicastProcessor.<Object>create(8).toSerialized();
4343

4444
Publisher<?> when;
4545

src/main/java/io/reactivex/internal/operators/flowable/FlowableRetryWhen.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public FlowableRetryWhen(Publisher<T> source,
3737
public void subscribeActual(Subscriber<? super T> s) {
3838
SerializedSubscriber<T> z = new SerializedSubscriber<T>(s);
3939

40-
FlowableProcessor<Throwable> processor = new UnicastProcessor<Throwable>(8).toSerialized();
40+
FlowableProcessor<Throwable> processor = UnicastProcessor.<Throwable>create(8).toSerialized();
4141

4242
Publisher<?> when;
4343

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void onNext(T t) {
102102
if (i == 0) {
103103
getAndIncrement();
104104

105-
w = new UnicastProcessor<T>(bufferSize, this);
105+
w = UnicastProcessor.<T>create(bufferSize, this);
106106
window = w;
107107

108108
actual.onNext(w);
@@ -232,7 +232,7 @@ public void onNext(T t) {
232232
getAndIncrement();
233233

234234

235-
w = new UnicastProcessor<T>(bufferSize, this);
235+
w = UnicastProcessor.<T>create(bufferSize, this);
236236
window = w;
237237

238238
actual.onNext(w);
@@ -388,7 +388,7 @@ public void onNext(T t) {
388388
if (!cancelled) {
389389
getAndIncrement();
390390

391-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize, this);
391+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize, this);
392392

393393
windows.offer(w);
394394

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void onSubscribe(Subscription s) {
8686
return;
8787
}
8888

89-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
89+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
9090

9191
long r = requested();
9292
if (r != 0L) {
@@ -226,7 +226,7 @@ void drainLoop() {
226226
continue;
227227
}
228228

229-
w = new UnicastProcessor<T>(bufferSize);
229+
w = UnicastProcessor.<T>create(bufferSize);
230230

231231
long r = requested();
232232
if (r != 0L) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ void drainLoop() {
263263
}
264264

265265

266-
w = new UnicastProcessor<T>(bufferSize);
266+
w = UnicastProcessor.<T>create(bufferSize);
267267

268268
long r = requested();
269269
if (r != 0L) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void onSubscribe(Subscription s) {
104104
return;
105105
}
106106

107-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
107+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
108108

109109
long r = requested();
110110
if (r != 0L) {
@@ -262,7 +262,7 @@ void drainLoop() {
262262
return;
263263
}
264264

265-
w = new UnicastProcessor<T>(bufferSize);
265+
w = UnicastProcessor.<T>create(bufferSize);
266266

267267
long r = requested();
268268
if (r != 0L) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void onSubscribe(Subscription s) {
112112
}
113113
this.s = s;
114114

115-
window = new UnicastProcessor<T>(bufferSize);
115+
window = UnicastProcessor.<T>create(bufferSize);
116116

117117
Subscriber<? super Flowable<T>> a = actual;
118118
a.onSubscribe(this);
@@ -267,7 +267,7 @@ void drainLoop() {
267267
if (o == NEXT) {
268268
w.onComplete();
269269
if (!term) {
270-
w = new UnicastProcessor<T>(bufferSize);
270+
w = UnicastProcessor.<T>create(bufferSize);
271271
window = w;
272272

273273
long r = requested();
@@ -362,7 +362,7 @@ public void onSubscribe(Subscription s) {
362362
return;
363363
}
364364

365-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
365+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
366366
window = w;
367367

368368
long r = requested();
@@ -416,7 +416,7 @@ public void onNext(T t) {
416416
long r = requested();
417417

418418
if (r != 0L) {
419-
w = new UnicastProcessor<T>(bufferSize);
419+
w = UnicastProcessor.<T>create(bufferSize);
420420
window = w;
421421
actual.onNext(w);
422422
if (r != Long.MAX_VALUE) {
@@ -558,7 +558,7 @@ void drainLoop() {
558558
if (isHolder) {
559559
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
560560
if (producerIndex == consumerIndexHolder.index) {
561-
w = new UnicastProcessor<T>(bufferSize);
561+
w = UnicastProcessor.<T>create(bufferSize);
562562
window = w;
563563

564564
long r = requested();
@@ -591,7 +591,7 @@ void drainLoop() {
591591
long r = requested();
592592

593593
if (r != 0L) {
594-
w = new UnicastProcessor<T>(bufferSize);
594+
w = UnicastProcessor.<T>create(bufferSize);
595595
window = w;
596596
actual.onNext(w);
597597
if (r != Long.MAX_VALUE) {
@@ -699,7 +699,7 @@ public void onSubscribe(Subscription s) {
699699

700700
long r = requested();
701701
if (r != 0L) {
702-
final UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
702+
final UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
703703
windows.add(w);
704704

705705
actual.onNext(w);
@@ -864,7 +864,7 @@ void drainLoop() {
864864

865865
long r = requested();
866866
if (r != 0L) {
867-
final UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
867+
final UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
868868
ws.add(w);
869869
a.onNext(w);
870870
if (r != Long.MAX_VALUE) {
@@ -906,7 +906,7 @@ public void run() {
906906
@Override
907907
public void run() {
908908

909-
UnicastProcessor<T> w = new UnicastProcessor<T>(bufferSize);
909+
UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize);
910910

911911
SubjectWork<T> sw = new SubjectWork<T>(w, true);
912912
if (!cancelled) {

src/main/java/io/reactivex/processors/ReplayProcessor.java

Lines changed: 7 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public static <T> ReplayProcessor<T> create() {
109109
* @return the created subject
110110
*/
111111
public static <T> ReplayProcessor<T> create(int capacityHint) {
112-
return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(capacityHint));
112+
return create(capacityHint);
113113
}
114114

115115
/**
@@ -133,7 +133,7 @@ public static <T> ReplayProcessor<T> create(int capacityHint) {
133133
* @return the created subject
134134
*/
135135
public static <T> ReplayProcessor<T> createWithSize(int maxSize) {
136-
return new ReplayProcessor<T>(maxSize);
136+
return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(maxSize));
137137
}
138138

139139
/**
@@ -186,7 +186,7 @@ public static <T> ReplayProcessor<T> createWithSize(int maxSize) {
186186
* @return the created subject
187187
*/
188188
public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) {
189-
return new ReplayProcessor<T>(maxAge, unit, scheduler);
189+
return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(Integer.MAX_VALUE, maxAge, unit, scheduler));
190190
}
191191

192192
/**
@@ -217,69 +217,26 @@ public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit,
217217
* the maximum age of the contained items
218218
* @param unit
219219
* the time unit of {@code time}
220-
* @param size
220+
* @param maxSize
221221
* the maximum number of buffered items
222222
* @param scheduler
223223
* the {@link Scheduler} that provides the current time
224224
* @return the created subject
225225
*/
226-
public static <T> ReplayProcessor<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int size) {
227-
return new ReplayProcessor<T>(maxAge, unit, scheduler, size);
226+
public static <T> ReplayProcessor<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) {
227+
return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(maxSize, maxAge, unit, scheduler));
228228
}
229229

230230
/**
231231
* Constructs a ReplayProcessor with the given custom ReplayBuffer instance.
232232
* @param buffer the ReplayBuffer instance, not null (not verified)
233233
*/
234234
@SuppressWarnings("unchecked")
235-
protected ReplayProcessor(ReplayBuffer<T> buffer) {
235+
ReplayProcessor(ReplayBuffer<T> buffer) {
236236
this.buffer = buffer;
237237
this.subscribers = new AtomicReference<ReplaySubscription<T>[]>(EMPTY);
238238
}
239239

240-
/**
241-
* Constructs an unbounded ReplayProcessor with capacity hint of 16.
242-
* @since 2.0
243-
*/
244-
public ReplayProcessor() {
245-
this(new UnboundedReplayBuffer<T>(16));
246-
}
247-
248-
/**
249-
* Constructs a size-bounded ReplayProcessor with the given maximum size.
250-
* @param maxSize the maximum items to keep for replay
251-
* @since 2.0
252-
*/
253-
public ReplayProcessor(int maxSize) {
254-
this(new SizeBoundReplayBuffer<T>(maxSize));
255-
}
256-
257-
/**
258-
* Constructs a time-bounded ReplayProcessor where the elements replayed on
259-
* fresh subscription start no older than the specified age.
260-
* @param maxAge the maximum age of events to retain
261-
* @param unit the time unit
262-
* @param scheduler the Scheduler that provides the notion of current time
263-
* @since 2.0
264-
*/
265-
public ReplayProcessor(long maxAge, TimeUnit unit, Scheduler scheduler) {
266-
this(new SizeAndTimeBoundReplayBuffer<T>(Integer.MAX_VALUE, maxAge, unit, scheduler));
267-
}
268-
269-
/**
270-
* Constructs a time and size-bounded ReplayProcessor where the elements replayed
271-
* on fresh subscription start no older than the specified age and the Processor
272-
* stores no more than size elements at once.
273-
* @param maxAge the maximum age of events to retain
274-
* @param unit the time unit
275-
* @param scheduler the Scheduler that provides the notion of current time
276-
* @param maxSize the maximum items to keep for replay
277-
* @since 2.0
278-
*/
279-
public ReplayProcessor(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) {
280-
this(new SizeAndTimeBoundReplayBuffer<T>(maxSize, maxAge, unit, scheduler));
281-
}
282-
283240
@Override
284241
protected void subscribeActual(Subscriber<? super T> s) {
285242
ReplaySubscription<T> rs = new ReplaySubscription<T>(s, this);

src/main/java/io/reactivex/processors/UnicastProcessor.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,46 @@ public final class UnicastProcessor<T> extends FlowableProcessor<T> {
6161
boolean enableOperatorFusion;
6262

6363
/**
64-
* Constructs an UnicastProcessor with the default capacity hint of {@link io.reactivex.Flowable#bufferSize()}
65-
* elements.
66-
* @since 2.0
64+
* Creates an UnicastSubject with an internal buffer capacity hint 16.
65+
* @param <T> the value type
66+
* @return an UnicastSubject instance
67+
*/
68+
public static <T> UnicastProcessor<T> create() {
69+
return new UnicastProcessor<T>(bufferSize());
70+
}
71+
72+
/**
73+
* Creates an UnicastProcessor with the given internal buffer capacity hint.
74+
* @param <T> the value type
75+
* @param capacityHint the hint to size the internal unbounded buffer
76+
* @return an UnicastProcessor instance
77+
*/
78+
public static <T> UnicastProcessor<T> create(int capacityHint) {
79+
return new UnicastProcessor<T>(capacityHint);
80+
}
81+
82+
/**
83+
* Creates an UnicastProcessor with the given internal buffer capacity hint and a callback for
84+
* the case when the single Subscriber cancels its subscription.
85+
*
86+
* <p>The callback, if not null, is called exactly once and
87+
* non-overlapped with any active replay.
88+
*
89+
* @param <T> the value type
90+
* @param capacityHint the hint to size the internal unbounded buffer
91+
* @param onCancelled the optional callback
92+
* @return an UnicastProcessor instance
6793
*/
68-
public UnicastProcessor() {
69-
this(bufferSize());
94+
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled) {
95+
return new UnicastProcessor<T>(capacityHint, onCancelled);
7096
}
7197

7298
/**
7399
* Creates an UnicastProcessor with the given capacity hint.
74100
* @param capacityHint the capacity hint for the internal, unbounded queue
75101
* @since 2.0
76102
*/
77-
public UnicastProcessor(int capacityHint) {
103+
UnicastProcessor(int capacityHint) {
78104
this.queue = new SpscLinkedArrayQueue<T>(verifyPositive(capacityHint, "capacityHint"));
79105
this.onTerminate = new AtomicReference<Runnable>();
80106
this.actual = new AtomicReference<Subscriber<? super T>>();
@@ -90,7 +116,7 @@ public UnicastProcessor(int capacityHint) {
90116
* @param onTerminate the callback to run when the Processor is terminated or cancelled, null allowed
91117
* @since 2.0
92118
*/
93-
public UnicastProcessor(int capacityHint, Runnable onTerminate) {
119+
UnicastProcessor(int capacityHint, Runnable onTerminate) {
94120
this.queue = new SpscLinkedArrayQueue<T>(verifyPositive(capacityHint, "capacityHint"));
95121
this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate"));
96122
this.actual = new AtomicReference<Subscriber<? super T>>();

src/main/java/io/reactivex/subjects/AsyncSubject.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public static <T> AsyncSubject<T> create() {
6060
* @since 2.0
6161
*/
6262
@SuppressWarnings("unchecked")
63-
public AsyncSubject() {
63+
AsyncSubject() {
6464
this.subscribers = new AtomicReference<AsyncDisposable<T>[]>(EMPTY);
6565
}
6666

src/main/java/io/reactivex/subjects/BehaviorSubject.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public static <T> BehaviorSubject<T> createDefault(T defaultValue) {
121121
* @since 2.0
122122
*/
123123
@SuppressWarnings("unchecked")
124-
public BehaviorSubject() {
124+
BehaviorSubject() {
125125
this.lock = new ReentrantReadWriteLock();
126126
this.readLock = lock.readLock();
127127
this.writeLock = lock.writeLock();
@@ -135,7 +135,7 @@ public BehaviorSubject() {
135135
* @throws NullPointerException if {@code defaultValue} is null
136136
* @since 2.0
137137
*/
138-
public BehaviorSubject(T defaultValue) {
138+
BehaviorSubject(T defaultValue) {
139139
this();
140140
this.value.lazySet(ObjectHelper.requireNonNull(defaultValue, "defaultValue is null"));
141141
}

src/main/java/io/reactivex/subjects/PublishSubject.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static <T> PublishSubject<T> create() {
7272
* @since 2.0
7373
*/
7474
@SuppressWarnings("unchecked")
75-
public PublishSubject() {
75+
PublishSubject() {
7676
subscribers = new AtomicReference<PublishDisposable<T>[]>(EMPTY);
7777
}
7878

0 commit comments

Comments
 (0)