31
31
import io .rsocket .RSocket ;
32
32
import io .rsocket .exceptions .ConnectionErrorException ;
33
33
import io .rsocket .exceptions .Exceptions ;
34
- import io .rsocket .frame .CancelFrameFlyweight ;
35
- import io .rsocket .frame .ErrorFrameFlyweight ;
36
- import io .rsocket .frame .FrameHeaderFlyweight ;
34
+ import io .rsocket .frame .CancelFrameCodec ;
35
+ import io .rsocket .frame .ErrorFrameCodec ;
36
+ import io .rsocket .frame .FrameHeaderCodec ;
37
37
import io .rsocket .frame .FrameType ;
38
- import io .rsocket .frame .MetadataPushFrameFlyweight ;
39
- import io .rsocket .frame .PayloadFrameFlyweight ;
40
- import io .rsocket .frame .RequestChannelFrameFlyweight ;
41
- import io .rsocket .frame .RequestFireAndForgetFrameFlyweight ;
42
- import io .rsocket .frame .RequestNFrameFlyweight ;
43
- import io .rsocket .frame .RequestResponseFrameFlyweight ;
44
- import io .rsocket .frame .RequestStreamFrameFlyweight ;
38
+ import io .rsocket .frame .MetadataPushFrameCodec ;
39
+ import io .rsocket .frame .PayloadFrameCodec ;
40
+ import io .rsocket .frame .RequestChannelFrameCodec ;
41
+ import io .rsocket .frame .RequestFireAndForgetFrameCodec ;
42
+ import io .rsocket .frame .RequestNFrameCodec ;
43
+ import io .rsocket .frame .RequestResponseFrameCodec ;
44
+ import io .rsocket .frame .RequestStreamFrameCodec ;
45
45
import io .rsocket .frame .decoder .PayloadDecoder ;
46
46
import io .rsocket .internal .SynchronizedIntObjectHashMap ;
47
47
import io .rsocket .internal .UnboundedProcessor ;
@@ -225,7 +225,7 @@ private Mono<Void> handleFireAndForget(Payload payload) {
225
225
226
226
final int streamId = streamIdSupplier .nextStreamId (receivers );
227
227
final ByteBuf requestFrame =
228
- RequestFireAndForgetFrameFlyweight .encodeReleasingPayload (
228
+ RequestFireAndForgetFrameCodec .encodeReleasingPayload (
229
229
allocator , streamId , payload );
230
230
231
231
sendProcessor .onNext (requestFrame );
@@ -275,7 +275,7 @@ void hookOnFirstRequest(long n) {
275
275
this .streamId = streamId ;
276
276
277
277
ByteBuf requestResponseFrame =
278
- RequestResponseFrameFlyweight .encodeReleasingPayload (
278
+ RequestResponseFrameCodec .encodeReleasingPayload (
279
279
allocator , streamId , payload );
280
280
281
281
receivers .put (streamId , receiver );
@@ -285,8 +285,7 @@ void hookOnFirstRequest(long n) {
285
285
@ Override
286
286
void hookOnCancel () {
287
287
if (receivers .remove (streamId , receiver )) {
288
- sendProcessor .onNext (
289
- CancelFrameFlyweight .encode (allocator , streamId ));
288
+ sendProcessor .onNext (CancelFrameCodec .encode (allocator , streamId ));
290
289
} else {
291
290
payload .release ();
292
291
}
@@ -341,7 +340,7 @@ void hookOnFirstRequest(long n) {
341
340
this .streamId = streamId ;
342
341
343
342
ByteBuf requestStreamFrame =
344
- RequestStreamFrameFlyweight .encodeReleasingPayload (
343
+ RequestStreamFrameCodec .encodeReleasingPayload (
345
344
allocator , streamId , n , payload );
346
345
347
346
receivers .put (streamId , receiver );
@@ -356,14 +355,13 @@ void hookOnRemainingRequests(long n) {
356
355
}
357
356
358
357
sendProcessor .onNext (
359
- RequestNFrameFlyweight .encode (allocator , streamId , n ));
358
+ RequestNFrameCodec .encode (allocator , streamId , n ));
360
359
}
361
360
362
361
@ Override
363
362
void hookOnCancel () {
364
363
if (receivers .remove (streamId , receiver )) {
365
- sendProcessor .onNext (
366
- CancelFrameFlyweight .encode (allocator , streamId ));
364
+ sendProcessor .onNext (CancelFrameCodec .encode (allocator , streamId ));
367
365
} else {
368
366
payload .release ();
369
367
}
@@ -450,28 +448,26 @@ protected void hookOnNext(Payload payload) {
450
448
new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE );
451
449
errorConsumer .accept (t );
452
450
// no need to send any errors.
453
- sendProcessor .onNext (
454
- CancelFrameFlyweight .encode (allocator , streamId ));
451
+ sendProcessor .onNext (CancelFrameCodec .encode (allocator , streamId ));
455
452
receiver .onError (t );
456
453
return ;
457
454
}
458
455
final ByteBuf frame =
459
- PayloadFrameFlyweight .encodeNextReleasingPayload (
456
+ PayloadFrameCodec .encodeNextReleasingPayload (
460
457
allocator , streamId , payload );
461
458
462
459
sendProcessor .onNext (frame );
463
460
}
464
461
465
462
@ Override
466
463
protected void hookOnComplete () {
467
- ByteBuf frame =
468
- PayloadFrameFlyweight .encodeComplete (allocator , streamId );
464
+ ByteBuf frame = PayloadFrameCodec .encodeComplete (allocator , streamId );
469
465
sendProcessor .onNext (frame );
470
466
}
471
467
472
468
@ Override
473
469
protected void hookOnError (Throwable t ) {
474
- ByteBuf frame = ErrorFrameFlyweight .encode (allocator , streamId , t );
470
+ ByteBuf frame = ErrorFrameCodec .encode (allocator , streamId , t );
475
471
sendProcessor .onNext (frame );
476
472
receiver .onError (t );
477
473
}
@@ -488,7 +484,7 @@ void hookOnFirstRequest(long n) {
488
484
this .streamId = streamId ;
489
485
490
486
final ByteBuf frame =
491
- RequestChannelFrameFlyweight .encodeReleasingPayload (
487
+ RequestChannelFrameCodec .encodeReleasingPayload (
492
488
allocator , streamId , false , n , initialPayload );
493
489
494
490
senders .put (streamId , upstreamSubscriber );
@@ -508,14 +504,14 @@ void hookOnRemainingRequests(long n) {
508
504
return ;
509
505
}
510
506
511
- sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
507
+ sendProcessor .onNext (RequestNFrameCodec .encode (allocator , streamId , n ));
512
508
}
513
509
514
510
@ Override
515
511
void hookOnCancel () {
516
512
senders .remove (streamId , upstreamSubscriber );
517
513
if (receivers .remove (streamId , receiver )) {
518
- sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
514
+ sendProcessor .onNext (CancelFrameCodec .encode (allocator , streamId ));
519
515
}
520
516
}
521
517
@@ -562,7 +558,7 @@ private Mono<Void> handleMetadataPush(Payload payload) {
562
558
}
563
559
564
560
ByteBuf metadataPushFrame =
565
- MetadataPushFrameFlyweight .encodeReleasingPayload (allocator , payload );
561
+ MetadataPushFrameCodec .encodeReleasingPayload (allocator , payload );
566
562
567
563
sendProcessor .onNextPrioritized (metadataPushFrame );
568
564
@@ -585,8 +581,8 @@ private Throwable checkAvailable() {
585
581
586
582
private void handleIncomingFrames (ByteBuf frame ) {
587
583
try {
588
- int streamId = FrameHeaderFlyweight .streamId (frame );
589
- FrameType type = FrameHeaderFlyweight .frameType (frame );
584
+ int streamId = FrameHeaderCodec .streamId (frame );
585
+ FrameType type = FrameHeaderCodec .frameType (frame );
590
586
if (streamId == 0 ) {
591
587
handleStreamZero (type , frame );
592
588
} else {
@@ -666,7 +662,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
666
662
{
667
663
Subscription sender = senders .get (streamId );
668
664
if (sender != null ) {
669
- long n = RequestNFrameFlyweight .requestN (frame );
665
+ long n = RequestNFrameCodec .requestN (frame );
670
666
sender .request (n );
671
667
}
672
668
break ;
@@ -682,7 +678,7 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu
682
678
if (type == FrameType .ERROR ) {
683
679
// message for stream that has never existed, we have a problem with
684
680
// the overall connection and must tear down
685
- String errorMessage = ErrorFrameFlyweight .dataUtf8 (frame );
681
+ String errorMessage = ErrorFrameCodec .dataUtf8 (frame );
686
682
687
683
throw new IllegalStateException (
688
684
"Client received error for non-existent stream: "
0 commit comments