Skip to content

Commit 7c87f6c

Browse files
committed
fix issue when long running channels eventually hang under load
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent de7ee88 commit 7c87f6c

File tree

1 file changed

+36
-23
lines changed

1 file changed

+36
-23
lines changed

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import io.netty.buffer.ByteBuf;
44
import io.netty.channel.Channel;
5-
import io.netty.channel.ChannelPromise;
65
import io.netty.channel.EventLoop;
76
import io.netty.util.ReferenceCountUtil;
87
import io.netty.util.ReferenceCounted;
@@ -85,26 +84,22 @@ class SendPublisher<V extends ReferenceCounted> extends Flux<ByteBuf> {
8584
fuse = queue instanceof Fuseable.QueueSubscription;
8685
}
8786

88-
private ChannelPromise writeCleanupPromise(V poll) {
89-
return channel
90-
.newPromise()
91-
.addListener(
92-
future -> {
93-
if (requested != Long.MAX_VALUE) {
94-
requested--;
95-
}
96-
requestedUpstream--;
97-
pending--;
87+
@SuppressWarnings("unchecked")
88+
private void writeCleanup(V poll) {
89+
if (requested != Long.MAX_VALUE) {
90+
requested--;
91+
}
92+
requestedUpstream--;
93+
pending--;
9894

99-
InnerSubscriber is = (InnerSubscriber) INNER_SUBSCRIBER.get(SendPublisher.this);
100-
if (is != null) {
101-
is.tryRequestMoreUpstream();
102-
tryComplete(is);
103-
}
104-
if (poll.refCnt() > 0) {
105-
ReferenceCountUtil.safeRelease(poll);
106-
}
107-
});
95+
InnerSubscriber is = (InnerSubscriber) INNER_SUBSCRIBER.get(SendPublisher.this);
96+
if (is != null) {
97+
is.tryRequestMoreUpstream();
98+
tryComplete(is);
99+
}
100+
if (poll.refCnt() > 0) {
101+
ReferenceCountUtil.safeRelease(poll);
102+
}
108103
}
109104

110105
private void tryComplete(InnerSubscriber is) {
@@ -207,7 +202,7 @@ private void flush() {
207202
}
208203

209204
private void tryDrain() {
210-
if (wip == 0 && terminated == 0 && WIP.getAndIncrement(SendPublisher.this) == 0) {
205+
if (terminated == 0 && WIP.getAndIncrement(SendPublisher.this) == 0) {
211206
try {
212207
if (eventLoop.inEventLoop()) {
213208
drain();
@@ -235,11 +230,29 @@ private void drain() {
235230
int readableBytes = sizeOf.size(poll);
236231
pending++;
237232
if (channel.isWritable() && readableBytes <= channel.bytesBeforeUnwritable()) {
238-
channel.write(poll, writeCleanupPromise(poll));
233+
channel
234+
.write(poll)
235+
.addListener(
236+
future -> {
237+
if (future.cause() != null) {
238+
onError(future.cause());
239+
} else {
240+
writeCleanup(poll);
241+
}
242+
});
239243
scheduleFlush = true;
240244
} else {
241245
scheduleFlush = false;
242-
channel.writeAndFlush(poll, writeCleanupPromise(poll));
246+
channel
247+
.writeAndFlush(poll)
248+
.addListener(
249+
future -> {
250+
if (future.cause() != null) {
251+
onError(future.cause());
252+
} else {
253+
writeCleanup(poll);
254+
}
255+
});
243256
}
244257

245258
tryRequestMoreUpstream();

0 commit comments

Comments
 (0)