Skip to content

Commit a410d90

Browse files
committed
Drain client response body consistenly
Prior to this commit, the `DefaultClientResponse` implementation would handle HTTP client cases where the server response body would need to be consumed (pooled resources need to be released) and the body publisher cancelled right away. This is done to avoid reading the whole response body and returning the connection to the pool, now that this connection cannot be reused. This was done already when the `WebClient` is asked for a `Void` type for the response body. SPR-17054 brought a new case for that: whenever no message reader is able to decode the response body, an `UnsupportedMediaTypeException` error signal is sent. Prior to this commit, the response body would not be consumed and cancelled for that case. This commit refactors all those cases from the `DefaultClientResponse` directly into the `BodyExtractors`, since most of the logic and knowledge for that belongs there. We only need to only apply this behavior when the HTTP client is involved, as the server does not want this to happen. Issue: SPR-17054
1 parent 07653bf commit a410d90

File tree

4 files changed

+156
-170
lines changed

4 files changed

+156
-170
lines changed

spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
import org.springframework.core.ParameterizedTypeReference;
3030
import org.springframework.core.ResolvableType;
3131
import org.springframework.core.io.buffer.DataBuffer;
32+
import org.springframework.core.io.buffer.DataBufferUtils;
3233
import org.springframework.http.MediaType;
3334
import org.springframework.http.ReactiveHttpInputMessage;
35+
import org.springframework.http.client.reactive.ClientHttpResponse;
3436
import org.springframework.http.codec.HttpMessageReader;
3537
import org.springframework.http.codec.multipart.Part;
3638
import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -42,6 +44,7 @@
4244
* @author Arjen Poutsma
4345
* @author Sebastien Deleuze
4446
* @author Rossen Stoyanchev
47+
* @author Brian Clozel
4548
* @since 5.0
4649
*/
4750
public abstract class BodyExtractors {
@@ -81,8 +84,8 @@ private static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(Resol
8184
return (inputMessage, context) ->
8285
readWithMessageReaders(inputMessage, context, elementType,
8386
(HttpMessageReader<T> reader) -> readToMono(inputMessage, context, elementType, reader),
84-
ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)),
85-
Mono::empty);
87+
ex -> Mono.from(unsupportedErrorHandler(inputMessage, context, ex)),
88+
skipBodyAsMono(inputMessage, context));
8689
}
8790

