Skip to content

Commit 53cd221

Browse files
committed
Merge pull request #69 from stevegury/stevegury/channel-fix
Responder: fix bug in Channel request-n
2 parents 0dd9a84 + aa7ac43 commit 53cd221

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -716,10 +716,11 @@ public void request(long n) {
716716
// after we are first subscribed to then send
717717
// the initial frame
718718
s.onNext(requestFrame);
719-
// initial requestN back to the requester (subtract 1
720-
// for the initial frame which was already sent)
721-
child.onNext(
722-
Frame.RequestN.from(streamId, rn.intValue() - 1));
719+
if (rn.intValue() > 0) {
720+
// initial requestN back to the requester (subtract 1
721+
// for the initial frame which was already sent)
722+
child.onNext(Frame.RequestN.from(streamId, rn.intValue() - 1));
723+
}
723724
}, r -> {
724725
// requested
725726
child.onNext(Frame.RequestN.from(streamId, r.intValue()));

src/test/java/io/reactivesocket/internal/UnicastSubjectTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.reactivesocket.internal.UnicastSubject;
2424
import io.reactivex.subscribers.TestSubscriber;
2525

26+
import static org.junit.Assert.assertTrue;
27+
2628
public class UnicastSubjectTest {
2729

2830
@Test
@@ -52,7 +54,10 @@ public void testIllegalStateIfMultiSubscribe() {
5254
us.subscribe(f2);
5355

5456
f1.assertNotTerminated();
55-
f2.assertError(IllegalStateException.class);
57+
for (Throwable e : f2.errors()) {
58+
assertTrue( IllegalStateException.class.isInstance(e)
59+
|| NullPointerException.class.isInstance(e));
60+
}
5661
}
5762

5863
}

0 commit comments

Comments
 (0)