@@ -259,14 +259,14 @@ private void handleFrame(Frame frame) {
259
259
handleStream (streamId , requestStream (frameDecoder .apply (frame )), initialRequestN (frame ));
260
260
break ;
261
261
case REQUEST_CHANNEL :
262
- handleChannel (streamId , frame );
263
- break ;
264
- case PAYLOAD :
265
- // TODO: Hook in receiving socket.
262
+ handleChannel (streamId , frameDecoder .apply (frame ), initialRequestN (frame ));
266
263
break ;
267
264
case METADATA_PUSH :
268
265
metadataPush (frameDecoder .apply (frame ));
269
266
break ;
267
+ case PAYLOAD :
268
+ // TODO: Hook in receiving socket.
269
+ break ;
270
270
case LEASE :
271
271
// Lease must not be received here as this is the server end of the socket which sends
272
272
// leases.
@@ -313,18 +313,17 @@ private void handleFrame(Frame frame) {
313
313
314
314
private void handleFireAndForget (int streamId , Mono <Void > result ) {
315
315
result
316
+ .doOnSubscribe (subscription -> sendingSubscriptions .put (streamId , subscription ))
316
317
.doFinally (signalType -> sendingSubscriptions .remove (streamId ))
317
- .subscribe (
318
- null ,
319
- errorConsumer ,
320
- null ,
321
- subscription -> sendingSubscriptions .put (streamId , subscription ));
318
+ .subscribe (null , errorConsumer );
322
319
}
323
320
324
321
private void handleRequestResponse (int streamId , Mono <Payload > response ) {
325
322
response
323
+ .switchIfEmpty (Mono .fromCallable (() -> Frame .PayloadFrame .from (streamId , FrameType .COMPLETE )))
326
324
.doOnSubscribe (subscription -> sendingSubscriptions .put (streamId , subscription ))
327
- .map (
325
+ .doFinally (signalType -> sendingSubscriptions .remove (streamId ))
326
+ .subscribe (
328
327
payload -> {
329
328
int flags = FLAGS_C ;
330
329
if (payload .hasMetadata ()) {
@@ -333,35 +332,35 @@ private void handleRequestResponse(int streamId, Mono<Payload> response) {
333
332
final Frame frame =
334
333
Frame .PayloadFrame .from (streamId , FrameType .NEXT_COMPLETE , payload , flags );
335
334
payload .release ();
336
- return frame ;
337
- })
338
- .switchIfEmpty (
339
- Mono .fromCallable (() -> Frame .PayloadFrame .from (streamId , FrameType .COMPLETE )))
340
- .doFinally (signalType -> sendingSubscriptions .remove (streamId ))
341
- .subscribe (sendProcessor ::onNext , t -> handleError (streamId , t ));
335
+ sendProcessor .onNext (frame );
336
+ },
337
+ t -> handleError (streamId , t ));
342
338
}
343
339
344
340
private void handleStream (int streamId , Flux <Payload > response , int initialRequestN ) {
345
341
response
346
- .map (
347
- payload -> {
348
- final Frame frame = Frame .PayloadFrame .from (streamId , FrameType .NEXT , payload );
349
- payload .release ();
350
- return frame ;
351
- })
352
- .concatWith (Mono .fromCallable (() -> Frame .PayloadFrame .from (streamId , FrameType .COMPLETE )))
353
342
.transform (
354
343
frameFlux -> {
355
- LimitableRequestPublisher <Frame > frames = LimitableRequestPublisher .wrap (frameFlux );
356
- sendingSubscriptions .put (streamId , frames );
357
- frames .increaseRequestLimit (initialRequestN );
358
- return frames ;
344
+ LimitableRequestPublisher <Payload > payloads = LimitableRequestPublisher .wrap (frameFlux );
345
+ sendingSubscriptions .put (streamId , payloads );
346
+ payloads .increaseRequestLimit (initialRequestN );
347
+ return payloads ;
359
348
})
360
349
.doFinally (signalType -> sendingSubscriptions .remove (streamId ))
361
- .subscribe (sendProcessor ::onNext , t -> handleError (streamId , t ));
350
+ .subscribe (
351
+ payload -> {
352
+ final Frame frame = Frame .PayloadFrame .from (streamId , FrameType .NEXT , payload );
353
+ payload .release ();
354
+ sendProcessor .onNext (frame );
355
+ },
356
+ t -> handleError (streamId , t ),
357
+ () -> {
358
+ final Frame frame = Frame .PayloadFrame .from (streamId , FrameType .COMPLETE );
359
+ sendProcessor .onNext (frame );
360
+ });
362
361
}
363
362
364
- private void handleChannel (int streamId , Frame firstFrame ) {
363
+ private void handleChannel (int streamId , Payload payload , int initialRequestN ) {
365
364
UnicastProcessor <Payload > frames = UnicastProcessor .create ();
366
365
channelProcessors .put (streamId , frames );
367
366
@@ -375,9 +374,9 @@ private void handleChannel(int streamId, Frame firstFrame) {
375
374
// not chained, as the payload should be enqueued in the Unicast processor before this method
376
375
// returns
377
376
// and any later payload can be processed
378
- frames .onNext (frameDecoder . apply ( firstFrame ) );
377
+ frames .onNext (payload );
379
378
380
- handleStream (streamId , requestChannel (payloads ), initialRequestN ( firstFrame ) );
379
+ handleStream (streamId , requestChannel (payloads ), initialRequestN );
381
380
}
382
381
383
382
private void handleKeepAliveFrame (Frame frame ) {
0 commit comments