Skip to content

Commit 3a5a4c2

Browse files
authored
Fixes Payload#hasMetadata to strictly flow the flag in frame
This PR ensures that Payload#hasMetadata will have identically value as it was encoded in frame decoded to Payload. Also, this PR ensures that during the encoding the current behavior of Payload#metadata \ Payload#sliceMetadata which always return a buffer (even though them hasMetadata is false) will not brake encoding process and the result flag in the encoded frame will be mirroring exactly what `Payload#hasMetadata` returned * fixes behaviour of flight-weights when it gets unreadable buffers ensures that ByteBufPayload is not accessible anymore when it has been released Signed-off-by: Oleh Dokuka <[email protected]> * partial Signed-off-by: Oleh Dokuka <[email protected]> * fixes incorrect request propagation Initially that issue was hidden because both sides uses limitRate which does prefetch 256 elements in advance so it is almost impossible to track underflow in request Signed-off-by: Oleh Dokuka <[email protected]> * provides refactoring related to ensuring that hasMetadata is propagated correctly Right now there was observed a few issues related to that Payload with no metadata was incorrectly incoded so it results to hasMetadata true on the received side Also, optimized the API of Flyweights to ensure that we have common things in one place Signed-off-by: Oleh Dokuka <[email protected]> * fixes failing tests related to changes of initialRequestN Signed-off-by: Oleh Dokuka <[email protected]> * fixes compilation errors Signed-off-by: Oleh Dokuka <[email protected]> * uncomment assertions related to proper metadata propagation Signed-off-by: Oleh Dokuka <[email protected]> * moves payload releasing to codecs Signed-off-by: Oleh Dokuka <[email protected]>
1 parent cd67e54 commit 3a5a4c2

36 files changed

+872
-589
lines changed

