Skip to content

Commit e25b4f5

Browse files
committed
adds routing example with TaggingMetadata and CompositeMetadata
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 7d9c28f commit e25b4f5

File tree

2 files changed

+151
-0
lines changed

2 files changed

+151
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package io.rsocket.examples.transport.tcp.metadata.routing;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.buffer.ByteBufUtil;
6+
import io.netty.buffer.CompositeByteBuf;
7+
import io.rsocket.RSocket;
8+
import io.rsocket.SocketAcceptor;
9+
import io.rsocket.core.RSocketConnector;
10+
import io.rsocket.core.RSocketServer;
11+
import io.rsocket.metadata.CompositeMetadata;
12+
import io.rsocket.metadata.CompositeMetadataCodec;
13+
import io.rsocket.metadata.RoutingMetadata;
14+
import io.rsocket.metadata.TaggingMetadataCodec;
15+
import io.rsocket.metadata.WellKnownMimeType;
16+
import io.rsocket.transport.netty.client.TcpClientTransport;
17+
import io.rsocket.transport.netty.server.TcpServerTransport;
18+
import io.rsocket.util.ByteBufPayload;
19+
import java.util.Collections;
20+
import java.util.Objects;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
import reactor.core.publisher.Mono;
24+
25+
public class CompositeMetadataExample {
26+
static final Logger logger = LoggerFactory.getLogger(CompositeMetadataExample.class);
27+
28+
public static void main(String[] args) {
29+
RSocketServer.create(
30+
SocketAcceptor.forRequestResponse(
31+
payload -> {
32+
final String route = decodeRoute(payload.sliceMetadata());
33+
34+
logger.info("Received RequestResponse[route={}]", route);
35+
36+
if ("my.test.route".equals(route)) {
37+
payload.release();
38+
return Mono.just(ByteBufPayload.create("Hello From My Test Route"));
39+
}
40+
41+
return Mono.error(new IllegalArgumentException("Route " + route + " not found"));
42+
}))
43+
.bindNow(TcpServerTransport.create("localhost", 7000));
44+
45+
RSocket socket =
46+
RSocketConnector.create()
47+
// here we specify that every metadata payload will be encoded using
48+
// CompositeMetadata layout as specified in the following subspec
49+
// https://github.com/rsocket/rsocket/blob/master/Extensions/CompositeMetadata.md
50+
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
51+
.connect(TcpClientTransport.create("localhost", 7000))
52+
.block();
53+
54+
final ByteBuf routeMetadata =
55+
TaggingMetadataCodec.createTaggingContent(
56+
ByteBufAllocator.DEFAULT, Collections.singletonList("my.test.route"));
57+
final CompositeByteBuf compositeMetadata = ByteBufAllocator.DEFAULT.compositeBuffer();
58+
59+
CompositeMetadataCodec.encodeAndAddMetadata(
60+
compositeMetadata,
61+
ByteBufAllocator.DEFAULT,
62+
WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
63+
routeMetadata);
64+
65+
socket
66+
.requestResponse(
67+
ByteBufPayload.create(
68+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "HelloWorld"), compositeMetadata))
69+
.log()
70+
.block();
71+
}
72+
73+
static String decodeRoute(ByteBuf metadata) {
74+
final CompositeMetadata compositeMetadata = new CompositeMetadata(metadata, false);
75+
76+
for (CompositeMetadata.Entry metadatum : compositeMetadata) {
77+
if (Objects.requireNonNull(metadatum.getMimeType())
78+
.equals(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())) {
79+
return new RoutingMetadata(metadatum.getContent()).iterator().next();
80+
}
81+
}
82+
83+
return null;
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package io.rsocket.examples.transport.tcp.metadata.routing;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.buffer.ByteBufUtil;
6+
import io.rsocket.RSocket;
7+
import io.rsocket.SocketAcceptor;
8+
import io.rsocket.core.RSocketConnector;
9+
import io.rsocket.core.RSocketServer;
10+
import io.rsocket.metadata.RoutingMetadata;
11+
import io.rsocket.metadata.TaggingMetadataCodec;
12+
import io.rsocket.metadata.WellKnownMimeType;
13+
import io.rsocket.transport.netty.client.TcpClientTransport;
14+
import io.rsocket.transport.netty.server.TcpServerTransport;
15+
import io.rsocket.util.ByteBufPayload;
16+
import java.util.Collections;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
import reactor.core.publisher.Mono;
20+
21+
public class RoutingMetadataExample {
22+
static final Logger logger = LoggerFactory.getLogger(RoutingMetadataExample.class);
23+
24+
public static void main(String[] args) {
25+
RSocketServer.create(
26+
SocketAcceptor.forRequestResponse(
27+
payload -> {
28+
final String route = decodeRoute(payload.sliceMetadata());
29+
30+
logger.info("Received RequestResponse[route={}]", route);
31+
32+
if ("my.test.route".equals(route)) {
33+
payload.release();
34+
return Mono.just(ByteBufPayload.create("Hello From My Test Route"));
35+
}
36+
37+
return Mono.error(new IllegalArgumentException("Route " + route + " not found"));
38+
}))
39+
.bindNow(TcpServerTransport.create("localhost", 7000));
40+
41+
RSocket socket =
42+
RSocketConnector.create()
43+
// here we specify that route will be encoded using
44+
// Routing&Tagging Metadata layout specified at this
45+
// subspec https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md
46+
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
47+
.connect(TcpClientTransport.create("localhost", 7000))
48+
.block();
49+
50+
final ByteBuf routeMetadata =
51+
TaggingMetadataCodec.createTaggingContent(
52+
ByteBufAllocator.DEFAULT, Collections.singletonList("my.test.route"));
53+
socket
54+
.requestResponse(
55+
ByteBufPayload.create(
56+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "HelloWorld"), routeMetadata))
57+
.log()
58+
.block();
59+
}
60+
61+
static String decodeRoute(ByteBuf metadata) {
62+
final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);
63+
64+
return routingMetadata.iterator().next();
65+
}
66+
}

0 commit comments

Comments
 (0)