Skip to content

Commit b6d2a29

Browse files
committed
Update for Reactor API changes
1 parent b0d273e commit b6d2a29

File tree

8 files changed

+58
-57
lines changed

8 files changed

+58
-57
lines changed

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ void ignoreInput() {
148148
}
149149

150150
@Test // gh-24741
151-
@Disabled // pending https://github.com/rsocket/rsocket-java/pull/777
151+
@Disabled
152+
// pending https://github.com/rsocket/rsocket-java/pull/777
152153
void noSuchRouteOnChannelInteraction() {
153154
Flux<String> input = Flux.just("foo", "bar", "baz");
154155
Flux<String> result = requester.route("no-such-route").data(input).retrieveFlux(String.class);
@@ -245,7 +246,7 @@ private static class PayloadInterceptor implements RSocket, RSocketInterceptor {
245246
void checkForLeaks() {
246247
this.rsockets.stream().map(PayloadSavingDecorator::getPayloads)
247248
.forEach(payloadInfoProcessor -> {
248-
payloadInfoProcessor.complete();
249+
payloadInfoProcessor.emitComplete();
249250
payloadInfoProcessor.asFlux()
250251
.doOnNext(this::checkForLeak)
251252
.blockLast();
@@ -290,18 +291,18 @@ private static class PayloadSavingDecorator implements RSocket {
290291

291292
private final RSocket delegate;
292293

293-
private Sinks.StandaloneFluxSink<PayloadLeakInfo> payloads = Sinks.replayAll();
294+
private Sinks.Many<PayloadLeakInfo> payloads = Sinks.many().replay().all();
294295

295296
PayloadSavingDecorator(RSocket delegate) {
296297
this.delegate = delegate;
297298
}
298299

299-
Sinks.StandaloneFluxSink<PayloadLeakInfo> getPayloads() {
300+
Sinks.Many<PayloadLeakInfo> getPayloads() {
300301
return this.payloads;
301302
}
302303

303304
void reset() {
304-
this.payloads = Sinks.replayAll();
305+
this.payloads = Sinks.many().replay().all();
305306
}
306307

307308
@Override
@@ -327,7 +328,7 @@ public Flux<io.rsocket.Payload> requestChannel(Publisher<io.rsocket.Payload> pay
327328
}
328329

329330
private io.rsocket.Payload addPayload(io.rsocket.Payload payload) {
330-
this.payloads.next(new PayloadLeakInfo(payload));
331+
this.payloads.emitNext(new PayloadLeakInfo(payload));
331332
return payload;
332333
}
333334

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,14 +224,14 @@ public void noMatchingRoute() {
224224
@Controller
225225
static class ServerController {
226226

227-
final Sinks.StandaloneFluxSink<String> fireForgetPayloads = Sinks.replayAll();
227+
final Sinks.Many<String> fireForgetPayloads = Sinks.many().replay().all();
228228

229-
final Sinks.StandaloneFluxSink<String> metadataPushPayloads = Sinks.replayAll();
229+
final Sinks.Many<String> metadataPushPayloads = Sinks.many().replay().all();
230230

231231

232232
@MessageMapping("receive")
233233
void receive(String payload) {
234-
this.fireForgetPayloads.next(payload);
234+
this.fireForgetPayloads.emitNext(payload);
235235
}
236236

237237
@MessageMapping("echo")
@@ -273,7 +273,7 @@ Mono<Void> voidReturnValue(String payload) {
273273

274274
@ConnectMapping("foo-updates")
275275
public void handleMetadata(@Header("foo") String foo) {
276-
this.metadataPushPayloads.next(foo);
276+
this.metadataPushPayloads.emitNext(foo);
277277
}
278278

279279
@MessageExceptionHandler

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,11 @@ void handleFireAndForget() {
217217

218218
private static class ClientHandler {
219219

220-
final Sinks.StandaloneFluxSink<String> fireForgetPayloads = Sinks.replayAll();
220+
final Sinks.Many<String> fireForgetPayloads = Sinks.many().replay().all();
221221

222222
@MessageMapping("receive")
223223
void receive(String payload) {
224-
this.fireForgetPayloads.next(payload);
224+
this.fireForgetPayloads.emitNext(payload);
225225
}
226226

227227
@MessageMapping("echo")

spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public void fluxNotHandled() {
371371
this.messageHandler.handleMessage(message);
372372

373373
assertThat(controller.fluxSink).isNotNull();
374-
controller.fluxSink.next("foo");
374+
controller.fluxSink.emitNext("foo");
375375

376376
verify(this.converter, never()).toMessage(any(), any(MessageHeaders.class));
377377
}
@@ -587,7 +587,7 @@ private static class ReactiveController {
587587

588588
private MonoProcessor<String> monoProcessor;
589589

590-
private Sinks.StandaloneFluxSink<String> fluxSink;
590+
private Sinks.Many<String> fluxSink;
591591

592592
private boolean exceptionCaught = false;
593593

@@ -599,7 +599,7 @@ public Mono<String> handleMono() {
599599

600600
@MessageMapping("flux")
601601
public Flux<String> handleFlux() {
602-
this.fluxSink = Sinks.unicast();
602+
this.fluxSink = Sinks.many().unicast().onBackpressureBuffer();
603603
return this.fluxSink.asFlux();
604604
}
605605

spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketClientToServerCoroutinesIntegrationTests.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,17 +145,17 @@ class RSocketClientToServerCoroutinesIntegrationTests {
145145
@Controller
146146
class ServerController {
147147

148-
val fireForgetPayloads = Sinks.replayAll<String>()
148+
val fireForgetPayloads = Sinks.many().replay().all<String>()
149149

150150
@MessageMapping("receive")
151151
fun receive(payload: String) {
152-
fireForgetPayloads.next(payload)
152+
fireForgetPayloads.emitNext(payload)
153153
}
154154

155155
@MessageMapping("receive-async")
156156
suspend fun receiveAsync(payload: String) {
157157
delay(10)
158-
fireForgetPayloads.next(payload)
158+
fireForgetPayloads.emitNext(payload)
159159
}
160160

161161
@MessageMapping("echo-async")

spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,9 @@ private String decodeToString(Part part) {
207207

208208
@Test // SPR-16402
209209
public void singleSubscriberWithResource() throws IOException {
210-
Sinks.StandaloneFluxSink<Resource> sink = Sinks.unicast();
210+
Sinks.Many<Resource> sink = Sinks.many().unicast().onBackpressureBuffer();
211211
Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg");
212-
sink.next(logo);
212+
sink.emitNext(logo);
213213

214214
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
215215
bodyBuilder.asyncPart("logo", sink.asFlux(), Resource.class);

spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,11 @@ public void deferredResultSubscriberWithMultipleValues() throws Exception {
138138
Bar bar1 = new Bar("foo");
139139
Bar bar2 = new Bar("bar");
140140

141-
Sinks.StandaloneFluxSink<Bar> sink = Sinks.unicast();
141+
Sinks.Many<Bar> sink = Sinks.many().unicast().onBackpressureBuffer();
142142
testDeferredResultSubscriber(sink.asFlux(), Flux.class, forClass(Bar.class), () -> {
143-
sink.next(bar1);
144-
sink.next(bar2);
145-
sink.complete();
143+
sink.emitNext(bar1);
144+
sink.emitNext(bar2);
145+
sink.emitComplete();
146146
}, Arrays.asList(bar1, bar2));
147147
}
148148

@@ -189,16 +189,16 @@ private void testSseResponse(boolean expectSseEmitter) throws Exception {
189189
public void writeServerSentEvents() throws Exception {
190190

191191
this.servletRequest.addHeader("Accept", "text/event-stream");
192-
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
192+
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
193193
SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, forClass(String.class));
194194

195195
EmitterHandler emitterHandler = new EmitterHandler();
196196
sseEmitter.initialize(emitterHandler);
197197

198-
sink.next("foo");
199-
sink.next("bar");
200-
sink.next("baz");
201-
sink.complete();
198+
sink.emitNext("foo");
199+
sink.emitNext("bar");
200+
sink.emitNext("baz");
201+
sink.emitComplete();
202202

203203
assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
204204
}
@@ -208,16 +208,16 @@ public void writeServerSentEventsWithBuilder() throws Exception {
208208

209209
ResolvableType type = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class);
210210

211-
Sinks.StandaloneFluxSink<ServerSentEvent<?>> sink = Sinks.unicast();
211+
Sinks.Many<ServerSentEvent<?>> sink = Sinks.many().unicast().onBackpressureBuffer();
212212
SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, type);
213213

214214
EmitterHandler emitterHandler = new EmitterHandler();
215215
sseEmitter.initialize(emitterHandler);
216216

217-
sink.next(ServerSentEvent.builder("foo").id("1").build());
218-
sink.next(ServerSentEvent.builder("bar").id("2").build());
219-
sink.next(ServerSentEvent.builder("baz").id("3").build());
220-
sink.complete();
217+
sink.emitNext(ServerSentEvent.builder("foo").id("1").build());
218+
sink.emitNext(ServerSentEvent.builder("bar").id("2").build());
219+
sink.emitNext(ServerSentEvent.builder("baz").id("3").build());
220+
sink.emitComplete();
221221

222222
assertThat(emitterHandler.getValuesAsText()).isEqualTo("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n");
223223
}
@@ -227,7 +227,7 @@ public void writeStreamJson() throws Exception {
227227

228228
this.servletRequest.addHeader("Accept", "application/x-ndjson");
229229

230-
Sinks.StandaloneFluxSink<Bar> sink = Sinks.unicast();
230+
Sinks.Many<Bar> sink = Sinks.many().unicast().onBackpressureBuffer();
231231
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(Bar.class));
232232

233233
EmitterHandler emitterHandler = new EmitterHandler();
@@ -239,9 +239,9 @@ public void writeStreamJson() throws Exception {
239239
Bar bar1 = new Bar("foo");
240240
Bar bar2 = new Bar("bar");
241241

242-
sink.next(bar1);
243-
sink.next(bar2);
244-
sink.complete();
242+
sink.emitNext(bar1);
243+
sink.emitNext(bar2);
244+
sink.emitComplete();
245245

246246
assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/x-ndjson");
247247
assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n"));
@@ -250,16 +250,16 @@ public void writeStreamJson() throws Exception {
250250
@Test
251251
public void writeText() throws Exception {
252252

253-
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
253+
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
254254
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(String.class));
255255

256256
EmitterHandler emitterHandler = new EmitterHandler();
257257
emitter.initialize(emitterHandler);
258258

259-
sink.next("The quick");
260-
sink.next(" brown fox jumps over ");
261-
sink.next("the lazy dog");
262-
sink.complete();
259+
sink.emitNext("The quick");
260+
sink.emitNext(" brown fox jumps over ");
261+
sink.emitNext("the lazy dog");
262+
sink.emitComplete();
263263

264264
assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog");
265265
}

spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandlerTests.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -227,16 +227,16 @@ public void responseBodyFlux() throws Exception {
227227
this.request.addHeader("Accept", "text/event-stream");
228228

229229
MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class);
230-
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
230+
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
231231
this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest);
232232

233233
assertThat(this.request.isAsyncStarted()).isTrue();
234234
assertThat(this.response.getStatus()).isEqualTo(200);
235235

236-
sink.next("foo");
237-
sink.next("bar");
238-
sink.next("baz");
239-
sink.complete();
236+
sink.emitNext("foo");
237+
sink.emitNext("bar");
238+
sink.emitNext("baz");
239+
sink.emitComplete();
240240

241241
assertThat(this.response.getContentType()).isEqualTo("text/event-stream");
242242
assertThat(this.response.getContentAsString()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
@@ -248,14 +248,14 @@ public void responseBodyFluxWithError() throws Exception {
248248
this.request.addHeader("Accept", "text/event-stream");
249249

250250
MethodParameter type = on(TestController.class).resolveReturnType(Flux.class, String.class);
251-
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
251+
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
252252
this.handler.handleReturnValue(sink.asFlux(), type, this.mavContainer, this.webRequest);
253253

254254
assertThat(this.request.isAsyncStarted()).isTrue();
255255

256256
IllegalStateException ex = new IllegalStateException("wah wah");
257-
sink.error(ex);
258-
sink.complete();
257+
sink.emitError(ex);
258+
sink.emitComplete();
259259

260260
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.webRequest);
261261
assertThat(asyncManager.getConcurrentResult()).isSameAs(ex);
@@ -290,7 +290,7 @@ public void responseEntitySseNoContent() throws Exception {
290290
@Test
291291
public void responseEntityFlux() throws Exception {
292292

293-
Sinks.StandaloneFluxSink<String> sink = Sinks.unicast();
293+
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
294294
ResponseEntity<Flux<String>> entity = ResponseEntity.ok().body(sink.asFlux());
295295
ResolvableType bodyType = forClassWithGenerics(Flux.class, String.class);
296296
MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType);
@@ -299,10 +299,10 @@ public void responseEntityFlux() throws Exception {
299299
assertThat(this.request.isAsyncStarted()).isTrue();
300300
assertThat(this.response.getStatus()).isEqualTo(200);
301301

302-
sink.next("foo");
303-
sink.next("bar");
304-
sink.next("baz");
305-
sink.complete();
302+
sink.emitNext("foo");
303+
sink.emitNext("bar");
304+
sink.emitNext("baz");
305+
sink.emitComplete();
306306

307307
assertThat(this.response.getContentType()).isEqualTo("text/plain");
308308
assertThat(this.response.getContentAsString()).isEqualTo("foobarbaz");
@@ -311,7 +311,7 @@ public void responseEntityFlux() throws Exception {
311311
@Test // SPR-17076
312312
public void responseEntityFluxWithCustomHeader() throws Exception {
313313

314-
Sinks.StandaloneFluxSink<SimpleBean> sink = Sinks.unicast();
314+
Sinks.Many<SimpleBean> sink = Sinks.many().unicast().onBackpressureBuffer();
315315
ResponseEntity<Flux<SimpleBean>> entity = ResponseEntity.ok().header("x-foo", "bar").body(sink.asFlux());
316316
ResolvableType bodyType = forClassWithGenerics(Flux.class, SimpleBean.class);
317317
MethodParameter type = on(TestController.class).resolveReturnType(ResponseEntity.class, bodyType);

0 commit comments

Comments
 (0)