Skip to content

Commit dc81cda

Browse files
committed
Reset subscriberRef in prepare
Fix a bug in EventStreamAsyncResponseTransformer where the subscriber ref is not reset between calls to prepare(), causing retries to fail when the EventStreamResponseHandler attempts to subscribe to a new stream on a request retry.
1 parent b42e418 commit dc81cda

File tree

3 files changed

+43
-0
lines changed

3 files changed

+43
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"type": "bugfix",
4+
"description": "Fix a bug in `EventStreamAsyncResponseTransformer` where the reference to the current stream `Subscriber` is not reset in `prepare`, causing an `IllegalStateException` to be thrown when attemping to subscribe to the event stream upon a retry."
5+
}

core/aws-core/src/main/java/software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ private EventStreamAsyncResponseTransformer(
185185
@Override
186186
public CompletableFuture<Void> prepare() {
187187
transformFuture = new CompletableFuture<>();
188+
subscriberRef.set(null);
188189
isDone = false;
189190
return transformFuture;
190191
}

core/aws-core/src/test/java/software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformerTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CompletionException;
2727
import java.util.concurrent.CountDownLatch;
2828
import java.util.concurrent.Executors;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.concurrent.atomic.AtomicInteger;
3031
import java.util.concurrent.atomic.AtomicLong;
3132
import java.util.function.Consumer;
@@ -182,6 +183,42 @@ public void prepareReturnsNewFuture() {
182183
assertThat(transformer.prepare()).isNotEqualTo(cf1);
183184
}
184185

186+
@Test(timeout = 2000)
187+
public void prepareResetsSubscriberRef() throws InterruptedException {
188+
CountDownLatch latch = new CountDownLatch(2);
189+
AtomicBoolean exceptionThrown = new AtomicBoolean(false);
190+
191+
AsyncResponseTransformer<SdkResponse, Void> transformer =
192+
EventStreamAsyncResponseTransformer.builder()
193+
.eventStreamResponseHandler(
194+
onEventStream(p -> {
195+
try {
196+
p.subscribe(e -> {});
197+
} catch (Throwable t) {
198+
exceptionThrown.set(true);
199+
} finally {
200+
latch.countDown();
201+
}
202+
}))
203+
.eventResponseHandler((r, e) -> null)
204+
.executor(Executors.newFixedThreadPool(2))
205+
.future(new CompletableFuture<>())
206+
.build();
207+
208+
Flowable<ByteBuffer> bytePublisher = Flowable.empty();
209+
210+
CompletableFuture<Void> transformFuture = transformer.prepare();
211+
transformer.onStream(SdkPublisher.adapt(bytePublisher));
212+
transformFuture.join();
213+
214+
transformFuture = transformer.prepare();
215+
transformer.onStream(SdkPublisher.adapt(bytePublisher));
216+
transformFuture.join();
217+
218+
latch.await();
219+
assertThat(exceptionThrown).isFalse();
220+
}
221+
185222
@Test
186223
public void erroneousExtraExceptionOccurredDoesNotSurfaceException() {
187224
AtomicLong numExceptions = new AtomicLong(0);

0 commit comments

Comments
 (0)