@@ -228,14 +228,14 @@ private void handleFrame(Frame frame) {
228
228
handleStream (streamId , requestStream (frameDecoder .apply (frame )), initialRequestN (frame ));
229
229
break ;
230
230
case REQUEST_CHANNEL :
231
- handleChannel (streamId , frame );
232
- break ;
233
- case PAYLOAD :
234
- // TODO: Hook in receiving socket.
231
+ handleChannel (streamId , frameDecoder .apply (frame ), initialRequestN (frame ));
235
232
break ;
236
233
case METADATA_PUSH :
237
234
metadataPush (frameDecoder .apply (frame ));
238
235
break ;
236
+ case PAYLOAD :
237
+ // TODO: Hook in receiving socket.
238
+ break ;
239
239
case LEASE :
240
240
// Lease must not be received here as this is the server end of the socket which sends
241
241
// leases.
@@ -282,18 +282,17 @@ private void handleFrame(Frame frame) {
282
282
283
283
private void handleFireAndForget (int streamId , Mono <Void > result ) {
284
284
result
285
+ .doOnSubscribe (subscription -> sendingSubscriptions .put (streamId , subscription ))
285
286
.doFinally (signalType -> sendingSubscriptions .remove (streamId ))
286
- .subscribe (
287
- null ,
288
- errorConsumer ,
289
- null ,
290
- subscription -> sendingSubscriptions .put (streamId , subscription ));
287
+ .subscribe (null , errorConsumer );
291
288
}
292
289
293
290
private void handleRequestResponse (int streamId , Mono <Payload > response ) {
294
291
response
292
+ .switchIfEmpty (Mono .fromCallable (() -> Frame .PayloadFrame .from (streamId , FrameType .COMPLETE )))
295
293
.doOnSubscribe (subscription -> sendingSubscriptions .put (streamId , subscription ))
296
- .map (
294
+ .doFinally (signalType -> sendingSubscriptions .remove (streamId ))
295
+ .subscribe (
297
296
payload -> {
298
297
int flags = FLAGS_C ;
299
298
if (payload .hasMetadata ()) {
@@ -302,34 +301,35 @@ private void handleRequestResponse(int streamId, Mono<Payload> response) {
302
301
final Frame frame =
303
302
Frame .PayloadFrame .from (streamId , FrameType .NEXT_COMPLETE , payload , flags );
304
303
payload .release ();
305
- return frame ;
306
- })
307
- .switchIfEmpty (Mono .fromCallable (() -> Frame .PayloadFrame .from (streamId , FrameType .COMPLETE )))
308
- .doFinally (signalType -> sendingSubscriptions .remove (streamId ))
309
- .subscribe (sendProcessor ::onNext , t -> handleError (streamId , t ));
304
+ sendProcessor .onNext (frame );
305
+ },
306
+ t -> handleError (streamId , t ));
310
307
}
311
308
312
309
private void handleStream (int streamId , Flux <Payload > response , int initialRequestN ) {
313
310
response
314
- .map (
315
- payload -> {
316
- final Frame frame = Frame .PayloadFrame .from (streamId , FrameType .NEXT , payload );
317
- payload .release ();
318
- return frame ;
319
- })
320
- .concatWith (Mono .fromCallable (() -> Frame .PayloadFrame .from (streamId , FrameType .COMPLETE )))
321
311
.transform (
322
312
frameFlux -> {
323
- LimitableRequestPublisher <Frame > frames = LimitableRequestPublisher .wrap (frameFlux );
324
- sendingSubscriptions .put (streamId , frames );
325
- frames .increaseRequestLimit (initialRequestN );
326
- return frames ;
313
+ LimitableRequestPublisher <Payload > payloads = LimitableRequestPublisher .wrap (frameFlux );
314
+ sendingSubscriptions .put (streamId , payloads );
315
+ payloads .increaseRequestLimit (initialRequestN );
316
+ return payloads ;
327
317
})
328
318
.doFinally (signalType -> sendingSubscriptions .remove (streamId ))
329
- .subscribe (sendProcessor ::onNext , t -> handleError (streamId , t ));
319
+ .subscribe (
320
+ payload -> {
321
+ final Frame frame = Frame .PayloadFrame .from (streamId , FrameType .NEXT , payload );
322
+ payload .release ();
323
+ sendProcessor .onNext (frame );
324
+ },
325
+ t -> handleError (streamId , t ),
326
+ () -> {
327
+ final Frame frame = Frame .PayloadFrame .from (streamId , FrameType .COMPLETE );
328
+ sendProcessor .onNext (frame );
329
+ });
330
330
}
331
331
332
- private void handleChannel (int streamId , Frame firstFrame ) {
332
+ private void handleChannel (int streamId , Payload payload , int initialRequestN ) {
333
333
UnicastProcessor <Payload > frames = UnicastProcessor .create ();
334
334
channelProcessors .put (streamId , frames );
335
335
@@ -343,9 +343,9 @@ private void handleChannel(int streamId, Frame firstFrame) {
343
343
// not chained, as the payload should be enqueued in the Unicast processor before this method
344
344
// returns
345
345
// and any later payload can be processed
346
- frames .onNext (frameDecoder . apply ( firstFrame ) );
346
+ frames .onNext (payload );
347
347
348
- handleStream (streamId , requestChannel (payloads ), initialRequestN ( firstFrame ) );
348
+ handleStream (streamId , requestChannel (payloads ), initialRequestN );
349
349
}
350
350
351
351
private void handleKeepAliveFrame (Frame frame ) {
0 commit comments