|
2 | 2 |
|
3 | 3 | import io.netty.buffer.ByteBuf;
|
4 | 4 | import io.netty.channel.Channel;
|
5 |
| -import io.netty.channel.ChannelPromise; |
6 | 5 | import io.netty.channel.EventLoop;
|
7 | 6 | import io.netty.util.ReferenceCountUtil;
|
8 | 7 | import io.netty.util.ReferenceCounted;
|
@@ -85,26 +84,22 @@ class SendPublisher<V extends ReferenceCounted> extends Flux<ByteBuf> {
|
85 | 84 | fuse = queue instanceof Fuseable.QueueSubscription;
|
86 | 85 | }
|
87 | 86 |
|
88 |
| - private ChannelPromise writeCleanupPromise(V poll) { |
89 |
| - return channel |
90 |
| - .newPromise() |
91 |
| - .addListener( |
92 |
| - future -> { |
93 |
| - if (requested != Long.MAX_VALUE) { |
94 |
| - requested--; |
95 |
| - } |
96 |
| - requestedUpstream--; |
97 |
| - pending--; |
| 87 | + @SuppressWarnings("unchecked") |
| 88 | + private void writeCleanup(V poll) { |
| 89 | + if (requested != Long.MAX_VALUE) { |
| 90 | + requested--; |
| 91 | + } |
| 92 | + requestedUpstream--; |
| 93 | + pending--; |
98 | 94 |
|
99 |
| - InnerSubscriber is = (InnerSubscriber) INNER_SUBSCRIBER.get(SendPublisher.this); |
100 |
| - if (is != null) { |
101 |
| - is.tryRequestMoreUpstream(); |
102 |
| - tryComplete(is); |
103 |
| - } |
104 |
| - if (poll.refCnt() > 0) { |
105 |
| - ReferenceCountUtil.safeRelease(poll); |
106 |
| - } |
107 |
| - }); |
| 95 | + InnerSubscriber is = (InnerSubscriber) INNER_SUBSCRIBER.get(SendPublisher.this); |
| 96 | + if (is != null) { |
| 97 | + is.tryRequestMoreUpstream(); |
| 98 | + tryComplete(is); |
| 99 | + } |
| 100 | + if (poll.refCnt() > 0) { |
| 101 | + ReferenceCountUtil.safeRelease(poll); |
| 102 | + } |
108 | 103 | }
|
109 | 104 |
|
110 | 105 | private void tryComplete(InnerSubscriber is) {
|
@@ -207,7 +202,7 @@ private void flush() {
|
207 | 202 | }
|
208 | 203 |
|
209 | 204 | private void tryDrain() {
|
210 |
| - if (wip == 0 && terminated == 0 && WIP.getAndIncrement(SendPublisher.this) == 0) { |
| 205 | + if (terminated == 0 && WIP.getAndIncrement(SendPublisher.this) == 0) { |
211 | 206 | try {
|
212 | 207 | if (eventLoop.inEventLoop()) {
|
213 | 208 | drain();
|
@@ -235,11 +230,29 @@ private void drain() {
|
235 | 230 | int readableBytes = sizeOf.size(poll);
|
236 | 231 | pending++;
|
237 | 232 | if (channel.isWritable() && readableBytes <= channel.bytesBeforeUnwritable()) {
|
238 |
| - channel.write(poll, writeCleanupPromise(poll)); |
| 233 | + channel |
| 234 | + .write(poll) |
| 235 | + .addListener( |
| 236 | + future -> { |
| 237 | + if (future.cause() != null) { |
| 238 | + onError(future.cause()); |
| 239 | + } else { |
| 240 | + writeCleanup(poll); |
| 241 | + } |
| 242 | + }); |
239 | 243 | scheduleFlush = true;
|
240 | 244 | } else {
|
241 | 245 | scheduleFlush = false;
|
242 |
| - channel.writeAndFlush(poll, writeCleanupPromise(poll)); |
| 246 | + channel |
| 247 | + .writeAndFlush(poll) |
| 248 | + .addListener( |
| 249 | + future -> { |
| 250 | + if (future.cause() != null) { |
| 251 | + onError(future.cause()); |
| 252 | + } else { |
| 253 | + writeCleanup(poll); |
| 254 | + } |
| 255 | + }); |
243 | 256 | }
|
244 | 257 |
|
245 | 258 | tryRequestMoreUpstream();
|
|
0 commit comments