Skip to content

Commit 77ac15b

Browse files
committed
No longer using Notification<T> for scheduling throttled events.
1 parent 622d861 commit 77ac15b

File tree

1 file changed

+47
-21
lines changed

1 file changed

+47
-21
lines changed

rxjava-core/src/main/java/rx/operators/OperationThrottle.java

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,13 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Matchers.*;
19-
import static org.mockito.Mockito.*;
18+
import static org.mockito.Matchers.any;
19+
import static org.mockito.Matchers.anyString;
20+
import static org.mockito.Mockito.inOrder;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.never;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
2025

2126
import java.util.concurrent.Executors;
2227
import java.util.concurrent.TimeUnit;
@@ -27,7 +32,6 @@
2732
import org.junit.Test;
2833
import org.mockito.InOrder;
2934

30-
import rx.Notification;
3135
import rx.Observable;
3236
import rx.Observer;
3337
import rx.Scheduler;
@@ -119,25 +123,25 @@ public ThrottledObserver(Observer<T> observer, long timeout, TimeUnit unit, Sche
119123

120124
@Override
121125
public void onCompleted() {
122-
throttle(new Notification<T>());
126+
throttle(new ThrottledOnComplete<T>(observer));
123127
}
124128

125129
@Override
126130
public void onError(Exception e) {
127-
throttle(new Notification<T>(e));
131+
throttle(new ThrottledOnError<T>(observer, e));
128132
}
129133

130134
@Override
131135
public void onNext(final T args) {
132-
throttle(new Notification<T>(args));
136+
throttle(new ThrottledOnNext<T>(observer, args));
133137
}
134138

135-
private void throttle(final Notification<T> args) {
139+
private void throttle(Action0 action) {
136140
synchronized (subscription) {
137141
if (!timerHasExpired()) {
138142
subscription.get().unsubscribe();
139143
}
140-
subscription.set(scheduler.schedule(new ThrottleAction<T>(observer, args), timeout, unit));
144+
subscription.set(scheduler.schedule(action, timeout, unit));
141145
}
142146
}
143147

@@ -149,27 +153,49 @@ private boolean timerHasExpired() {
149153
}
150154
}
151155

152-
private static final class ThrottleAction<T> implements Action0 {
156+
private static final class ThrottledOnNext<T> implements Action0 {
153157

154158
private final Observer<T> observer;
155-
private final Notification<T> notification;
159+
private final T value;
156160

157-
public ThrottleAction(Observer<T> observer, Notification<T> notification) {
161+
public ThrottledOnNext(Observer<T> observer, T value) {
158162
this.observer = observer;
159-
this.notification = notification;
163+
this.value = value;
160164
}
161165

162166
@Override
163167
public void call() {
164-
if (notification.isOnNext()) {
165-
observer.onNext(notification.getValue());
166-
}
167-
else if (notification.isOnError()) {
168-
observer.onError(notification.getException());
169-
}
170-
else if (notification.isOnCompleted()) {
171-
observer.onCompleted();
172-
}
168+
observer.onNext(value);
169+
}
170+
}
171+
172+
private static final class ThrottledOnError<T> implements Action0 {
173+
174+
private final Observer<T> observer;
175+
private final Exception error;
176+
177+
public ThrottledOnError(Observer<T> observer, Exception error) {
178+
this.observer = observer;
179+
this.error = error;
180+
}
181+
182+
@Override
183+
public void call() {
184+
observer.onError(error);
185+
}
186+
}
187+
188+
private static final class ThrottledOnComplete<T> implements Action0 {
189+
190+
private final Observer<T> observer;
191+
192+
public ThrottledOnComplete(Observer<T> observer) {
193+
this.observer = observer;
194+
}
195+
196+
@Override
197+
public void call() {
198+
observer.onCompleted();
173199
}
174200
}
175201

0 commit comments

Comments
 (0)