Skip to content

Commit 4197037

Browse files
authored
Fixed an issue in JSON-RPC event stream services (none exist yet) where retries excluded the initial-request event. (#2673)
Before this change, the AsyncStreamPrepender (a Publisher) does not support multiple subscribe calls, but each attempt makes a new subscribe call. Because of this, if a retry is encountered, backpressure is miscalculated and the initial-request is not sent for the retries.
1 parent eabf797 commit 4197037

File tree

2 files changed

+24
-6
lines changed

2 files changed

+24
-6
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrepender.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@
2525
public class AsyncStreamPrepender<T> implements Publisher<T> {
2626
private final Publisher<T> delegate;
2727
private final T firstItem;
28-
private Subscriber<? super T> subscriber;
29-
private volatile boolean complete = false;
30-
private volatile boolean firstRequest = true;
3128

3229
public AsyncStreamPrepender(Publisher<T> delegate, T firstItem) {
3330
this.delegate = delegate;
@@ -36,11 +33,18 @@ public AsyncStreamPrepender(Publisher<T> delegate, T firstItem) {
3633

3734
@Override
3835
public void subscribe(Subscriber<? super T> s) {
39-
subscriber = s;
40-
delegate.subscribe(new DelegateSubscriber());
36+
delegate.subscribe(new DelegateSubscriber(s));
4137
}
4238

4339
private class DelegateSubscriber implements Subscriber<T> {
40+
private final Subscriber<? super T> subscriber;
41+
private volatile boolean complete = false;
42+
private volatile boolean firstRequest = true;
43+
44+
private DelegateSubscriber(Subscriber<? super T> subscriber) {
45+
this.subscriber = subscriber;
46+
}
47+
4448
@Override
4549
public void onSubscribe(Subscription subscription) {
4650
subscriber.onSubscribe(new Subscription() {
@@ -90,7 +94,6 @@ public void request(long n) {
9094
public void cancel() {
9195
cancelled = true;
9296
subscription.cancel();
93-
subscriber = null;
9497
}
9598
});
9699
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/AsyncStreamPrependerTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,21 @@ public void sequence() {
6262
}
6363
}
6464

65+
@Test
66+
public void multiSubscribe_stillPrepends() {
67+
AsyncStreamPrepender<Long> prepender = new AsyncStreamPrepender<>(rangeLong(1L, 5L), 0L);
68+
Flowable<Long> prepended1 = fromPublisher(prepender);
69+
Flowable<Long> prepended2 = fromPublisher(prepender);
70+
71+
Iterator<Long> iterator1 = prepended1.blockingIterable(1).iterator();
72+
Iterator<Long> iterator2 = prepended2.blockingIterable(1).iterator();
73+
74+
for (long i = 0; i <= 5; i++) {
75+
assertEquals(i, iterator1.next().longValue());
76+
assertEquals(i, iterator2.next().longValue());
77+
}
78+
}
79+
6580
@Test
6681
public void error() {
6782
Flowable<Long> error = Flowable.error(IllegalStateException::new);

0 commit comments

Comments
 (0)