Skip to content

Commit 594c580

Browse files
committed
Handle error in apply of writeFunction
Closes gh-23175
1 parent 97bfb75 commit 594c580

File tree

2 files changed

+39
-4
lines changed

2 files changed

+39
-4
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,15 @@ public final void onNext(T item) {
181181
else if (this.state == State.NEW) {
182182
this.item = item;
183183
this.state = State.FIRST_SIGNAL_RECEIVED;
184-
writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
184+
Publisher<Void> result;
185+
try {
186+
result = writeFunction.apply(this);
187+
}
188+
catch (Throwable ex) {
189+
this.writeCompletionBarrier.onError(ex);
190+
return;
191+
}
192+
result.subscribe(this.writeCompletionBarrier);
185193
}
186194
else {
187195
if (this.subscription != null) {
@@ -230,7 +238,15 @@ public final void onComplete() {
230238
else if (this.state == State.NEW) {
231239
this.completed = true;
232240
this.state = State.FIRST_SIGNAL_RECEIVED;
233-
writeFunction.apply(this).subscribe(this.writeCompletionBarrier);
241+
Publisher<Void> result;
242+
try {
243+
result = writeFunction.apply(this);
244+
}
245+
catch (Throwable ex) {
246+
this.writeCompletionBarrier.onError(ex);
247+
return;
248+
}
249+
result.subscribe(this.writeCompletionBarrier);
234250
}
235251
else {
236252
this.completed = true;

spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public void writeOneItem() throws Exception {
9292

9393

9494
@Test
95-
public void writeMultipleItems() throws Exception {
95+
public void writeMultipleItems() {
9696
List<String> items = Arrays.asList("one", "two", "three");
9797
Mono<Void> completion = Flux.fromIterable(items).as(this::sendOperator);
9898
Signal<Void> signal = completion.materialize().block();
@@ -108,7 +108,7 @@ public void writeMultipleItems() throws Exception {
108108
}
109109

110110
@Test
111-
public void errorAfterMultipleItems() throws Exception {
111+
public void errorAfterMultipleItems() {
112112
IllegalStateException error = new IllegalStateException("boo");
113113
Flux<String> publisher = Flux.generate(() -> 0, (idx , subscriber) -> {
114114
int i = ++idx;
@@ -213,6 +213,25 @@ public void errorFromWriteFunctionWhileItemCached() {
213213
bufferFactory.checkForLeaks();
214214
}
215215

216+
@Test // gh-23175
217+
public void errorInWriteFunction() {
218+
219+
StepVerifier
220+
.create(new ChannelSendOperator<>(Mono.just("one"), p -> {
221+
throw new IllegalStateException("boo");
222+
}))
223+
.expectErrorMessage("boo")
224+
.verify(Duration.ofMillis(5000));
225+
226+
StepVerifier
227+
.create(new ChannelSendOperator<>(Mono.empty(), p -> {
228+
throw new IllegalStateException("boo");
229+
}))
230+
.expectErrorMessage("boo")
231+
.verify(Duration.ofMillis(5000));
232+
}
233+
234+
216235
private <T> Mono<Void> sendOperator(Publisher<String> source){
217236
return new ChannelSendOperator<>(source, writer::send);
218237
}

0 commit comments

Comments
 (0)