Skip to content

Commit c49704b

Browse files
committed
Merge pull request #2244 from davidmoten/take-last-unsub
OperatorTakeLast add check for isUnsubscribed to fast path
2 parents 5839720 + 36b323e commit c49704b

File tree

2 files changed

+31
-0
lines changed

2 files changed

+31
-0
lines changed

src/main/java/rx/internal/operators/TakeLastQueueProducer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ void emit(long previousRequested) {
7070
if (previousRequested == 0) {
7171
try {
7272
for (Object value : deque) {
73+
if (subscriber.isUnsubscribed())
74+
return;
7375
notification.accept(subscriber, value);
7476
}
7577
} catch (Throwable e) {

src/test/java/rx/internal/operators/OperatorTakeLastTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.Mockito.verify;
2525

2626
import java.util.Arrays;
27+
import java.util.concurrent.atomic.AtomicInteger;
2728

2829
import org.junit.Test;
2930
import org.mockito.InOrder;
@@ -264,4 +265,32 @@ public void onNext(Integer integer) {
264265
}
265266
});
266267
}
268+
269+
@Test
270+
public void testUnsubscribeTakesEffectEarlyOnFastPath() {
271+
final AtomicInteger count = new AtomicInteger();
272+
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {
273+
274+
@Override
275+
public void onStart() {
276+
request(Long.MAX_VALUE);
277+
}
278+
279+
@Override
280+
public void onCompleted() {
281+
282+
}
283+
284+
@Override
285+
public void onError(Throwable e) {
286+
}
287+
288+
@Override
289+
public void onNext(Integer integer) {
290+
count.incrementAndGet();
291+
unsubscribe();
292+
}
293+
});
294+
assertEquals(1,count.get());
295+
}
267296
}

0 commit comments

Comments
 (0)