Skip to content

Commit 14e2c68

Browse files
committed
Support for RSocket composite metadata
Closes gh-22798
1 parent 9fb973d commit 14e2c68

File tree

9 files changed

+352
-112
lines changed

9 files changed

+352
-112
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

Lines changed: 126 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,19 @@
1616

1717
package org.springframework.messaging.rsocket;
1818

19-
import java.nio.charset.StandardCharsets;
19+
import java.util.Arrays;
2020
import java.util.Collections;
21+
import java.util.LinkedHashMap;
22+
import java.util.List;
2123
import java.util.Map;
2224

25+
import io.netty.buffer.ByteBuf;
26+
import io.netty.buffer.ByteBufAllocator;
27+
import io.netty.buffer.CompositeByteBuf;
28+
import io.netty.buffer.Unpooled;
2329
import io.rsocket.Payload;
2430
import io.rsocket.RSocket;
31+
import io.rsocket.metadata.CompositeMetadataFlyweight;
2532
import org.reactivestreams.Publisher;
2633
import reactor.core.publisher.Flux;
2734
import reactor.core.publisher.Mono;
@@ -32,6 +39,10 @@
3239
import org.springframework.core.codec.Decoder;
3340
import org.springframework.core.codec.Encoder;
3441
import org.springframework.core.io.buffer.DataBuffer;
42+
import org.springframework.core.io.buffer.DataBufferFactory;
43+
import org.springframework.core.io.buffer.DataBufferUtils;
44+
import org.springframework.core.io.buffer.NettyDataBuffer;
45+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
3546
import org.springframework.lang.Nullable;
3647
import org.springframework.util.Assert;
3748
import org.springframework.util.MimeType;
@@ -44,24 +55,42 @@
4455
*/
4556
final class DefaultRSocketRequester implements RSocketRequester {
4657

58+
static final MimeType COMPOSITE_METADATA = new MimeType("message", "x.rsocket.composite-metadata.v0");
59+
60+
static final MimeType ROUTING = new MimeType("message", "x.rsocket.routing.v0");
61+
62+
static final List<MimeType> METADATA_MIME_TYPES = Arrays.asList(COMPOSITE_METADATA, ROUTING);
63+
64+
4765
private static final Map<String, Object> EMPTY_HINTS = Collections.emptyMap();
4866

4967

5068
private final RSocket rsocket;
5169

52-
@Nullable
5370
private final MimeType dataMimeType;
5471

72+
private final MimeType metadataMimeType;
73+
5574
private final RSocketStrategies strategies;
5675

57-
private DataBuffer emptyDataBuffer;
76+
private final DataBuffer emptyDataBuffer;
5877

5978

60-
DefaultRSocketRequester(RSocket rsocket, @Nullable MimeType dataMimeType, RSocketStrategies strategies) {
79+
DefaultRSocketRequester(
80+
RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
81+
RSocketStrategies strategies) {
82+
6183
Assert.notNull(rsocket, "RSocket is required");
84+
Assert.notNull(dataMimeType, "'dataMimeType' is required");
85+
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
6286
Assert.notNull(strategies, "RSocketStrategies is required");
87+
88+
Assert.isTrue(METADATA_MIME_TYPES.contains(metadataMimeType),
89+
() -> "Unexpected metadatata mime type: '" + metadataMimeType + "'");
90+
6391
this.rsocket = rsocket;
6492
this.dataMimeType = dataMimeType;
93+
this.metadataMimeType = metadataMimeType;
6594
this.strategies = strategies;
6695
this.emptyDataBuffer = this.strategies.dataBufferFactory().wrap(new byte[0]);
6796
}
@@ -72,6 +101,16 @@ public RSocket rsocket() {
72101
return this.rsocket;
73102
}
74103

104+
@Override
105+
public MimeType dataMimeType() {
106+
return this.dataMimeType;
107+
}
108+
109+
@Override
110+
public MimeType metadataMimeType() {
111+
return this.metadataMimeType;
112+
}
113+
75114
@Override
76115
public RequestSpec route(String route) {
77116
return new DefaultRequestSpec(route);
@@ -82,13 +121,28 @@ private static boolean isVoid(ResolvableType elementType) {
82121
return (Void.class.equals(elementType.resolve()) || void.class.equals(elementType.resolve()));
83122
}
84123

124+
private DataBufferFactory bufferFactory() {
125+
return this.strategies.dataBufferFactory();
126+
}
127+
85128

86129
private class DefaultRequestSpec implements RequestSpec {
87130

88-
private final String route;
131+
private final Map<Object, MimeType> metadata = new LinkedHashMap<>(4);
132+
133+
134+
public DefaultRequestSpec(String route) {
135+
Assert.notNull(route, "'route' is required");
136+
metadata(route, ROUTING);
137+
}
138+
89139

90-
DefaultRequestSpec(String route) {
91-
this.route = route;
140+
@Override
141+
public RequestSpec metadata(Object metadata, MimeType mimeType) {
142+
Assert.isTrue(this.metadata.isEmpty() || metadataMimeType().equals(COMPOSITE_METADATA),
143+
"Additional metadata entries supported only with composite metadata");
144+
this.metadata.put(metadata, mimeType);
145+
return this;
92146
}
93147

94148
@Override
@@ -122,7 +176,7 @@ else if (adapter != null) {
122176
}
123177
else {
124178
Mono<Payload> payloadMono = Mono
125-
.fromCallable(() -> encodeValue(input, ResolvableType.forInstance(input), null))
179+
.fromCallable(() -> encodeData(input, ResolvableType.forInstance(input), null))
126180
.map(this::firstPayload)
127181
.doOnDiscard(Payload.class, Payload::release)
128182
.switchIfEmpty(emptyPayload());
@@ -139,14 +193,14 @@ else if (adapter != null) {
139193

140194
if (adapter != null && !adapter.isMultiValue()) {
141195
Mono<Payload> payloadMono = Mono.from(publisher)
142-
.map(value -> encodeValue(value, dataType, encoder))
196+
.map(value -> encodeData(value, dataType, encoder))
143197
.map(this::firstPayload)
144198
.switchIfEmpty(emptyPayload());
145199
return new DefaultResponseSpec(payloadMono);
146200
}
147201

148202
Flux<Payload> payloadFlux = Flux.from(publisher)
149-
.map(value -> encodeValue(value, dataType, encoder))
203+
.map(value -> encodeData(value, dataType, encoder))
150204
.switchOnFirst((signal, inner) -> {
151205
DataBuffer data = signal.get();
152206
if (data != null) {
@@ -163,24 +217,80 @@ else if (adapter != null) {
163217
}
164218

165219
@SuppressWarnings("unchecked")
166-
private <T> DataBuffer encodeValue(T value, ResolvableType valueType, @Nullable Encoder<?> encoder) {
220+
private <T> DataBuffer encodeData(T value, ResolvableType valueType, @Nullable Encoder<?> encoder) {
221+
if (value instanceof DataBuffer) {
222+
return (DataBuffer) value;
223+
}
167224
if (encoder == null) {
168-
encoder = strategies.encoder(ResolvableType.forInstance(value), dataMimeType);
225+
valueType = ResolvableType.forInstance(value);
226+
encoder = strategies.encoder(valueType, dataMimeType);
169227
}
170228
return ((Encoder<T>) encoder).encodeValue(
171-
value, strategies.dataBufferFactory(), valueType, dataMimeType, EMPTY_HINTS);
229+
value, bufferFactory(), valueType, dataMimeType, EMPTY_HINTS);
172230
}
173231

174232
private Payload firstPayload(DataBuffer data) {
175-
return PayloadUtils.createPayload(getMetadata(), data);
233+
DataBuffer metadata;
234+
try {
235+
metadata = getMetadata();
236+
return PayloadUtils.createPayload(metadata, data);
237+
}
238+
catch (Throwable ex) {
239+
DataBufferUtils.release(data);
240+
throw ex;
241+
}
176242
}
177243

178244
private Mono<Payload> emptyPayload() {
179245
return Mono.fromCallable(() -> firstPayload(emptyDataBuffer));
180246
}
181247

182248
private DataBuffer getMetadata() {
183-
return strategies.dataBufferFactory().wrap(this.route.getBytes(StandardCharsets.UTF_8));
249+
if (metadataMimeType().equals(COMPOSITE_METADATA)) {
250+
CompositeByteBuf metadata = getAllocator().compositeBuffer();
251+
this.metadata.forEach((key, value) -> {
252+
DataBuffer dataBuffer = encodeMetadata(key, value);
253+
CompositeMetadataFlyweight.encodeAndAddMetadata(metadata, getAllocator(), value.toString(),
254+
dataBuffer instanceof NettyDataBuffer ?
255+
((NettyDataBuffer) dataBuffer).getNativeBuffer() :
256+
Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()));
257+
258+
});
259+
return asDataBuffer(metadata);
260+
}
261+
Assert.isTrue(this.metadata.size() < 2, "Composite metadata required for multiple entries");
262+
Map.Entry<Object, MimeType> entry = this.metadata.entrySet().iterator().next();
263+
Assert.isTrue(metadataMimeType().equals(entry.getValue()),
264+
() -> "Expected metadata MimeType '" + metadataMimeType() + "', actual " + this.metadata);
265+
return encodeMetadata(entry.getKey(), entry.getValue());
266+
}
267+
268+
@SuppressWarnings("unchecked")
269+
private <T> DataBuffer encodeMetadata(Object metadata, MimeType mimeType) {
270+
if (metadata instanceof DataBuffer) {
271+
return (DataBuffer) metadata;
272+
}
273+
ResolvableType type = ResolvableType.forInstance(metadata);
274+
Encoder<T> encoder = strategies.encoder(type, mimeType);
275+
Assert.notNull(encoder, () -> "No encoder for metadata " + metadata + ", mimeType '" + mimeType + "'");
276+
return encoder.encodeValue((T) metadata, bufferFactory(), type, mimeType, EMPTY_HINTS);
277+
}
278+
279+
private ByteBufAllocator getAllocator() {
280+
return bufferFactory() instanceof NettyDataBufferFactory ?
281+
((NettyDataBufferFactory) bufferFactory()).getByteBufAllocator() :
282+
ByteBufAllocator.DEFAULT;
283+
}
284+
285+
private DataBuffer asDataBuffer(ByteBuf byteBuf) {
286+
if (bufferFactory() instanceof NettyDataBufferFactory) {
287+
return ((NettyDataBufferFactory) bufferFactory()).wrap(byteBuf);
288+
}
289+
else {
290+
DataBuffer dataBuffer = bufferFactory().wrap(byteBuf.nioBuffer());
291+
byteBuf.release();
292+
return dataBuffer;
293+
}
184294
}
185295
}
186296

@@ -259,7 +369,7 @@ private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
259369
}
260370

261371
private DataBuffer retainDataAndReleasePayload(Payload payload) {
262-
return PayloadUtils.retainDataAndReleasePayload(payload, strategies.dataBufferFactory());
372+
return PayloadUtils.retainDataAndReleasePayload(payload, bufferFactory());
263373
}
264374
}
265375

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
4444
@Nullable
4545
private MimeType dataMimeType;
4646

47+
private MimeType metadataMimeType = DefaultRSocketRequester.COMPOSITE_METADATA;
48+
4749
private List<Consumer<RSocketFactory.ClientRSocketFactory>> factoryConfigurers = new ArrayList<>();
4850

4951
@Nullable
@@ -53,11 +55,18 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
5355

5456

5557
@Override
56-
public RSocketRequester.Builder dataMimeType(MimeType mimeType) {
58+
public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) {
5759
this.dataMimeType = mimeType;
5860
return this;
5961
}
6062

63+
@Override
64+
public RSocketRequester.Builder metadataMimeType(MimeType mimeType) {
65+
Assert.notNull(mimeType, "`metadataMimeType` is required");
66+
this.metadataMimeType = mimeType;
67+
return this;
68+
}
69+
6170
@Override
6271
public RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer) {
6372
this.factoryConfigurers.add(configurer);
@@ -100,10 +109,13 @@ private Mono<RSocketRequester> doConnect(ClientTransport transport) {
100109
RSocketFactory.ClientRSocketFactory rsocketFactory = RSocketFactory.connect();
101110
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
102111
rsocketFactory.dataMimeType(dataMimeType.toString());
112+
rsocketFactory.metadataMimeType(this.metadataMimeType.toString());
103113
this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory));
104114

105-
return rsocketFactory.transport(transport).start()
106-
.map(rsocket -> new DefaultRSocketRequester(rsocket, dataMimeType, rsocketStrategies));
115+
return rsocketFactory.transport(transport)
116+
.start()
117+
.map(rsocket -> new DefaultRSocketRequester(
118+
rsocket, dataMimeType, this.metadataMimeType, rsocketStrategies));
107119
}
108120

109121
private RSocketStrategies getRSocketStrategies() {

spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import io.rsocket.SocketAcceptor;
2525
import reactor.core.publisher.Mono;
2626

27-
import org.springframework.core.io.buffer.DataBufferFactory;
2827
import org.springframework.lang.Nullable;
2928
import org.springframework.messaging.Message;
29+
import org.springframework.util.Assert;
3030
import org.springframework.util.MimeType;
3131
import org.springframework.util.MimeTypeUtils;
3232
import org.springframework.util.StringUtils;
@@ -47,16 +47,28 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler
4747
@Nullable
4848
private MimeType defaultDataMimeType;
4949

50+
private MimeType defaultMetadataMimeType = DefaultRSocketRequester.COMPOSITE_METADATA;
51+
52+
53+
/**
54+
* Configure the default content type to use for data payloads if the
55+
* {@code SETUP} frame did not specify one.
56+
* <p>By default this is not set.
57+
* @param mimeType the MimeType to use
58+
*/
59+
public void setDefaultDataMimeType(@Nullable MimeType mimeType) {
60+
this.defaultDataMimeType = mimeType;
61+
}
5062

5163
/**
52-
* Configure the default content type to use for data payloads.
53-
* <p>By default this is not set. However a server acceptor will use the
54-
* content type from the {@link ConnectionSetupPayload}, so this is typically
55-
* required for clients but can also be used on servers as a fallback.
56-
* @param defaultDataMimeType the MimeType to use
64+
* Configure the default {@code MimeType} for payload data if the
65+
* {@code SETUP} frame did not specify one.
66+
* <p>By default this is set to {@code "message/x.rsocket.composite-metadata.v0"}
67+
* @param mimeType the MimeType to use
5768
*/
58-
public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
59-
this.defaultDataMimeType = defaultDataMimeType;
69+
public void setDefaultMetadataMimeType(MimeType mimeType) {
70+
Assert.notNull(mimeType, "'metadataMimeType' is required");
71+
this.defaultMetadataMimeType = mimeType;
6072
}
6173

6274

@@ -76,12 +88,24 @@ public RSocket apply(ConnectionSetupPayload setupPayload, RSocket sendingRSocket
7688
}
7789

7890
private MessagingRSocket createRSocket(ConnectionSetupPayload setupPayload, RSocket rsocket) {
91+
7992
MimeType dataMimeType = StringUtils.hasText(setupPayload.dataMimeType()) ?
8093
MimeTypeUtils.parseMimeType(setupPayload.dataMimeType()) :
8194
this.defaultDataMimeType;
82-
RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, getRSocketStrategies());
83-
DataBufferFactory bufferFactory = getRSocketStrategies().dataBufferFactory();
84-
return new MessagingRSocket(this, getRouteMatcher(), requester, dataMimeType, bufferFactory);
95+
Assert.notNull(dataMimeType,
96+
"No `dataMimeType` in the ConnectionSetupPayload and no default value");
97+
98+
MimeType metadataMimeType = StringUtils.hasText(setupPayload.dataMimeType()) ?
99+
MimeTypeUtils.parseMimeType(setupPayload.metadataMimeType()) :
100+
this.defaultMetadataMimeType;
101+
Assert.notNull(dataMimeType,
102+
"No `metadataMimeType` in the ConnectionSetupPayload and no default value");
103+
104+
RSocketRequester requester = RSocketRequester.wrap(
105+
rsocket, dataMimeType, metadataMimeType, getRSocketStrategies());
106+
107+
return new MessagingRSocket(this, getRouteMatcher(), requester,
108+
dataMimeType, metadataMimeType, getRSocketStrategies().dataBufferFactory());
85109
}
86110

87111
}

0 commit comments

Comments
 (0)