File tree Expand file tree Collapse file tree 1 file changed +4
-3
lines changed
src/main/java/io/reactivesocket/internal Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Original file line number Diff line number Diff line change @@ -336,7 +336,7 @@ public void request(long n) {
336
336
if (n > 0 && started .compareAndSet (false , true )) {
337
337
final int streamId = requestFrame .getStreamId ();
338
338
339
- new FragmentedPublisher ( FrameType . NEXT_COMPLETE , streamId , requestHandler .handleRequestResponse (requestFrame )) .subscribe (new Subscriber <Frame >() {
339
+ requestHandler .handleRequestResponse (requestFrame ).subscribe (new Subscriber <Payload >() {
340
340
341
341
// event emission is serialized so this doesn't need to be atomic
342
342
int count = 0 ;
@@ -352,11 +352,12 @@ public void onSubscribe(Subscription s) {
352
352
}
353
353
354
354
@ Override
355
- public void onNext (Frame v ) {
355
+ public void onNext (Payload v ) {
356
356
if (++count > 1 ) {
357
357
onError (new IllegalStateException ("RequestResponse expects a single onNext" ));
358
358
} else {
359
- child .onNext (v );
359
+
360
+ child .onNext (Frame .Response .from (streamId , FrameType .NEXT_COMPLETE , v ));
360
361
}
361
362
}
362
363
You can’t perform that action at this time.
0 commit comments