Skip to content

Commit 2f95859

Browse files
committed
Upgrade to RSocket 0.12.1-RC3-SNAPSHOT
Closes gh-22629
1 parent 3e4f890 commit 2f95859

File tree

6 files changed

+15
-17
lines changed

6 files changed

+15
-17
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ configure(allprojects) { project ->
150150
repositories {
151151
maven { url "https://repo.spring.io/libs-release" }
152152
maven { url "https://repo.spring.io/snapshot" } // Reactor
153+
maven { url "https://oss.jfrog.org/artifactory/libs-snapshot" } // RSocket
153154
mavenLocal()
154155
}
155156

spring-messaging/spring-messaging.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ dependencyManagement {
77
}
88
}
99

10-
def rsocketVersion = "0.11.17"
10+
def rsocketVersion = "0.12.1-RC3-SNAPSHOT"
1111

1212
dependencies {
1313
compile(project(":spring-beans"))

spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.messaging.rsocket;
1818

1919
import io.netty.buffer.ByteBuf;
20-
import io.rsocket.Frame;
2120
import io.rsocket.Payload;
2221
import io.rsocket.util.ByteBufPayload;
2322
import io.rsocket.util.DefaultPayload;
@@ -27,7 +26,6 @@
2726
import org.springframework.core.io.buffer.DefaultDataBuffer;
2827
import org.springframework.core.io.buffer.NettyDataBuffer;
2928
import org.springframework.core.io.buffer.NettyDataBufferFactory;
30-
import org.springframework.util.Assert;
3129

3230
/**
3331
* Static utility methods to create {@link Payload} from {@link DataBuffer}s
@@ -53,11 +51,9 @@ public static DataBuffer retainDataAndReleasePayload(Payload payload, DataBuffer
5351
ByteBuf byteBuf = payload.sliceData().retain();
5452
return ((NettyDataBufferFactory) bufferFactory).wrap(byteBuf);
5553
}
56-
57-
Assert.isTrue(!(payload instanceof ByteBufPayload) && !(payload instanceof Frame),
58-
"NettyDataBufferFactory expected, actual: " + bufferFactory.getClass().getSimpleName());
59-
60-
return bufferFactory.wrap(payload.getData());
54+
else {
55+
return bufferFactory.wrap(payload.getData());
56+
}
6157
}
6258
finally {
6359
if (payload.refCnt() > 0) {

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import io.netty.buffer.Unpooled;
2929
import io.netty.util.ReferenceCounted;
3030
import io.rsocket.AbstractRSocket;
31-
import io.rsocket.Frame;
3231
import io.rsocket.RSocket;
3332
import io.rsocket.RSocketFactory;
33+
import io.rsocket.frame.decoder.PayloadDecoder;
3434
import io.rsocket.plugins.RSocketInterceptor;
3535
import io.rsocket.transport.netty.client.TcpClientTransport;
3636
import io.rsocket.transport.netty.server.CloseableChannel;
@@ -89,15 +89,15 @@ public static void setupOnce() {
8989
context = new AnnotationConfigApplicationContext(ServerConfig.class);
9090

9191
server = RSocketFactory.receive()
92-
.frameDecoder(Frame::retain) // zero copy
92+
.frameDecoder(PayloadDecoder.ZERO_COPY)
9393
.addServerPlugin(payloadInterceptor) // intercept responding
9494
.acceptor(context.getBean(MessageHandlerAcceptor.class))
9595
.transport(TcpServerTransport.create("localhost", 7000))
9696
.start()
9797
.block();
9898

9999
client = RSocketFactory.connect()
100-
.frameDecoder(Frame::retain) // zero copy
100+
.frameDecoder(PayloadDecoder.ZERO_COPY)
101101
.addClientPlugin(payloadInterceptor) // intercept outgoing requests
102102
.dataMimeType(MimeTypeUtils.TEXT_PLAIN_VALUE)
103103
.transport(TcpClientTransport.create("localhost", 7000))

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import java.time.Duration;
2020

2121
import io.netty.buffer.PooledByteBufAllocator;
22-
import io.rsocket.Frame;
2322
import io.rsocket.RSocket;
2423
import io.rsocket.RSocketFactory;
24+
import io.rsocket.frame.decoder.PayloadDecoder;
2525
import io.rsocket.transport.netty.client.TcpClientTransport;
2626
import io.rsocket.transport.netty.server.CloseableChannel;
2727
import io.rsocket.transport.netty.server.TcpServerTransport;
@@ -71,15 +71,15 @@ public static void setupOnce() {
7171

7272
server = RSocketFactory.receive()
7373
.addServerPlugin(interceptor)
74-
.frameDecoder(Frame::retain) // as per https://github.com/rsocket/rsocket-java#zero-copy
74+
.frameDecoder(PayloadDecoder.ZERO_COPY)
7575
.acceptor(context.getBean(MessageHandlerAcceptor.class))
7676
.transport(TcpServerTransport.create("localhost", 7000))
7777
.start()
7878
.block();
7979

8080
client = RSocketFactory.connect()
8181
.dataMimeType(MimeTypeUtils.TEXT_PLAIN_VALUE)
82-
.frameDecoder(Frame::retain) // as per https://github.com/rsocket/rsocket-java#zero-copy
82+
.frameDecoder(PayloadDecoder.ZERO_COPY)
8383
.transport(TcpClientTransport.create("localhost", 7000))
8484
.start()
8585
.block();
@@ -105,6 +105,7 @@ public void fireAndForget() {
105105
.expectNext("Hello 1")
106106
.expectNext("Hello 2")
107107
.expectNext("Hello 3")
108+
.thenAwait(Duration.ofMillis(50))
108109
.thenCancel()
109110
.verify(Duration.ofSeconds(5));
110111

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121

2222
import io.netty.buffer.PooledByteBufAllocator;
2323
import io.rsocket.Closeable;
24-
import io.rsocket.Frame;
2524
import io.rsocket.RSocket;
2625
import io.rsocket.RSocketFactory;
26+
import io.rsocket.frame.decoder.PayloadDecoder;
2727
import io.rsocket.transport.netty.client.TcpClientTransport;
2828
import io.rsocket.transport.netty.server.TcpServerTransport;
2929
import io.rsocket.util.DefaultPayload;
@@ -64,7 +64,7 @@ public static void setupOnce() {
6464
context = new AnnotationConfigApplicationContext(RSocketConfig.class);
6565

6666
server = RSocketFactory.receive()
67-
.frameDecoder(Frame::retain) // as per https://github.com/rsocket/rsocket-java#zero-copy
67+
.frameDecoder(PayloadDecoder.ZERO_COPY)
6868
.acceptor(context.getBean("serverAcceptor", MessageHandlerAcceptor.class))
6969
.transport(TcpServerTransport.create("localhost", 7000))
7070
.start()
@@ -108,7 +108,7 @@ private static void connectAndVerify(String destination) {
108108
rsocket = RSocketFactory.connect()
109109
.setupPayload(DefaultPayload.create("", destination))
110110
.dataMimeType("text/plain")
111-
.frameDecoder(Frame::retain) // as per https://github.com/rsocket/rsocket-java#zero-copy
111+
.frameDecoder(PayloadDecoder.ZERO_COPY)
112112
.acceptor(context.getBean("clientAcceptor", MessageHandlerAcceptor.class))
113113
.transport(TcpClientTransport.create("localhost", 7000))
114114
.start()

0 commit comments

Comments
 (0)