15
15
* limitations under the License.
16
16
*/
17
17
18
- import static io .rsocket .frame .FrameHeaderCodec .frameType ;
19
- import static org .hamcrest .MatcherAssert .assertThat ;
20
- import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
21
- import static org .hamcrest .Matchers .hasSize ;
22
-
23
18
import io .netty .buffer .ByteBuf ;
24
19
import io .netty .util .CharsetUtil ;
25
20
import io .netty .util .ReferenceCountUtil ;
37
32
import java .time .Duration ;
38
33
import java .util .ArrayList ;
39
34
import java .util .Collection ;
40
- import java .util .List ;
41
35
import java .util .Map ;
42
36
import java .util .concurrent .CancellationException ;
43
37
import java .util .concurrent .atomic .AtomicBoolean ;
52
46
import org .junit .jupiter .params .ParameterizedTest ;
53
47
import org .junit .jupiter .params .provider .Arguments ;
54
48
import org .junit .jupiter .params .provider .MethodSource ;
55
- import org .junit .runners .model .Statement ;
56
49
import org .reactivestreams .Publisher ;
57
50
import reactor .core .Disposable ;
58
51
import reactor .core .publisher .Flux ;
64
57
import reactor .test .publisher .TestPublisher ;
65
58
import reactor .test .util .RaceTestUtils ;
66
59
import reactor .util .context .Context ;
60
+ import reactor .util .context .ContextView ;
67
61
import reactor .util .retry .Retry ;
68
62
69
63
public class DefaultRSocketClientTests {
@@ -75,13 +69,7 @@ public void setUp() throws Throwable {
75
69
Hooks .onNextDropped (ReferenceCountUtil ::safeRelease );
76
70
Hooks .onErrorDropped ((t ) -> {});
77
71
rule = new ClientSocketRule ();
78
- rule .apply (
79
- new Statement () {
80
- @ Override
81
- public void evaluate () {}
82
- },
83
- null )
84
- .evaluate ();
72
+ rule .init ();
85
73
}
86
74
87
75
@ AfterEach
@@ -179,19 +167,12 @@ public void shouldSentFrameOnResolution(
179
167
@ MethodSource ("interactions" )
180
168
@ SuppressWarnings ({"unchecked" , "rawtypes" })
181
169
public void shouldHaveNoLeaksOnPayloadInCaseOfRacingOfOnNextAndCancel (
182
- BiFunction <RSocketClient , Publisher <Payload >, Publisher <?>> request , FrameType requestType )
183
- throws Throwable {
170
+ BiFunction <RSocketClient , Publisher <Payload >, Publisher <?>> request , FrameType requestType ) {
184
171
Assumptions .assumeThat (requestType ).isNotEqualTo (FrameType .REQUEST_CHANNEL );
185
172
186
173
for (int i = 0 ; i < RaceTestConstants .REPEATS ; i ++) {
187
174
ClientSocketRule rule = new ClientSocketRule ();
188
- rule .apply (
189
- new Statement () {
190
- @ Override
191
- public void evaluate () {}
192
- },
193
- null )
194
- .evaluate ();
175
+ rule .init ();
195
176
Payload payload = ByteBufPayload .create ("test" , "testMetadata" );
196
177
TestPublisher <Payload > testPublisher =
197
178
TestPublisher .createNoncompliant (TestPublisher .Violation .DEFER_CANCELLATION );
@@ -241,19 +222,12 @@ public void evaluate() {}
241
222
@ MethodSource ("interactions" )
242
223
@ SuppressWarnings ({"unchecked" , "rawtypes" })
243
224
public void shouldHaveNoLeaksOnPayloadInCaseOfRacingOfRequestAndCancel (
244
- BiFunction <RSocketClient , Publisher <Payload >, Publisher <?>> request , FrameType requestType )
245
- throws Throwable {
225
+ BiFunction <RSocketClient , Publisher <Payload >, Publisher <?>> request , FrameType requestType ) {
246
226
Assumptions .assumeThat (requestType ).isNotEqualTo (FrameType .REQUEST_CHANNEL );
247
227
248
228
for (int i = 0 ; i < RaceTestConstants .REPEATS ; i ++) {
249
229
ClientSocketRule rule = new ClientSocketRule ();
250
- rule .apply (
251
- new Statement () {
252
- @ Override
253
- public void evaluate () {}
254
- },
255
- null )
256
- .evaluate ();
230
+ rule .init ();
257
231
ByteBuf dataBuffer = rule .allocator .buffer ();
258
232
dataBuffer .writeCharSequence ("test" , CharsetUtil .UTF_8 );
259
233
@@ -311,14 +285,17 @@ public void shouldPropagateDownstreamContext(
311
285
Payload payload = ByteBufPayload .create (dataBuffer , metadataBuffer );
312
286
AssertSubscriber assertSubscriber = new AssertSubscriber (Context .of ("test" , "test" ));
313
287
314
- Context [] receivedContext = new Context [1 ];
288
+ ContextView [] receivedContext = new Context [1 ];
315
289
Publisher <?> publisher =
316
290
request .apply (
317
291
rule .client ,
318
292
Mono .just (payload )
319
293
.mergeWith (
320
- Mono .subscriberContext ()
321
- .doOnNext (c -> receivedContext [0 ] = c )
294
+ Mono .deferContextual (
295
+ c -> {
296
+ receivedContext [0 ] = c ;
297
+ return Mono .empty ();
298
+ })
322
299
.then (Mono .empty ())));
323
300
publisher .subscribe (assertSubscriber );
324
301
@@ -481,16 +458,11 @@ public void shouldDisposeOriginalSource() {
481
458
}
482
459
483
460
@ Test
484
- public void shouldDisposeOriginalSourceIfRacing () throws Throwable {
461
+ public void shouldDisposeOriginalSourceIfRacing () {
485
462
for (int i = 0 ; i < RaceTestConstants .REPEATS ; i ++) {
486
463
ClientSocketRule rule = new ClientSocketRule ();
487
- rule .apply (
488
- new Statement () {
489
- @ Override
490
- public void evaluate () {}
491
- },
492
- null )
493
- .evaluate ();
464
+
465
+ rule .init ();
494
466
495
467
AssertSubscriber <RSocket > assertSubscriber = AssertSubscriber .create ();
496
468
rule .client .source ().subscribe (assertSubscriber );
@@ -520,8 +492,8 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester
520
492
protected Sinks .One <RSocket > producer ;
521
493
522
494
@ Override
523
- protected void init () {
524
- super .init ();
495
+ protected void doInit () {
496
+ super .doInit ();
525
497
delayer = () -> producer .tryEmitValue (socket );
526
498
producer = Sinks .one ();
527
499
client =
@@ -547,22 +519,5 @@ protected RSocketRequester newRSocket() {
547
519
__ -> null ,
548
520
null );
549
521
}
550
-
551
- public int getStreamIdForRequestType (FrameType expectedFrameType ) {
552
- assertThat ("Unexpected frames sent." , connection .getSent (), hasSize (greaterThanOrEqualTo (1 )));
553
- List <FrameType > framesFound = new ArrayList <>();
554
- for (ByteBuf frame : connection .getSent ()) {
555
- FrameType frameType = frameType (frame );
556
- if (frameType == expectedFrameType ) {
557
- return FrameHeaderCodec .streamId (frame );
558
- }
559
- framesFound .add (frameType );
560
- }
561
- throw new AssertionError (
562
- "No frames sent with frame type: "
563
- + expectedFrameType
564
- + ", frames found: "
565
- + framesFound );
566
- }
567
522
}
568
523
}
0 commit comments