rsocket-core/src/main/java/io/rsocket/core/DefaultConnectionSetupPayload.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.rsocket.core;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.Unpooled;
2021
import io.rsocket.ConnectionSetupPayload;
2122
import io.rsocket.frame.FrameHeaderFlyweight;
2223
import io.rsocket.frame.SetupFrameFlyweight;
@@ -40,7 +41,8 @@ public boolean hasMetadata() {
4041

4142
@Override
4243
public ByteBuf sliceMetadata() {
43-
return SetupFrameFlyweight.metadata(setupFrame);
44+
final ByteBuf metadata = SetupFrameFlyweight.metadata(setupFrame);
45+
return metadata == null ? Unpooled.EMPTY_BUFFER : metadata;
4446
}
4547

4648
@Override

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 13 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,8 @@ private Mono<Void> handleFireAndForget(Payload payload) {
212212
return UnicastMonoEmpty.newInstance(
213213
() -> {
214214
ByteBuf requestFrame =
215-
RequestFireAndForgetFrameFlyweight.encode(
216-
allocator,
217-
streamId,
218-
false,
219-
payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
220-
payload.sliceData().retain());
221-
payload.release();
215+
RequestFireAndForgetFrameFlyweight.encodeReleasingPayload(
216+
allocator, streamId, payload);
222217

223218
sendProcessor.onNext(requestFrame);
224219
});
@@ -245,13 +240,8 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
245240
@Override
246241
public void doOnSubscribe() {
247242
final ByteBuf requestFrame =
248-
RequestResponseFrameFlyweight.encode(
249-
allocator,
250-
streamId,
251-
false,
252-
payload.sliceMetadata().retain(),
253-
payload.sliceData().retain());
254-
payload.release();
243+
RequestResponseFrameFlyweight.encodeReleasingPayload(
244+
allocator, streamId, payload);
255245

256246
sendProcessor.onNext(requestFrame);
257247
}
@@ -302,16 +292,10 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
302292
public void accept(long n) {
303293
if (firstRequest && !receiver.isDisposed()) {
304294
firstRequest = false;
305-
sendProcessor.onNext(
306-
RequestStreamFrameFlyweight.encode(
307-
allocator,
308-
streamId,
309-
false,
310-
n,
311-
payload.sliceMetadata().retain(),
312-
payload.sliceData().retain()));
313295
if (!payloadReleasedFlag.getAndSet(true)) {
314-
payload.release();
296+
sendProcessor.onNext(
297+
RequestStreamFrameFlyweight.encodeReleasingPayload(
298+
allocator, streamId, n, payload));
315299
}
316300
} else if (contains(streamId) && !receiver.isDisposed()) {
317301
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
@@ -400,10 +384,9 @@ protected void hookOnNext(Payload payload) {
400384
return;
401385
}
402386
final ByteBuf frame =
403-
PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, payload);
387+
PayloadFrameFlyweight.encodeNextReleasingPayload(allocator, streamId, payload);
404388

405389
sendProcessor.onNext(frame);
406-
payload.release();
407390
}
408391

409392
@Override
@@ -444,18 +427,10 @@ public void accept(long n) {
444427
.subscribe(upstreamSubscriber);
445428
if (!payloadReleasedFlag.getAndSet(true)) {
446429
ByteBuf frame =
447-
RequestChannelFrameFlyweight.encode(
448-
allocator,
449-
streamId,
450-
false,
451-
false,
452-
n,
453-
initialPayload.sliceMetadata().retain(),
454-
initialPayload.sliceData().retain());
430+
RequestChannelFrameFlyweight.encodeReleasingPayload(
431+
allocator, streamId, false, n, initialPayload);
455432

456433
sendProcessor.onNext(frame);
457-
458-
initialPayload.release();
459434
}
460435
} else {
461436
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
@@ -497,8 +472,7 @@ private Mono<Void> handleMetadataPush(Payload payload) {
497472
return UnicastMonoEmpty.newInstance(
498473
() -> {
499474
ByteBuf metadataPushFrame =
500-
MetadataPushFrameFlyweight.encode(allocator, payload.sliceMetadata().retain());
501-
payload.release();
475+
MetadataPushFrameFlyweight.encodeReleasingPayload(allocator, payload);
502476

503477
sendProcessor.onNextPrioritized(metadataPushFrame);
504478
});
@@ -604,8 +578,8 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
604578
{
605579
Subscription sender = senders.get(streamId);
606580
if (sender != null) {
607-
int n = RequestNFrameFlyweight.requestN(frame);
608-
sender.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n);
581+
long n = RequestNFrameFlyweight.requestN(frame);
582+
sender.request(n);
609583
}
610584
break;
611585
}

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -301,12 +301,12 @@ private void handleFrame(ByteBuf frame) {
301301
handleRequestN(streamId, frame);
302302
break;
303303
case REQUEST_STREAM:
304-
int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
304+
long streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
305305
Payload streamPayload = payloadDecoder.apply(frame);
306306
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN, null);
307307
break;
308308
case REQUEST_CHANNEL:
309-
int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
309+
long channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
310310
Payload channelPayload = payloadDecoder.apply(frame);
311311
handleChannel(streamId, channelPayload, channelInitialRequestN);
312312
break;
@@ -399,16 +399,9 @@ protected void hookOnNext(Payload payload) {
399399
return;
400400
}
401401

402-
ByteBuf byteBuf;
403-
try {
404-
byteBuf = PayloadFrameFlyweight.encodeNextComplete(allocator, streamId, payload);
405-
} catch (Throwable t) {
406-
payload.release();
407-
throw Exceptions.propagate(t);
408-
}
409-
410-
payload.release();
411-
402+
ByteBuf byteBuf =
403+
PayloadFrameFlyweight.encodeNextCompleteReleasingPayload(
404+
allocator, streamId, payload);
412405
sendProcessor.onNext(byteBuf);
413406
}
414407

@@ -437,14 +430,14 @@ protected void hookFinally(SignalType type) {
437430
private void handleStream(
438431
int streamId,
439432
Flux<Payload> response,
440-
int initialRequestN,
433+
long initialRequestN,
441434
@Nullable UnicastProcessor<Payload> requestChannel) {
442435
final BaseSubscriber<Payload> subscriber =
443436
new BaseSubscriber<Payload>() {
444437

445438
@Override
446439
protected void hookOnSubscribe(Subscription s) {
447-
s.request(initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN);
440+
s.request(initialRequestN);
448441
}
449442

450443
@Override
@@ -469,16 +462,8 @@ protected void hookOnNext(Payload payload) {
469462
return;
470463
}
471464

472-
ByteBuf byteBuf;
473-
try {
474-
byteBuf = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload);
475-
} catch (Throwable t) {
476-
payload.release();
477-
throw Exceptions.propagate(t);
478-
}
479-
480-
payload.release();
481-
465+
ByteBuf byteBuf =
466+
PayloadFrameFlyweight.encodeNextReleasingPayload(allocator, streamId, payload);
482467
sendProcessor.onNext(byteBuf);
483468
}
484469

@@ -523,7 +508,7 @@ protected void hookFinally(SignalType type) {
523508
.subscribe(subscriber);
524509
}
525510

526-
private void handleChannel(int streamId, Payload payload, int initialRequestN) {
511+
private void handleChannel(int streamId, Payload payload, long initialRequestN) {
527512
UnicastProcessor<Payload> frames = UnicastProcessor.create();
528513
channelProcessors.put(streamId, frames);
529514

@@ -602,8 +587,8 @@ private void handleRequestN(int streamId, ByteBuf frame) {
602587
Subscription subscription = sendingSubscriptions.get(streamId);
603588

604589
if (subscription != null) {
605-
int n = RequestNFrameFlyweight.requestN(frame);
606-
subscription.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n);
590+
long n = RequestNFrameFlyweight.requestN(frame);
591+
subscription.request(n);
607592
}
608593
}
609594
}

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
166166
header = frame.copy(frame.readerIndex(), FrameHeaderFlyweight.size());
167167

168168
if (frameType == FrameType.REQUEST_CHANNEL || frameType == FrameType.REQUEST_STREAM) {
169-
int i = RequestChannelFrameFlyweight.initialRequestN(frame);
170-
header.writeInt(i);
169+
long i = RequestChannelFrameFlyweight.initialRequestN(frame);
170+
header.writeInt(i > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) i);
171171
}
172172
putHeader(streamId, header);
173173
}
@@ -261,10 +261,16 @@ void reassembleFrame(ByteBuf frame, SynchronousSink<ByteBuf> sink) {
261261
private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf header) {
262262
ByteBuf metadata;
263263
CompositeByteBuf cm = removeMetadata(streamId);
264-
if (cm != null) {
265-
metadata = cm.addComponents(true, PayloadFrameFlyweight.metadata(frame).retain());
264+
265+
ByteBuf decodedMetadata = PayloadFrameFlyweight.metadata(frame);
266+
if (decodedMetadata != null) {
267+
if (cm != null) {
268+
metadata = cm.addComponents(true, decodedMetadata.retain());
269+
} else {
270+
metadata = PayloadFrameFlyweight.metadata(frame).retain();
271+
}
266272
} else {
267-
metadata = PayloadFrameFlyweight.metadata(frame).retain();
273+
metadata = cm != null ? cm : null;
268274
}
269275

270276
ByteBuf data = assembleData(frame, streamId);

rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,38 +30,35 @@ private static int decodeLength(final ByteBuf byteBuf) {
3030
return length;
3131
}
3232

33-
static ByteBuf encodeOnlyMetadata(
34-
ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata) {
35-
return allocator.compositeBuffer(2).addComponents(true, header, metadata);
36-
}
37-
38-
static ByteBuf encodeOnlyData(ByteBufAllocator allocator, final ByteBuf header, ByteBuf data) {
39-
return allocator.compositeBuffer(2).addComponents(true, header, data);
40-
}
41-
4233
static ByteBuf encode(
43-
ByteBufAllocator allocator, final ByteBuf header, ByteBuf metadata, ByteBuf data) {
34+
ByteBufAllocator allocator,
35+
final ByteBuf header,
36+
ByteBuf metadata,
37+
boolean hasMetadata,
38+
ByteBuf data) {
4439

45-
int length = metadata.readableBytes();
46-
encodeLength(header, length);
47-
return allocator.compositeBuffer(3).addComponents(true, header, metadata, data);
48-
}
40+
final boolean addData = data != null && data.isReadable();
41+
final boolean addMetadata = hasMetadata && metadata.isReadable();
4942

50-
static ByteBuf metadataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
5143
if (hasMetadata) {
52-
int length = decodeLength(byteBuf);
53-
return byteBuf.readSlice(length);
44+
int length = metadata.readableBytes();
45+
encodeLength(header, length);
46+
}
47+
48+
if (addMetadata && addData) {
49+
return allocator.compositeBuffer(3).addComponents(true, header, metadata, data);
50+
} else if (addMetadata) {
51+
return allocator.compositeBuffer(2).addComponents(true, header, metadata);
52+
} else if (addData) {
53+
return allocator.compositeBuffer(2).addComponents(true, header, data);
5454
} else {
55-
return Unpooled.EMPTY_BUFFER;
55+
return header;
5656
}
5757
}
5858

59-
static ByteBuf metadata(ByteBuf byteBuf, boolean hasMetadata) {
60-
byteBuf.markReaderIndex();
61-
byteBuf.skipBytes(6);
62-
ByteBuf metadata = metadataWithoutMarking(byteBuf, hasMetadata);
63-
byteBuf.resetReaderIndex();
64-
return metadata;
59+
static ByteBuf metadataWithoutMarking(ByteBuf byteBuf) {
60+
int length = decodeLength(byteBuf);
61+
return byteBuf.readSlice(length);
6562
}
6663

6764
static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
@@ -76,12 +73,4 @@ static ByteBuf dataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) {
7673
return Unpooled.EMPTY_BUFFER;
7774
}
7875
}
79-
80-
static ByteBuf data(ByteBuf byteBuf, boolean hasMetadata) {
81-
byteBuf.markReaderIndex();
82-
byteBuf.skipBytes(6);
83-
ByteBuf data = dataWithoutMarking(byteBuf, hasMetadata);
84-
byteBuf.resetReaderIndex();
85-
return data;
86-
}
8776
}

rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,18 @@ public static ByteBuf encode(
1414
@Nullable ByteBuf metadata,
1515
ByteBuf data) {
1616

17+
final boolean hasMetadata = metadata != null;
18+
1719
int flags = FrameHeaderFlyweight.FLAGS_I;
1820

19-
if (metadata != null) {
21+
if (hasMetadata) {
2022
flags |= FrameHeaderFlyweight.FLAGS_M;
2123
}
2224

23-
ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.EXT, flags);
25+
final ByteBuf header = FrameHeaderFlyweight.encode(allocator, streamId, FrameType.EXT, flags);
2426
header.writeInt(extendedType);
25-
if (data == null && metadata == null) {
26-
return header;
27-
} else if (metadata != null) {
28-
return DataAndMetadataFlyweight.encode(allocator, header, metadata, data);
29-
} else {
30-
return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data);
31-
}
27+
28+
return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data);
3229
}
3330

3431
public static int extendedType(ByteBuf byteBuf) {
@@ -56,10 +53,13 @@ public static ByteBuf metadata(ByteBuf byteBuf) {
5653
FrameHeaderFlyweight.ensureFrameType(FrameType.EXT, byteBuf);
5754

5855
boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf);
56+
if (!hasMetadata) {
57+
return null;
58+
}
5959
byteBuf.markReaderIndex();
6060
// Extended type
6161
byteBuf.skipBytes(FrameHeaderFlyweight.size() + Integer.BYTES);
62-
ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata);
62+
ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf);
6363
byteBuf.resetReaderIndex();
6464
return metadata;
6565
}

rsocket-core/src/main/java/io/rsocket/frame/FragmentationFlyweight.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,7 @@ public static ByteBuf encode(final ByteBufAllocator allocator, ByteBuf header, B
1313
public static ByteBuf encode(
1414
final ByteBufAllocator allocator, ByteBuf header, @Nullable ByteBuf metadata, ByteBuf data) {
1515

16-
if (data == null && metadata == null) {
17-
return header;
18-
} else if (metadata != null) {
19-
return DataAndMetadataFlyweight.encode(allocator, header, metadata, data);
20-
} else {
21-
return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data);
22-
}
16+
final boolean hasMetadata = metadata != null;
17+
return DataAndMetadataFlyweight.encode(allocator, header, metadata, hasMetadata, data);
2318
}
2419
}

rsocket-core/src/main/java/io/rsocket/frame/KeepAliveFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public static ByteBuf encode(
2929

3030
header.writeLong(lp);
3131

32-
return DataAndMetadataFlyweight.encodeOnlyData(allocator, header, data);
32+
return DataAndMetadataFlyweight.encode(allocator, header, null, false, data);
3333
}
3434

3535
public static boolean respondFlag(ByteBuf byteBuf) {

0 commit comments

Comments
 (0)