@@ -179,7 +179,7 @@ private static class PublisherAdapter implements Publisher<ByteBuffer> {
179
179
private final StreamedHttpResponse response ;
180
180
private final ChannelHandlerContext channelContext ;
181
181
private final RequestContext requestContext ;
182
- private final AtomicBoolean isCancelled = new AtomicBoolean (false );
182
+ private final AtomicBoolean isDone = new AtomicBoolean (false );
183
183
184
184
private PublisherAdapter (StreamedHttpResponse response , ChannelHandlerContext channelContext ,
185
185
RequestContext requestContext ) {
@@ -209,8 +209,10 @@ private Subscription resolveSubscription(Subscription subscription) {
209
209
}
210
210
211
211
private void onCancel () {
212
+ if (!isDone .compareAndSet (false , true )) {
213
+ return ;
214
+ }
212
215
try {
213
- isCancelled .set (true );
214
216
requestContext .handler ().exceptionOccurred (
215
217
new SdkCancellationException ("Subscriber cancelled before all events were published" ));
216
218
} finally {
@@ -221,6 +223,10 @@ private void onCancel() {
221
223
222
224
@ Override
223
225
public void onNext (HttpContent httpContent ) {
226
+ // isDone may be true if the subscriber cancelled
227
+ if (isDone .get ()) {
228
+ return ;
229
+ }
224
230
// Needed to prevent use-after-free bug if the subscriber's onNext is asynchronous
225
231
ByteBuffer b = copyToByteBuffer (httpContent .content ());
226
232
httpContent .release ();
@@ -230,7 +236,7 @@ public void onNext(HttpContent httpContent) {
230
236
231
237
@ Override
232
238
public void onError (Throwable t ) {
233
- if (isCancelled . get ( )) {
239
+ if (! isDone . compareAndSet ( false , true )) {
234
240
return ;
235
241
}
236
242
try {
@@ -247,7 +253,7 @@ public void onError(Throwable t) {
247
253
public void onComplete () {
248
254
// For HTTP/2 it's possible to get an onComplete after we cancel due to the channel becoming
249
255
// inactive. We guard against that here and just ignore the signal (see HandlerPublisher)
250
- if (isCancelled . get ( )) {
256
+ if (! isDone . compareAndSet ( false , true )) {
251
257
return ;
252
258
}
253
259
try {
0 commit comments