Skip to content

Commit 272401b

Browse files
yschimkeNiteshKant
authored andcommitted
StreamIdSupplier returns 1,3,5... or 2,4,6... (#213)
Tighten `StreamIdSupplier.isValid()` check and start stream ids from 1
1 parent 4ec6edb commit 272401b

File tree

3 files changed

+74
-5
lines changed

3 files changed

+74
-5
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
301301
}
302302

303303
private void handleMissingResponseProcessor(int streamId, FrameType type, Frame frame) {
304-
if (!streamIdSupplier.isValid(streamId)) {
304+
if (!streamIdSupplier.isBeforeOrCurrent(streamId)) {
305305
if (type == FrameType.ERROR) {
306306
// message for stream that has never existed, we have a problem with
307307
// the overall connection and must tear down

reactivesocket-core/src/main/java/io/reactivesocket/StreamIdSupplier.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ public synchronized int nextStreamId() {
2929
return streamId;
3030
}
3131

32-
public synchronized boolean isValid(int streamId) {
33-
return this.streamId < streamId;
32+
public synchronized boolean isBeforeOrCurrent(int streamId) {
33+
return this.streamId >= streamId && streamId > 0;
3434
}
3535

3636
public static StreamIdSupplier clientSupplier() {
37-
return new StreamIdSupplier(1);
37+
return new StreamIdSupplier(-1);
3838
}
3939

4040
public static StreamIdSupplier serverSupplier() {
41-
return new StreamIdSupplier(2);
41+
return new StreamIdSupplier(0);
4242
}
4343
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package io.reactivesocket;
2+
3+
import org.junit.Test;
4+
5+
import static org.junit.Assert.assertEquals;
6+
import static org.junit.Assert.assertFalse;
7+
import static org.junit.Assert.assertTrue;
8+
9+
public class StreamIdSupplierTest {
10+
@Test
11+
public void testClientSequence() {
12+
StreamIdSupplier s = StreamIdSupplier.clientSupplier();
13+
assertEquals(1, s.nextStreamId());
14+
assertEquals(3, s.nextStreamId());
15+
assertEquals(5, s.nextStreamId());
16+
}
17+
18+
@Test
19+
public void testServerSequence() {
20+
StreamIdSupplier s = StreamIdSupplier.serverSupplier();
21+
assertEquals(2, s.nextStreamId());
22+
assertEquals(4, s.nextStreamId());
23+
assertEquals(6, s.nextStreamId());
24+
}
25+
26+
@Test
27+
public void testClientIsValid() {
28+
StreamIdSupplier s = StreamIdSupplier.clientSupplier();
29+
30+
assertFalse(s.isBeforeOrCurrent(1));
31+
assertFalse(s.isBeforeOrCurrent(3));
32+
33+
s.nextStreamId();
34+
assertTrue(s.isBeforeOrCurrent(1));
35+
assertFalse(s.isBeforeOrCurrent(3));
36+
37+
s.nextStreamId();
38+
assertTrue(s.isBeforeOrCurrent(3));
39+
40+
// negative
41+
assertFalse(s.isBeforeOrCurrent(-1));
42+
// connection
43+
assertFalse(s.isBeforeOrCurrent(0));
44+
// server also accepted (checked externally)
45+
assertTrue(s.isBeforeOrCurrent(2));
46+
}
47+
48+
@Test
49+
public void testServerIsValid() {
50+
StreamIdSupplier s = StreamIdSupplier.serverSupplier();
51+
52+
assertFalse(s.isBeforeOrCurrent(2));
53+
assertFalse(s.isBeforeOrCurrent(4));
54+
55+
s.nextStreamId();
56+
assertTrue(s.isBeforeOrCurrent(2));
57+
assertFalse(s.isBeforeOrCurrent(4));
58+
59+
s.nextStreamId();
60+
assertTrue(s.isBeforeOrCurrent(4));
61+
62+
// negative
63+
assertFalse(s.isBeforeOrCurrent(-2));
64+
// connection
65+
assertFalse(s.isBeforeOrCurrent(0));
66+
// client also accepted (checked externally)
67+
assertTrue(s.isBeforeOrCurrent(1));
68+
}
69+
}

0 commit comments

Comments
 (0)