@@ -123,10 +123,12 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre
123
123
maximumBufferSizeInBytes , "maximumBufferSizeInBytes" );
124
124
125
125
this .resultFuture .whenComplete ((r , e ) -> {
126
+ log .trace (() -> "result future whenComplete" );
126
127
if (e == null ) {
127
128
return ;
128
129
}
129
130
if (isCancelled .compareAndSet (false , true )) {
131
+ log .trace (() -> "result future whenComplete with isCancelled=true" );
130
132
handleFutureCancel (e );
131
133
}
132
134
});
@@ -167,6 +169,7 @@ public void request(long n) {
167
169
168
170
@ Override
169
171
public void cancel () {
172
+ log .trace (() -> String .format ("received cancel signal. Current cancel state is 'isCancelled=%s'" , isCancelled .get ()));
170
173
if (isCancelled .compareAndSet (false , true )) {
171
174
log .trace (() -> "Cancelling splitting transformer" );
172
175
handleSubscriptionCancel ();
@@ -207,26 +210,31 @@ private boolean doEmit() {
207
210
/**
208
211
* Handle the {@code .cancel()} signal received from the downstream subscription. Data that is being sent to the upstream
209
212
* transformer need to finish processing before we complete. One typical use case for this is completing the multipart
210
- * download, the subscriber having reached the final part will signal that it doesn't need more parts by calling {@code
211
- * .cancel()} on the subscription.
213
+ * download, the subscriber having reached the final part will signal that it doesn't need more parts by calling
214
+ * {@code .cancel()} on the subscription.
212
215
*/
213
216
private void handleSubscriptionCancel () {
214
217
synchronized (cancelLock ) {
215
218
if (downstreamSubscriber == null ) {
219
+ log .trace (() -> "downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()" );
216
220
return ;
217
221
}
218
222
if (!onStreamCalled .get ()) {
219
223
// we never subscribe publisherToUpstream to the upstream, it would not complete
224
+ log .trace (() -> "publisherToUpstream never subscribed, skipping downstreamSubscriber.onComplete()" );
220
225
downstreamSubscriber = null ;
221
226
return ;
222
227
}
228
+ log .trace (() -> "publisherToUpstream.complete()" );
223
229
publisherToUpstream .complete ().whenComplete ((v , t ) -> {
224
230
if (downstreamSubscriber == null ) {
231
+ log .trace (() -> "[in future] downstreamSubscriber already null, skipping downstreamSubscriber.onComplete()" );
225
232
return ;
226
233
}
227
234
if (t != null ) {
228
235
downstreamSubscriber .onError (t );
229
236
} else {
237
+ log .trace (() -> "[in future] calling downstreamSubscriber.onComplete" );
230
238
downstreamSubscriber .onComplete ();
231
239
}
232
240
downstreamSubscriber = null ;
@@ -236,8 +244,8 @@ private void handleSubscriptionCancel() {
236
244
237
245
/**
238
246
* Handle when the {@link SplittingTransformer#resultFuture} is cancelled or completed exceptionally from the outside. Data
239
- * need to stop being sent to the upstream transformer immediately. One typical use case for this is transfer manager
240
- * needing to pause download by calling {@code .cancel(true)} on the future.
247
+ * need to stop being sent to the upstream transformer immediately. One typical use case for this is transfer manager needing
248
+ * to pause download by calling {@code .cancel(true)} on the future.
241
249
*
242
250
* @param e The exception the future was complete exceptionally with.
243
251
*/
@@ -281,6 +289,7 @@ public CompletableFuture<ResponseT> prepare() {
281
289
});
282
290
individualFuture .whenComplete ((r , e ) -> {
283
291
if (isCancelled .get ()) {
292
+ log .trace (() -> "Individual future completed ." );
284
293
handleSubscriptionCancel ();
285
294
}
286
295
});
@@ -308,8 +317,8 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
308
317
DelegatingBufferingSubscriber .builder ()
309
318
.maximumBufferInBytes (maximumBufferInBytes )
310
319
.delegate (upstreamSubscriber )
311
- .build ()
312
- )) ;
320
+ .build ())
321
+ );
313
322
}
314
323
}
315
324
publisher .subscribe (new IndividualPartSubscriber <>(this .individualFuture , response ));
0 commit comments