2
2
3
3
import io .rsocket .RSocketClientTest .ClientSocketRule ;
4
4
import io .rsocket .util .EmptyPayload ;
5
+ import java .nio .channels .ClosedChannelException ;
6
+ import java .time .Duration ;
7
+ import java .util .Arrays ;
8
+ import java .util .function .Function ;
5
9
import org .junit .Rule ;
6
10
import org .junit .Test ;
7
11
import org .junit .runner .RunWith ;
11
15
import reactor .core .publisher .Mono ;
12
16
import reactor .test .StepVerifier ;
13
17
14
- import java .nio .channels .ClosedChannelException ;
15
- import java .time .Duration ;
16
- import java .util .Arrays ;
17
- import java .util .function .Function ;
18
-
19
18
@ RunWith (Parameterized .class )
20
19
public class RSocketClientTerminationTest {
21
20
22
- @ Rule
23
- public final ClientSocketRule rule = new ClientSocketRule ();
21
+ @ Rule public final ClientSocketRule rule = new ClientSocketRule ();
24
22
private Function <RSocket , ? extends Publisher <?>> interaction ;
25
23
26
24
public RSocketClientTerminationTest (Function <RSocket , ? extends Publisher <?>> interaction ) {
@@ -31,9 +29,7 @@ public RSocketClientTerminationTest(Function<RSocket, ? extends Publisher<?>> in
31
29
public void testCurrentStreamIsTerminatedOnConnectionClose () {
32
30
RSocketClient rSocket = rule .socket ;
33
31
34
- Mono .delay (Duration .ofSeconds (1 ))
35
- .doOnNext (v -> rule .connection .dispose ())
36
- .subscribe ();
32
+ Mono .delay (Duration .ofSeconds (1 )).doOnNext (v -> rule .connection .dispose ()).subscribe ();
37
33
38
34
StepVerifier .create (interaction .apply (rSocket ))
39
35
.expectError (ClosedChannelException .class )
0 commit comments