@@ -183,231 +183,223 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
183
183
184
184
@ Test
185
185
public void requestChannelCase_StreamIsTerminatedAfterBothSidesSentCompletion1 () {
186
- TestPublisher <Payload > outerPublisher = TestPublisher .create ();
187
- AssertSubscriber <Payload > outerAssertSubscriber = new AssertSubscriber <>(0 );
186
+ TestPublisher <Payload > requesterPublisher = TestPublisher .create ();
187
+ AssertSubscriber <Payload > requesterSubscriber = new AssertSubscriber <>(0 );
188
188
189
- AssertSubscriber <Payload > innerAssertSubscriber = new AssertSubscriber <>(0 );
190
- TestPublisher <Payload > innerPublisher = TestPublisher .create ();
189
+ AssertSubscriber <Payload > responderSubscriber = new AssertSubscriber <>(0 );
190
+ TestPublisher <Payload > responderPublisher = TestPublisher .create ();
191
191
192
192
initRequestChannelCase (
193
- outerPublisher , outerAssertSubscriber , innerPublisher , innerAssertSubscriber );
193
+ requesterPublisher , requesterSubscriber , responderPublisher , responderSubscriber );
194
194
195
- nextFromOuterPublisher ( outerPublisher , innerAssertSubscriber );
195
+ nextFromrequesterPublisher ( requesterPublisher , responderSubscriber );
196
196
197
- completeFromOuterPublisher ( outerPublisher , innerAssertSubscriber );
197
+ completeFromrequesterPublisher ( requesterPublisher , responderSubscriber );
198
198
199
- nextFromInnerPublisher ( innerPublisher , outerAssertSubscriber );
199
+ nextFromresponderPublisher ( responderPublisher , requesterSubscriber );
200
200
201
- completeFromInnerPublisher ( innerPublisher , outerAssertSubscriber );
201
+ completeFromresponderPublisher ( responderPublisher , requesterSubscriber );
202
202
}
203
203
204
204
@ Test
205
205
public void requestChannelCase_StreamIsTerminatedAfterBothSidesSentCompletion2 () {
206
- TestPublisher <Payload > outerPublisher = TestPublisher .create ();
207
- AssertSubscriber <Payload > outerAssertSubscriber = new AssertSubscriber <>(0 );
206
+ TestPublisher <Payload > requesterPublisher = TestPublisher .create ();
207
+ AssertSubscriber <Payload > requesterSubscriber = new AssertSubscriber <>(0 );
208
208
209
- AssertSubscriber <Payload > innerAssertSubscriber = new AssertSubscriber <>(0 );
210
- TestPublisher <Payload > innerPublisher = TestPublisher .create ();
209
+ AssertSubscriber <Payload > responderSubscriber = new AssertSubscriber <>(0 );
210
+ TestPublisher <Payload > responderPublisher = TestPublisher .create ();
211
211
212
212
initRequestChannelCase (
213
- outerPublisher , outerAssertSubscriber , innerPublisher , innerAssertSubscriber );
213
+ requesterPublisher , requesterSubscriber , responderPublisher , responderSubscriber );
214
214
215
- nextFromInnerPublisher ( innerPublisher , outerAssertSubscriber );
215
+ nextFromresponderPublisher ( responderPublisher , requesterSubscriber );
216
216
217
- completeFromInnerPublisher ( innerPublisher , outerAssertSubscriber );
217
+ completeFromresponderPublisher ( responderPublisher , requesterSubscriber );
218
218
219
- nextFromOuterPublisher ( outerPublisher , innerAssertSubscriber );
219
+ nextFromrequesterPublisher ( requesterPublisher , responderSubscriber );
220
220
221
- completeFromOuterPublisher ( outerPublisher , innerAssertSubscriber );
221
+ completeFromrequesterPublisher ( requesterPublisher , responderSubscriber );
222
222
}
223
223
224
224
@ Test
225
225
public void
226
226
requestChannelCase_CancellationFromResponderShouldLeaveStreamInHalfClosedStateWithNextCompletionPossibleFromRequester () {
227
- TestPublisher <Payload > outerPublisher = TestPublisher .create ();
228
- AssertSubscriber <Payload > outerAssertSubscriber = new AssertSubscriber <>(0 );
227
+ TestPublisher <Payload > requesterPublisher = TestPublisher .create ();
228
+ AssertSubscriber <Payload > requesterSubscriber = new AssertSubscriber <>(0 );
229
229
230
- AssertSubscriber <Payload > innerAssertSubscriber = new AssertSubscriber <>(0 );
231
- TestPublisher <Payload > innerPublisher = TestPublisher .create ();
230
+ AssertSubscriber <Payload > responderSubscriber = new AssertSubscriber <>(0 );
231
+ TestPublisher <Payload > responderPublisher = TestPublisher .create ();
232
232
233
233
initRequestChannelCase (
234
- outerPublisher , outerAssertSubscriber , innerPublisher , innerAssertSubscriber );
234
+ requesterPublisher , requesterSubscriber , responderPublisher , responderSubscriber );
235
235
236
- nextFromOuterPublisher ( outerPublisher , innerAssertSubscriber );
236
+ nextFromrequesterPublisher ( requesterPublisher , responderSubscriber );
237
237
238
- cancelFromInnerSubscriber (outerPublisher , innerAssertSubscriber );
238
+ cancelFromInnerSubscriber (requesterPublisher , responderSubscriber );
239
239
240
- nextFromInnerPublisher ( innerPublisher , outerAssertSubscriber );
240
+ nextFromresponderPublisher ( responderPublisher , requesterSubscriber );
241
241
242
- completeFromInnerPublisher ( innerPublisher , outerAssertSubscriber );
242
+ completeFromresponderPublisher ( responderPublisher , requesterSubscriber );
243
243
}
244
244
245
245
@ Test
246
246
public void
247
247
requestChannelCase_CompletionFromRequesterShouldLeaveStreamInHalfClosedStateWithNextCancellationPossibleFromResponder () {
248
- TestPublisher <Payload > outerPublisher = TestPublisher .create ();
249
- AssertSubscriber <Payload > outerAssertSubscriber = new AssertSubscriber <>(0 );
248
+ TestPublisher <Payload > requesterPublisher = TestPublisher .create ();
249
+ AssertSubscriber <Payload > requesterSubscriber = new AssertSubscriber <>(0 );
250
250
251
- AssertSubscriber <Payload > innerAssertSubscriber = new AssertSubscriber <>(0 );
252
- TestPublisher <Payload > innerPublisher = TestPublisher .create ();
251
+ AssertSubscriber <Payload > responderSubscriber = new AssertSubscriber <>(0 );
252
+ TestPublisher <Payload > responderPublisher = TestPublisher .create ();
253
253
254
254
initRequestChannelCase (
255
- outerPublisher , outerAssertSubscriber , innerPublisher , innerAssertSubscriber );
255
+ requesterPublisher , requesterSubscriber , responderPublisher , responderSubscriber );
256
256
257
- nextFromInnerPublisher ( innerPublisher , outerAssertSubscriber );
257
+ nextFromresponderPublisher ( responderPublisher , requesterSubscriber );
258
258
259
- completeFromInnerPublisher ( innerPublisher , outerAssertSubscriber );
259
+ completeFromresponderPublisher ( responderPublisher , requesterSubscriber );
260
260
261
- nextFromOuterPublisher ( outerPublisher , innerAssertSubscriber );
261
+ nextFromrequesterPublisher ( requesterPublisher , responderSubscriber );
262
262
263
- cancelFromInnerSubscriber (outerPublisher , innerAssertSubscriber );
263
+ cancelFromInnerSubscriber (requesterPublisher , responderSubscriber );
264
264
}
265
265
266
266
@ Test
267
267
public void
268
268
requestChannelCase_ensureThatRequesterSubscriberCancellationTerminatesStreamsOnBothSides () {
269
- TestPublisher <Payload > outerPublisher = TestPublisher .create ();
270
- AssertSubscriber <Payload > outerAssertSubscriber = new AssertSubscriber <>(0 );
269
+ TestPublisher <Payload > requesterPublisher = TestPublisher .create ();
270
+ AssertSubscriber <Payload > requesterSubscriber = new AssertSubscriber <>(0 );
271
271
272
- AssertSubscriber <Payload > innerAssertSubscriber = new AssertSubscriber <>(0 );
273
- TestPublisher <Payload > innerPublisher = TestPublisher .create ();
272
+ AssertSubscriber <Payload > responderSubscriber = new AssertSubscriber <>(0 );
273
+ TestPublisher <Payload > responderPublisher = TestPublisher .create ();
274
274
275
275
initRequestChannelCase (
276
- outerPublisher , outerAssertSubscriber , innerPublisher , innerAssertSubscriber );
276
+ requesterPublisher , requesterSubscriber , responderPublisher , responderSubscriber );
277
277
278
- nextFromInnerPublisher ( innerPublisher , outerAssertSubscriber );
278
+ nextFromresponderPublisher ( responderPublisher , requesterSubscriber );
279
279
280
- nextFromOuterPublisher ( outerPublisher , innerAssertSubscriber );
280
+ nextFromrequesterPublisher ( requesterPublisher , responderSubscriber );
281
281
282
282
// ensures both sides are terminated
283
283
cancelFromOuterSubscriber (
284
- outerPublisher , outerAssertSubscriber , innerPublisher , innerAssertSubscriber );
284
+ requesterPublisher , requesterSubscriber , responderPublisher , responderSubscriber );
285
285
}
286
286
287
287
void initRequestChannelCase (
288
- TestPublisher <Payload > outerPublisher ,
289
- AssertSubscriber <Payload > outerAssertSubscriber ,
290
- TestPublisher <Payload > innerPublisher ,
291
- AssertSubscriber <Payload > innerAssertSubscriber ) {
288
+ TestPublisher <Payload > requesterPublisher ,
289
+ AssertSubscriber <Payload > requesterSubscriber ,
290
+ TestPublisher <Payload > responderPublisher ,
291
+ AssertSubscriber <Payload > responderSubscriber ) {
292
292
rule .setRequestAcceptor (
293
293
new AbstractRSocket () {
294
294
@ Override
295
295
public Flux <Payload > requestChannel (Publisher <Payload > payloads ) {
296
- payloads .subscribe (innerAssertSubscriber );
297
- return innerPublisher .flux ();
296
+ payloads .subscribe (responderSubscriber );
297
+ return responderPublisher .flux ();
298
298
}
299
299
});
300
300
301
- rule .crs .requestChannel (outerPublisher ).subscribe (outerAssertSubscriber );
301
+ rule .crs .requestChannel (requesterPublisher ).subscribe (requesterSubscriber );
302
302
303
- outerPublisher .assertWasSubscribed ();
304
- outerAssertSubscriber .assertSubscribed ();
303
+ requesterPublisher .assertWasSubscribed ();
304
+ requesterSubscriber .assertSubscribed ();
305
305
306
- innerAssertSubscriber .assertNotSubscribed ();
307
- innerPublisher .assertWasNotSubscribed ();
306
+ responderSubscriber .assertNotSubscribed ();
307
+ responderPublisher .assertWasNotSubscribed ();
308
308
309
309
// firstRequest
310
- outerAssertSubscriber .request (1 );
311
- outerPublisher .assertMaxRequested (1 );
312
- outerPublisher .next (DefaultPayload .create ("initialData" , "initialMetadata" ));
310
+ requesterSubscriber .request (1 );
311
+ requesterPublisher .assertMaxRequested (1 );
312
+ requesterPublisher .next (DefaultPayload .create ("initialData" , "initialMetadata" ));
313
313
314
- innerAssertSubscriber .assertSubscribed ();
315
- innerPublisher .assertWasSubscribed ();
314
+ responderSubscriber .assertSubscribed ();
315
+ responderPublisher .assertWasSubscribed ();
316
316
}
317
317
318
- void nextFromOuterPublisher (
319
- TestPublisher <Payload > outerPublisher , AssertSubscriber <Payload > innerAssertSubscriber ) {
318
+ void nextFromrequesterPublisher (
319
+ TestPublisher <Payload > requesterPublisher , AssertSubscriber <Payload > responderSubscriber ) {
320
320
// ensures that outerUpstream and innerSubscriber is not terminated so the requestChannel
321
- outerPublisher .assertSubscribers (1 );
322
- innerAssertSubscriber .assertNotTerminated ();
321
+ requesterPublisher .assertSubscribers (1 );
322
+ responderSubscriber .assertNotTerminated ();
323
323
324
- innerAssertSubscriber .request (6 );
325
- outerPublisher .next (
324
+ responderSubscriber .request (6 );
325
+ requesterPublisher .next (
326
326
DefaultPayload .create ("d1" , "m1" ),
327
327
DefaultPayload .create ("d2" ),
328
328
DefaultPayload .create ("d3" , "m3" ),
329
329
DefaultPayload .create ("d4" ),
330
330
DefaultPayload .create ("d5" , "m5" ));
331
331
332
- List <Payload > innerPayloads = innerAssertSubscriber .awaitAndAssertNextValueCount (6 ).values ();
332
+ List <Payload > innerPayloads = responderSubscriber .awaitAndAssertNextValueCount (6 ).values ();
333
333
Assertions .assertThat (innerPayloads .stream ().map (Payload ::getDataUtf8 ))
334
334
.containsExactly ("initialData" , "d1" , "d2" , "d3" , "d4" , "d5" );
335
- // fixme: incorrect behaviour of metadata encoding
336
- // Assertions
337
- // .assertThat(innerPayloads
338
- // .stream()
339
- // .map(Payload::hasMetadata)
340
- // )
341
- // .containsExactly(true, true, false, true, false, true);
335
+ Assertions .assertThat (innerPayloads .stream ().map (Payload ::hasMetadata ))
336
+ .containsExactly (true , true , false , true , false , true );
342
337
Assertions .assertThat (innerPayloads .stream ().map (Payload ::getMetadataUtf8 ))
343
338
.containsExactly ("initialMetadata" , "m1" , "" , "m3" , "" , "m5" );
344
339
}
345
340
346
- void completeFromOuterPublisher (
347
- TestPublisher <Payload > outerPublisher , AssertSubscriber <Payload > innerAssertSubscriber ) {
341
+ void completeFromrequesterPublisher (
342
+ TestPublisher <Payload > requesterPublisher , AssertSubscriber <Payload > responderSubscriber ) {
348
343
// ensures that after sending complete upstream part is closed
349
- outerPublisher .complete ();
350
- innerAssertSubscriber .assertTerminated ();
351
- outerPublisher .assertNoSubscribers ();
344
+ requesterPublisher .complete ();
345
+ responderSubscriber .assertTerminated ();
346
+ requesterPublisher .assertNoSubscribers ();
352
347
}
353
348
354
349
void cancelFromInnerSubscriber (
355
- TestPublisher <Payload > outerPublisher , AssertSubscriber <Payload > innerAssertSubscriber ) {
350
+ TestPublisher <Payload > requesterPublisher , AssertSubscriber <Payload > responderSubscriber ) {
356
351
// ensures that after sending complete upstream part is closed
357
- innerAssertSubscriber .cancel ();
358
- outerPublisher .assertWasCancelled ();
359
- outerPublisher .assertNoSubscribers ();
352
+ responderSubscriber .cancel ();
353
+ requesterPublisher .assertWasCancelled ();
354
+ requesterPublisher .assertNoSubscribers ();
360
355
}
361
356
362
- void nextFromInnerPublisher (
363
- TestPublisher <Payload > innerPublisher , AssertSubscriber <Payload > outerAssertSubscriber ) {
357
+ void nextFromresponderPublisher (
358
+ TestPublisher <Payload > responderPublisher , AssertSubscriber <Payload > requesterSubscriber ) {
364
359
// ensures that downstream is not terminated so the requestChannel state is half-closed
365
- innerPublisher .assertSubscribers (1 );
366
- outerAssertSubscriber .assertNotTerminated ();
360
+ responderPublisher .assertSubscribers (1 );
361
+ requesterSubscriber .assertNotTerminated ();
367
362
368
- // ensures innerPublisher can send messages and outerSubscriber can receive them
369
- outerAssertSubscriber .request (5 );
370
- innerPublisher .next (
363
+ // ensures responderPublisher can send messages and outerSubscriber can receive them
364
+ requesterSubscriber .request (5 );
365
+ responderPublisher .next (
371
366
DefaultPayload .create ("rd1" , "rm1" ),
372
367
DefaultPayload .create ("rd2" ),
373
368
DefaultPayload .create ("rd3" , "rm3" ),
374
369
DefaultPayload .create ("rd4" ),
375
370
DefaultPayload .create ("rd5" , "rm5" ));
376
371
377
- List <Payload > outerPayloads = outerAssertSubscriber .awaitAndAssertNextValueCount (5 ).values ();
372
+ List <Payload > outerPayloads = requesterSubscriber .awaitAndAssertNextValueCount (5 ).values ();
378
373
Assertions .assertThat (outerPayloads .stream ().map (Payload ::getDataUtf8 ))
379
374
.containsExactly ("rd1" , "rd2" , "rd3" , "rd4" , "rd5" );
380
- // fixme: incorrect behaviour of metadata encoding
381
- // Assertions
382
- // .assertThat(outerPayloads
383
- // .stream()
384
- // .map(Payload::hasMetadata)
385
- // )
386
- // .containsExactly(true, false, true, false, true);
375
+ Assertions .assertThat (outerPayloads .stream ().map (Payload ::hasMetadata ))
376
+ .containsExactly (true , false , true , false , true );
387
377
Assertions .assertThat (outerPayloads .stream ().map (Payload ::getMetadataUtf8 ))
388
378
.containsExactly ("rm1" , "" , "rm3" , "" , "rm5" );
389
379
}
390
380
391
- void completeFromInnerPublisher (
392
- TestPublisher <Payload > innerPublisher , AssertSubscriber <Payload > outerAssertSubscriber ) {
381
+ void completeFromresponderPublisher (
382
+ TestPublisher <Payload > responderPublisher , AssertSubscriber <Payload > requesterSubscriber ) {
393
383
// ensures that after sending complete inner upstream is closed
394
- innerPublisher .complete ();
395
- outerAssertSubscriber .assertTerminated ();
396
- innerPublisher .assertNoSubscribers ();
384
+ responderPublisher .complete ();
385
+ requesterSubscriber .assertTerminated ();
386
+ responderPublisher .assertNoSubscribers ();
397
387
}
398
388
399
389
void cancelFromOuterSubscriber (
400
- TestPublisher <Payload > outerPublisher ,
401
- AssertSubscriber <Payload > outerAssertSubscriber ,
402
- TestPublisher <Payload > innerPublisher ,
403
- AssertSubscriber <Payload > innerAssertSubscriber ) {
390
+ TestPublisher <Payload > requesterPublisher ,
391
+ AssertSubscriber <Payload > requesterSubscriber ,
392
+ TestPublisher <Payload > responderPublisher ,
393
+ AssertSubscriber <Payload > responderSubscriber ) {
404
394
// ensures that after sending cancel the whole requestChannel is terminated
405
- outerAssertSubscriber .cancel ();
406
- innerPublisher .assertWasCancelled ();
407
- innerPublisher .assertNoSubscribers ();
395
+ requesterSubscriber .cancel ();
396
+ // error should be propagated
397
+ responderSubscriber .assertTerminated ();
398
+ responderPublisher .assertWasCancelled ();
399
+ responderPublisher .assertNoSubscribers ();
408
400
// ensures that cancellation is propagated to the actual upstream
409
- outerPublisher .assertWasCancelled ();
410
- outerPublisher .assertNoSubscribers ();
401
+ requesterPublisher .assertWasCancelled ();
402
+ requesterPublisher .assertNoSubscribers ();
411
403
}
412
404
413
405
public static class SocketRule extends ExternalResource {
0 commit comments