Skip to content

Commit d6618e3

Browse files
committed
provides extra safety for the data encoding and ensure no leaks
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 1c3927b commit d6618e3

8 files changed

+177
-24
lines changed

rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,34 @@ static ByteBuf encode(
3737
boolean hasMetadata,
3838
ByteBuf data) {
3939

40-
final boolean addData = data != null && data.isReadable();
41-
final boolean addMetadata = hasMetadata && metadata.isReadable();
40+
final boolean addData;
41+
if (data != null) {
42+
if (data.isReadable()) {
43+
addData = true;
44+
} else {
45+
// even though there is nothing to read, we still have to release here since nobody else
46+
// going to do soo
47+
data.release();
48+
addData = false;
49+
}
50+
} else {
51+
addData = false;
52+
}
53+
54+
final boolean addMetadata;
55+
if (hasMetadata) {
56+
if (metadata.isReadable()) {
57+
addMetadata = true;
58+
} else {
59+
// even though there is nothing to read, we still have to release here since nobody else
60+
// going to do soo
61+
metadata.release();
62+
addMetadata = false;
63+
}
64+
} else {
65+
// has no metadata means it is null, thus no need to release anything
66+
addMetadata = false;
67+
}
4268

4369
if (hasMetadata) {
4470
int length = metadata.readableBytes();

rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameFlyweight.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ public static ByteBuf encode(
1414
@Nullable final ByteBuf metadata) {
1515

1616
final boolean hasMetadata = metadata != null;
17-
final boolean addMetadata = hasMetadata && metadata.isReadable();
1817

1918
int flags = 0;
2019

@@ -27,6 +26,21 @@ public static ByteBuf encode(
2726
.writeInt(ttl)
2827
.writeInt(numRequests);
2928

29+
final boolean addMetadata;
30+
if (hasMetadata) {
31+
if (metadata.isReadable()) {
32+
addMetadata = true;
33+
} else {
34+
// even though there is nothing to read, we still have to release here since nobody else
35+
// going to do soo
36+
metadata.release();
37+
addMetadata = false;
38+
}
39+
} else {
40+
// has no metadata means it is null, thus no need to release anything
41+
addMetadata = false;
42+
}
43+
3044
if (addMetadata) {
3145
return allocator.compositeBuffer(2).addComponents(true, header, metadata);
3246
} else {

rsocket-core/src/main/java/io/rsocket/frame/MetadataPushFrameFlyweight.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,21 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.util.IllegalReferenceCountException;
56
import io.rsocket.Payload;
67

78
public class MetadataPushFrameFlyweight {
89

910
public static ByteBuf encodeReleasingPayload(ByteBufAllocator allocator, Payload payload) {
1011
final ByteBuf metadata = payload.metadata().retain();
11-
payload.release();
12+
// releasing payload safely since it can be already released wheres we have to release retained
13+
// data and metadata as well
14+
try {
15+
payload.release();
16+
} catch (IllegalReferenceCountException e) {
17+
metadata.release();
18+
throw e;
19+
}
1220
return encode(allocator, metadata);
1321
}
1422

rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.util.IllegalReferenceCountException;
56
import io.rsocket.Payload;
67

78
public class PayloadFrameFlyweight {
@@ -23,11 +24,31 @@ public static ByteBuf encodeNextCompleteReleasingPayload(
2324
static ByteBuf encodeReleasingPayload(
2425
ByteBufAllocator allocator, int streamId, boolean complete, Payload payload) {
2526

26-
final boolean hasMetadata = payload.hasMetadata();
27+
// if refCnt exceptions throws here it is safe to do no-op
28+
boolean hasMetadata = payload.hasMetadata();
29+
// if refCnt exceptions throws here it is safe to do no-op still
2730
final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null;
28-
final ByteBuf data = payload.data().retain();
29-
30-
payload.release();
31+
final ByteBuf data;
32+
// retaining data safely. May throw either NPE or RefCntE
33+
try {
34+
data = payload.data().retain();
35+
} catch (IllegalReferenceCountException | NullPointerException e) {
36+
if (hasMetadata) {
37+
metadata.release();
38+
}
39+
throw e;
40+
}
41+
// releasing payload safely since it can be already released wheres we have to release retained
42+
// data and metadata as well
43+
try {
44+
payload.release();
45+
} catch (IllegalReferenceCountException e) {
46+
data.release();
47+
if (hasMetadata) {
48+
metadata.release();
49+
}
50+
throw e;
51+
}
3152

3253
return encode(allocator, streamId, false, complete, true, metadata, data);
3354
}

rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.util.IllegalReferenceCountException;
56
import io.rsocket.Payload;
67

78
public class RequestChannelFrameFlyweight {
@@ -17,11 +18,31 @@ public static ByteBuf encodeReleasingPayload(
1718
long initialRequestN,
1819
Payload payload) {
1920

20-
final boolean hasMetadata = payload.hasMetadata();
21+
// if refCnt exceptions throws here it is safe to do no-op
22+
boolean hasMetadata = payload.hasMetadata();
23+
// if refCnt exceptions throws here it is safe to do no-op still
2124
final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null;
22-
final ByteBuf data = payload.data().retain();
23-
24-
payload.release();
25+
final ByteBuf data;
26+
// retaining data safely. May throw either NPE or RefCntE
27+
try {
28+
data = payload.data().retain();
29+
} catch (IllegalReferenceCountException | NullPointerException e) {
30+
if (hasMetadata) {
31+
metadata.release();
32+
}
33+
throw e;
34+
}
35+
// releasing payload safely since it can be already released wheres we have to release retained
36+
// data and metadata as well
37+
try {
38+
payload.release();
39+
} catch (IllegalReferenceCountException e) {
40+
data.release();
41+
if (hasMetadata) {
42+
metadata.release();
43+
}
44+
throw e;
45+
}
2546

2647
return encode(allocator, streamId, false, complete, initialRequestN, metadata, data);
2748
}

rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameFlyweight.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.util.IllegalReferenceCountException;
56
import io.rsocket.Payload;
67

78
public class RequestFireAndForgetFrameFlyweight {
@@ -13,11 +14,31 @@ private RequestFireAndForgetFrameFlyweight() {}
1314
public static ByteBuf encodeReleasingPayload(
1415
ByteBufAllocator allocator, int streamId, Payload payload) {
1516

16-
final boolean hasMetadata = payload.hasMetadata();
17+
// if refCnt exceptions throws here it is safe to do no-op
18+
boolean hasMetadata = payload.hasMetadata();
19+
// if refCnt exceptions throws here it is safe to do no-op still
1720
final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null;
18-
final ByteBuf data = payload.data().retain();
19-
20-
payload.release();
21+
final ByteBuf data;
22+
// retaining data safely. May throw either NPE or RefCntE
23+
try {
24+
data = payload.data().retain();
25+
} catch (IllegalReferenceCountException | NullPointerException e) {
26+
if (hasMetadata) {
27+
metadata.release();
28+
}
29+
throw e;
30+
}
31+
// releasing payload safely since it can be already released wheres we have to release retained
32+
// data and metadata as well
33+
try {
34+
payload.release();
35+
} catch (IllegalReferenceCountException e) {
36+
data.release();
37+
if (hasMetadata) {
38+
metadata.release();
39+
}
40+
throw e;
41+
}
2142

2243
return FLYWEIGHT.encode(allocator, streamId, false, metadata, data);
2344
}

rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameFlyweight.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.util.IllegalReferenceCountException;
56
import io.rsocket.Payload;
67

78
public class RequestResponseFrameFlyweight {
@@ -13,11 +14,31 @@ private RequestResponseFrameFlyweight() {}
1314
public static ByteBuf encodeReleasingPayload(
1415
ByteBufAllocator allocator, int streamId, Payload payload) {
1516

16-
final boolean hasMetadata = payload.hasMetadata();
17+
// if refCnt exceptions throws here it is safe to do no-op
18+
boolean hasMetadata = payload.hasMetadata();
19+
// if refCnt exceptions throws here it is safe to do no-op still
1720
final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null;
18-
final ByteBuf data = payload.data().retain();
19-
20-
payload.release();
21+
final ByteBuf data;
22+
// retaining data safely. May throw either NPE or RefCntE
23+
try {
24+
data = payload.data().retain();
25+
} catch (IllegalReferenceCountException | NullPointerException e) {
26+
if (hasMetadata) {
27+
metadata.release();
28+
}
29+
throw e;
30+
}
31+
// releasing payload safely since it can be already released wheres we have to release retained
32+
// data and metadata as well
33+
try {
34+
payload.release();
35+
} catch (IllegalReferenceCountException e) {
36+
data.release();
37+
if (hasMetadata) {
38+
metadata.release();
39+
}
40+
throw e;
41+
}
2142

2243
return encode(allocator, streamId, false, metadata, data);
2344
}

rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.util.IllegalReferenceCountException;
56
import io.rsocket.Payload;
67

78
public class RequestStreamFrameFlyweight {
@@ -13,11 +14,31 @@ private RequestStreamFrameFlyweight() {}
1314
public static ByteBuf encodeReleasingPayload(
1415
ByteBufAllocator allocator, int streamId, long initialRequestN, Payload payload) {
1516

16-
final boolean hasMetadata = payload.hasMetadata();
17+
// if refCnt exceptions throws here it is safe to do no-op
18+
boolean hasMetadata = payload.hasMetadata();
19+
// if refCnt exceptions throws here it is safe to do no-op still
1720
final ByteBuf metadata = hasMetadata ? payload.metadata().retain() : null;
18-
final ByteBuf data = payload.data().retain();
19-
20-
payload.release();
21+
final ByteBuf data;
22+
// retaining data safely. May throw either NPE or RefCntE
23+
try {
24+
data = payload.data().retain();
25+
} catch (IllegalReferenceCountException | NullPointerException e) {
26+
if (hasMetadata) {
27+
metadata.release();
28+
}
29+
throw e;
30+
}
31+
// releasing payload safely since it can be already released wheres we have to release retained
32+
// data and metadata as well
33+
try {
34+
payload.release();
35+
} catch (IllegalReferenceCountException e) {
36+
data.release();
37+
if (hasMetadata) {
38+
metadata.release();
39+
}
40+
throw e;
41+
}
2142

2243
return encode(allocator, streamId, false, initialRequestN, metadata, data);
2344
}

0 commit comments

Comments
 (0)