26
26
import java .util .concurrent .Flow ;
27
27
28
28
import org .junit .jupiter .api .Test ;
29
- import org .reactivestreams .FlowAdapters ;
30
- import reactor .test .StepVerifier ;
31
29
32
30
import static java .nio .charset .StandardCharsets .UTF_8 ;
33
31
import static org .assertj .core .api .Assertions .assertThat ;
@@ -68,22 +66,7 @@ public byte[] map(byte[] b, int off, int len) {
68
66
69
67
70
68
@ Test
71
- void basic () {
72
- Flow .Publisher <byte []> publisher = new OutputStreamPublisher <>(
73
- out -> {
74
- out .write (FOO );
75
- out .write (BAR );
76
- out .write (BAZ );
77
- },
78
- this .byteMapper , this .executor , null );
79
-
80
- StepVerifier .create (FlowAdapters .toPublisher (publisher ))
81
- .assertNext (s -> assertThat (s ).containsExactly ("foobarbaz" .getBytes (UTF_8 )))
82
- .verifyComplete ();
83
- }
84
-
85
- @ Test
86
- void flush () throws IOException {
69
+ void basic () throws IOException {
87
70
Flow .Publisher <byte []> publisher = new OutputStreamPublisher <>(
88
71
out -> {
89
72
out .write (FOO );
@@ -115,7 +98,7 @@ void flush() throws IOException {
115
98
}
116
99
117
100
@ Test
118
- void chunkSize () {
101
+ void chunkSize () throws Exception {
119
102
Flow .Publisher <byte []> publisher = new OutputStreamPublisher <>(
120
103
out -> {
121
104
out .write (FOO );
@@ -127,24 +110,25 @@ void chunkSize() {
127
110
try (SubscriberInputStream <byte []> is = new SubscriberInputStream <>(s -> s , s -> {}, 1 )) {
128
111
publisher .subscribe (is );
129
112
130
- StringBuilder stringBuilder = new StringBuilder ();
113
+ StringBuilder sb = new StringBuilder ();
131
114
byte [] chunk = new byte [3 ];
132
115
133
- stringBuilder .append (new String (new byte []{(byte )is .read ()}, UTF_8 ));
116
+ sb .append ((char ) is .read ());
117
+ assertThat (sb ).matches ("f" );
118
+
134
119
assertThat (is .read (chunk )).isEqualTo (3 );
120
+ sb .append (new String (chunk , UTF_8 ));
121
+ assertThat (sb ).matches ("foob" );
135
122
136
- stringBuilder .append (new String (chunk , UTF_8 ));
137
123
assertThat (is .read (chunk )).isEqualTo (3 );
124
+ sb .append (new String (chunk , UTF_8 ));
125
+ assertThat (sb ).matches ("foobarb" );
138
126
139
- stringBuilder .append (new String (chunk , UTF_8 ));
140
127
assertThat (is .read (chunk )).isEqualTo (2 );
128
+ sb .append (new String (chunk ,0 , 2 , UTF_8 ));
129
+ assertThat (sb ).matches ("foobarbaz" );
141
130
142
- stringBuilder .append (new String (chunk ,0 , 2 , UTF_8 ));
143
131
assertThat (is .read ()).isEqualTo (-1 );
144
- assertThat (stringBuilder .toString ()).isEqualTo ("foobarbaz" );
145
- }
146
- catch (IOException e ) {
147
- throw new RuntimeException (e );
148
132
}
149
133
}
150
134
@@ -155,13 +139,13 @@ void cancel() throws InterruptedException, IOException {
155
139
Flow .Publisher <byte []> publisher = new OutputStreamPublisher <>(
156
140
out -> {
157
141
assertThatIOException ().isThrownBy (() -> {
158
- out .write (FOO );
159
- out .flush ();
160
- out .write (BAR );
161
- out .flush ();
162
- out .write (BAZ );
163
- out .flush ();
164
- }).withMessage ("Subscription has been terminated" );
142
+ out .write (FOO );
143
+ out .flush ();
144
+ out .write (BAR );
145
+ out .flush ();
146
+ out .write (BAZ );
147
+ out .flush ();
148
+ }).withMessage ("Subscription has been terminated" );
165
149
latch .countDown ();
166
150
167
151
}, this .byteMapper , this .executor , null );
@@ -201,6 +185,7 @@ void closed() throws InterruptedException, IOException {
201
185
202
186
assertThat (is .read (chunk )).isEqualTo (3 );
203
187
assertThat (chunk ).containsExactly (FOO );
188
+
204
189
assertThat (is .read (chunk )).isEqualTo (-1 );
205
190
}
206
191
@@ -215,7 +200,8 @@ void mapperThrowsException() throws InterruptedException {
215
200
out -> {
216
201
out .write (FOO );
217
202
out .flush ();
218
- assertThatIOException ().isThrownBy (() -> {
203
+ assertThatIOException ()
204
+ .isThrownBy (() -> {
219
205
out .write (BAR );
220
206
out .flush ();
221
207
})
0 commit comments