8891
/**
@@ -110,8 +113,8 @@ private static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(Resol
110113
return (inputMessage, context) ->
111114
readWithMessageReaders(inputMessage, context, elementType,
112115
(HttpMessageReader<T> reader) -> readToFlux(inputMessage, context, elementType, reader),
113-
ex -> unsupportedErrorHandler(inputMessage, ex),
114-
Flux::empty);
116+
ex -> unsupportedErrorHandler(inputMessage, context, ex),
117+
skipBodyAsFlux(inputMessage, context));
115118
}
116119

117120

@@ -183,7 +186,6 @@ private static <T, S extends Publisher<T>> S readWithMessageReaders(
183186
if (VOID_TYPE.equals(elementType)) {
184187
return emptySupplier.get();
185188
}
186-
187189
MediaType contentType = Optional.ofNullable(message.getHeaders().getContentType())
188190
.orElse(MediaType.APPLICATION_OCTET_STREAM);
189191

@@ -195,6 +197,28 @@ private static <T, S extends Publisher<T>> S readWithMessageReaders(
195197
.orElseGet(() -> errorFunction.apply(unsupportedError(context, elementType, contentType)));
196198
}
197199

200+
private static <T> Supplier<Flux<T>> skipBodyAsFlux(ReactiveHttpInputMessage message,
201+
BodyExtractor.Context context) {
202+
203+
if (isExtractingForClient(message, context)) {
204+
return () -> consumeAndCancel(message).thenMany(Flux.empty());
205+
}
206+
else {
207+
return Flux::empty;
208+
}
209+
}
210+
211+
private static <T> Supplier<Mono<T>> skipBodyAsMono(ReactiveHttpInputMessage message,
212+
BodyExtractor.Context context) {
213+
214+
if (isExtractingForClient(message, context)) {
215+
return () -> consumeAndCancel(message).then(Mono.empty());
216+
}
217+
else {
218+
return Mono::empty;
219+
}
220+
}
221+
198222
private static UnsupportedMediaTypeException unsupportedError(BodyExtractor.Context context,
199223
ResolvableType elementType, MediaType contentType) {
200224

@@ -222,17 +246,21 @@ private static <T> Flux<T> readToFlux(ReactiveHttpInputMessage message, BodyExtr
222246
}
223247

224248
private static <T> Flux<T> unsupportedErrorHandler(
225-
ReactiveHttpInputMessage inputMessage, UnsupportedMediaTypeException ex) {
249+
ReactiveHttpInputMessage inputMessage, BodyExtractor.Context context,
250+
UnsupportedMediaTypeException ex) {
226251

252+
Flux<T> result;
227253
if (inputMessage.getHeaders().getContentType() == null) {
228254
// Empty body with no content type is ok
229-
return inputMessage.getBody().map(o -> {
255+
result = inputMessage.getBody().map(o -> {
230256
throw ex;
231257
});
232258
}
233259
else {
234-
return Flux.error(ex);
260+
result = Flux.error(ex);
235261
}
262+
return isExtractingForClient(inputMessage, context) ?
263+
consumeAndCancel(inputMessage).thenMany(result) : result;
236264
}
237265

238266
private static <T> HttpMessageReader<T> findReader(
@@ -251,4 +279,23 @@ private static <T> HttpMessageReader<T> cast(HttpMessageReader<?> reader) {
251279
return (HttpMessageReader<T>) reader;
252280
}
253281

282+
private static boolean isExtractingForClient(ReactiveHttpInputMessage message,
283+
BodyExtractor.Context context) {
284+
return !context.serverResponse().isPresent()
285+
&& message instanceof ClientHttpResponse;
286+
}
287+
288+
private static Mono<Void> consumeAndCancel(ReactiveHttpInputMessage message) {
289+
return message.getBody()
290+
.map(buffer -> {
291+
DataBufferUtils.release(buffer);
292+
throw new ReadCancellationException();
293+
})
294+
.onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
295+
.then();
296+
}
297+
298+
@SuppressWarnings("serial")
299+
private static class ReadCancellationException extends RuntimeException {
300+
}
254301
}

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
import org.springframework.core.ParameterizedTypeReference;
2929
import org.springframework.core.codec.Hints;
30-
import org.springframework.core.io.buffer.DataBufferUtils;
3130
import org.springframework.http.HttpHeaders;
3231
import org.springframework.http.HttpStatus;
3332
import org.springframework.http.MediaType;
@@ -111,73 +110,32 @@ public Map<String, Object> hints() {
111110

112111
@Override
113112
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
114-
if (Void.class.isAssignableFrom(elementClass)) {
115-
return consumeAndCancel();
116-
}
117-
else {
118-
return body(BodyExtractors.toMono(elementClass));
119-
}
120-
}
121-
122-
@SuppressWarnings("unchecked")
123-
private <T> Mono<T> consumeAndCancel() {
124-
return (Mono<T>) this.response.getBody()
125-
.map(buffer -> {
126-
DataBufferUtils.release(buffer);
127-
throw new ReadCancellationException();
128-
})
129-
.onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
130-
.then();
113+
return body(BodyExtractors.toMono(elementClass));
131114
}
132115

133116
@Override
134117
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
135-
if (Void.class.isAssignableFrom(typeReference.getType().getClass())) {
136-
return consumeAndCancel();
137-
}
138-
else {
139-
return body(BodyExtractors.toMono(typeReference));
140-
}
118+
return body(BodyExtractors.toMono(typeReference));
141119
}
142120

143121
@Override
144122
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
145-
if (Void.class.isAssignableFrom(elementClass)) {
146-
return Flux.from(consumeAndCancel());
147-
}
148-
else {
149-
return body(BodyExtractors.toFlux(elementClass));
150-
}
123+
return body(BodyExtractors.toFlux(elementClass));
151124
}
152125

153126
@Override
154127
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
155-
if (Void.class.isAssignableFrom(typeReference.getType().getClass())) {
156-
return Flux.from(consumeAndCancel());
157-
}
158-
else {
159-
return body(BodyExtractors.toFlux(typeReference));
160-
}
128+
return body(BodyExtractors.toFlux(typeReference));
161129
}
162130

163131
@Override
164132
public <T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType) {
165-
if (Void.class.isAssignableFrom(bodyType)) {
166-
return toEntityInternal(consumeAndCancel());
167-
}
168-
else {
169-
return toEntityInternal(bodyToMono(bodyType));
170-
}
133+
return toEntityInternal(bodyToMono(bodyType));
171134
}
172135

173136
@Override
174137
public <T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> typeReference) {
175-
if (Void.class.isAssignableFrom(typeReference.getType().getClass())) {
176-
return toEntityInternal(consumeAndCancel());
177-
}
178-
else {
179-
return toEntityInternal(bodyToMono(typeReference));
180-
}
138+
return toEntityInternal(bodyToMono(typeReference));
181139
}
182140

183141
private <T> Mono<ResponseEntity<T>> toEntityInternal(Mono<T> bodyMono) {
@@ -254,9 +212,4 @@ private OptionalLong toOptionalLong(long value) {
254212
}
255213
}
256214

257-
258-
@SuppressWarnings("serial")
259-
private static class ReadCancellationException extends RuntimeException {
260-
}
261-
262215
}

spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyExtractorsTests.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,25 @@
2727
import java.util.Optional;
2828

2929
import com.fasterxml.jackson.annotation.JsonView;
30+
import io.netty.buffer.PooledByteBufAllocator;
31+
import io.netty.util.IllegalReferenceCountException;
32+
import org.junit.Assert;
3033
import org.junit.Before;
3134
import org.junit.Test;
3235
import reactor.core.publisher.Flux;
3336
import reactor.core.publisher.Mono;
3437
import reactor.test.StepVerifier;
38+
import reactor.test.publisher.TestPublisher;
3539

3640
import org.springframework.core.ParameterizedTypeReference;
3741
import org.springframework.core.codec.ByteBufferDecoder;
3842
import org.springframework.core.codec.StringDecoder;
3943
import org.springframework.core.io.buffer.DataBuffer;
4044
import org.springframework.core.io.buffer.DefaultDataBuffer;
4145
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
46+
import org.springframework.core.io.buffer.NettyDataBuffer;
47+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
48+
import org.springframework.http.HttpStatus;
4249
import org.springframework.http.MediaType;
4350
import org.springframework.http.ReactiveHttpInputMessage;
4451
import org.springframework.http.codec.DecoderHttpMessageReader;
@@ -53,22 +60,27 @@
5360
import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
5461
import org.springframework.http.server.reactive.ServerHttpRequest;
5562
import org.springframework.http.server.reactive.ServerHttpResponse;
63+
import org.springframework.mock.http.client.reactive.test.MockClientHttpResponse;
5664
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
5765
import org.springframework.util.MultiValueMap;
5866

5967
import static org.junit.Assert.*;
68+
import static org.mockito.Mockito.when;
6069
import static org.springframework.http.codec.json.Jackson2CodecSupport.*;
6170

6271
/**
6372
* @author Arjen Poutsma
6473
* @author Sebastien Deleuze
74+
* @author Brian Clozel
6575
*/
6676
public class BodyExtractorsTests {
6777

6878
private BodyExtractor.Context context;
6979

7080
private Map<String, Object> hints;
7181

82+
private Optional<ServerHttpResponse> serverResponse = Optional.empty();
83+
7284

7385
@Before
7486
public void createContext() {
@@ -92,7 +104,7 @@ public List<HttpMessageReader<?>> messageReaders() {
92104

93105
@Override
94106
public Optional<ServerHttpResponse> serverResponse() {
95-
return Optional.empty();
107+
return serverResponse;
96108
}
97109

98110
@Override
@@ -180,6 +192,43 @@ public void toMonoWithEmptyBodyAndNoContentType() {
180192
StepVerifier.create(result).expectComplete().verify();
181193
}
182194

195+
@Test
196+
public void toMonoVoidAsClientShouldConsumeAndCancel() {
197+
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
198+
DefaultDataBuffer dataBuffer =
199+
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
200+
TestPublisher<DataBuffer> body = TestPublisher.create();
201+
202+
BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
203+
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
204+
response.setBody(body.flux());
205+
206+
StepVerifier.create(extractor.extract(response, this.context))
207+
.then(() -> {
208+
body.assertWasSubscribed();
209+
body.emit(dataBuffer);
210+
})
211+
.verifyComplete();
212+
213+
body.assertCancelled();
214+
}
215+
216+
@Test
217+
public void toMonoVoidAsClientWithEmptyBody() {
218+
TestPublisher<DataBuffer> body = TestPublisher.create();
219+
220+
BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
221+
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
222+
response.setBody(body.flux());
223+
224+
StepVerifier.create(extractor.extract(response, this.context))
225+
.then(() -> {
226+
body.assertWasSubscribed();
227+
body.complete();
228+
})
229+
.verifyComplete();
230+
}
231+
183232
@Test
184233
public void toFlux() {
185234
BodyExtractor<Flux<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(String.class);
@@ -366,6 +415,34 @@ public void toDataBuffers() {
366415
.verify();
367416
}
368417

418+
@Test // SPR-17054
419+
public void unsupportedMediaTypeShouldConsumeAndCancel() {
420+
NettyDataBufferFactory factory = new NettyDataBufferFactory(new PooledByteBufAllocator(true));
421+
NettyDataBuffer buffer = factory.wrap(ByteBuffer.wrap("spring".getBytes(StandardCharsets.UTF_8)));
422+
TestPublisher<DataBuffer> body = TestPublisher.create();
423+
424+
MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
425+
response.getHeaders().setContentType(MediaType.APPLICATION_PDF);
426+
response.setBody(body.flux());
427+
428+
BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);
429+
StepVerifier.create(extractor.extract(response, this.context))
430+
.then(() -> {
431+
body.assertWasSubscribed();
432+
body.emit(buffer);
433+
})
434+
.expectErrorSatisfies(throwable -> {
435+
assertTrue(throwable instanceof UnsupportedMediaTypeException);
436+
try {
437+
buffer.release();
438+
Assert.fail("releasing the buffer should have failed");
439+
} catch (IllegalReferenceCountException exc) {
440+
441+
}
442+
body.assertCancelled();
443+
}).verify();
444+
}
445+
369446

370447
interface SafeToDeserialize {}
371448

0 commit comments

Comments
 (0)