Skip to content

Commit 83aba18

Browse files
committed
Document RPC support
1 parent 38e8739 commit 83aba18

File tree

9 files changed

+243
-21
lines changed

9 files changed

+243
-21
lines changed

src/docs/asciidoc/usage.adoc

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,103 @@ include::{test-examples}/Api.java[tag=connection-recovery-deactivate]
261261
--------
262262
<1> Deactivate recovery
263263

264+
=== Remote Procedure Call (RPC)
264265

266+
Remote procedure call with RabbitMQ consists in a client sending a request message and a server replying with a response message.
267+
Both the RPC client and server are _client applications_ and the messages flow through the broker.
268+
The RPC client must send a reply-to queue address with the request.
269+
The RPC server uses this reply-to queue address to send the response.
270+
There must also be a way to correlate a request with its response, this is usually handled with a header that the RPC client and server agree on.
271+
272+
The library provides RPC client and server support classes.
273+
They use sensible defaults and some of the internal mechanics are configurable.
274+
They should meet the requirements of most RPC use cases.
275+
It is still possible to implement one part or the other with regular publishers and consumers for special cases, as this is what the RPC support classes do.
276+
277+
Here is how to create an RPC server instance:
278+
279+
.Creating an RPC server
280+
[source,java,indent=0]
281+
--------
282+
include::{test-examples}/RpcApi.java[tag=rpc-server-creation]
283+
--------
284+
<1> Use builder from connection
285+
<2> Set the queue to consume requests from (it must exist)
286+
<3> Define the processing logic
287+
<4> Create the reply message
288+
289+
Note the RPC server does not create the queue it waits requests on.
290+
It must be created beforehand.
291+
292+
Here is how to create an RPC client:
293+
294+
.Creating an RPC client
295+
[source,java,indent=0]
296+
--------
297+
include::{test-examples}/RpcApi.java[tag=rpc-client-creation]
298+
--------
299+
<1> Use builder from connection
300+
<2> Set the address to send request messages to
301+
302+
The RPC client will send its request to the configured destination.
303+
It can be an exchange or a queue, like in the example above.
304+
305+
Here is how to send a request:
306+
307+
.Sending a request
308+
[source,java,indent=0]
309+
--------
310+
include::{test-examples}/RpcApi.java[tag=rpc-client-request]
311+
--------
312+
<1> Create the message request
313+
<2> Send the request
314+
<3> Wait for the reply (synchronously)
315+
316+
The `RpcClient#publish(Message)` method returns a `CompletableFuture<Message>` that holds the reply message.
317+
It is then possible to wait for the reply asynchronously or synchronously.
318+
319+
The RPC server uses the following defaults:
320+
321+
* it uses the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`message-id` property] for the correlation ID.
322+
* it assigns the correlation ID (so the _request_ `message-id` by default) to the _reply_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`correlation-id` property].
323+
* it assigns the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`reply-to` property] to the _reply_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`to` property] if it is defined.
324+
This behavior is hardcoded but it is possible to cancel it thanks to a reply post-processor.
325+
326+
The RPC client uses the following defaults:
327+
328+
* it creates and waits for replies on an auto-delete, exclusive queue if no reply-to queue is set.
329+
* it uses a string-based correlation ID generator, with a fixed random UUID prefix and a strictly monotonic increasing sequence suffix (`{UUID}-{sequence}`, e.g. `6f839461-6b19-47e1-80b3-6be10d899d85-42`).
330+
The prefix is different for each `RpcClient` instance and the suffix is incremented by one for each request.
331+
* it sets the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`reply-to` property] to the reply-to queue address (defined by the user or created automatically, see above).
332+
* it sets the _request_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`message-id` property] to the generated correlation ID.
333+
* it extracts the correlation ID from the _reply_ https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties[`correlation-id` property] to correlate a reply with the appropriate request.
334+
335+
Let's see how to customize some of the RPC support mechanics.
336+
Imagine the request `message-id` property is a critical piece of information and we do not want to use it as the correlation ID.
337+
The request can use the `correlation-id` property and the RPC server just has to extract the correlation ID from this property (instead of the `message-id` property by default).
338+
Let's also use a random UUID for the correlation ID generation (avoids doing this in production: this is OK in terms of uniqueness but not optimal in terms of performance because randomness is not cheap).
339+
340+
Here is how to declare the RPC client:
341+
342+
.Customizing the RPC client
343+
[source,java,indent=0]
344+
--------
345+
include::{test-examples}/RpcApi.java[tag=rpc-custom-client-creation]
346+
--------
347+
<1> Declare the reply-to queue
348+
<2> Use a random UUID as correlation ID
349+
<3> Use the `correlation-id` property for the request
350+
<4> Set the `reply-to` property
351+
<5> Set the address to send request messages to
352+
353+
We just have to tell the RPC server to get the correlation ID from the request `correlation-id` property:
354+
355+
.Customizing the RPC server
356+
[source,java,indent=0]
357+
--------
358+
include::{test-examples}/RpcApi.java[tag=rpc-custom-server-creation]
359+
--------
360+
<1> Get the correlation ID from the request `correlation-id` property
265361

266362
== Dependencies
267363

src/main/java/com/rabbitmq/model/Message.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,9 @@ public interface Message {
138138
// TODO support message annotations
139139
// TODO support message headers
140140

141-
MessageAddressBuilder address();
141+
MessageAddressBuilder toAddress();
142+
143+
MessageAddressBuilder replyToAddress();
142144

143145
interface MessageAddressBuilder extends AddressBuilder<MessageAddressBuilder> {
144146

src/main/java/com/rabbitmq/model/amqp/AmqpMessage.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.math.BigDecimal;
2525
import java.util.Date;
2626
import java.util.UUID;
27+
import java.util.function.BiConsumer;
2728
import org.apache.qpid.protonj2.client.exceptions.ClientException;
2829
import org.apache.qpid.protonj2.types.*;
2930

@@ -359,18 +360,29 @@ public Object removeProperty(String key) {
359360
}
360361

361362
@Override
362-
public MessageAddressBuilder address() {
363-
return new DefaultMessageAddressBuilder(this);
363+
public MessageAddressBuilder toAddress() {
364+
return new DefaultMessageAddressBuilder(this, DefaultMessageAddressBuilder.TO_CALLBACK);
365+
}
366+
367+
@Override
368+
public MessageAddressBuilder replyToAddress() {
369+
return new DefaultMessageAddressBuilder(this, DefaultMessageAddressBuilder.REPLY_TO_CALLBACK);
364370
}
365371

366372
private static class DefaultMessageAddressBuilder
367373
extends DefaultAddressBuilder<MessageAddressBuilder> implements MessageAddressBuilder {
368374

375+
private static final BiConsumer<Message, String> TO_CALLBACK = Message::to;
376+
private static final BiConsumer<Message, String> REPLY_TO_CALLBACK = Message::replyTo;
377+
369378
private final Message message;
379+
private final BiConsumer<Message, String> buildCallback;
370380

371-
private DefaultMessageAddressBuilder(Message message) {
381+
private DefaultMessageAddressBuilder(
382+
Message message, BiConsumer<Message, String> buildCallback) {
372383
super(null);
373384
this.message = message;
385+
this.buildCallback = buildCallback;
374386
}
375387

376388
@Override
@@ -380,7 +392,7 @@ MessageAddressBuilder result() {
380392

381393
@Override
382394
public Message message() {
383-
this.message.to(this.address());
395+
this.buildCallback.accept(this.message, this.address());
384396
return this.message;
385397
}
386398
}

src/main/java/com/rabbitmq/model/amqp/AmqpRpcClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ class AmqpRpcClient implements RpcClient {
4242
AmqpRpcClient(RpcSupport.AmqpRpcClientBuilder builder) {
4343
AmqpConnection connection = builder.connection();
4444

45-
// TODO request address cannot be null
4645
AmqpPublisherBuilder publisherBuilder = (AmqpPublisherBuilder) connection.publisherBuilder();
4746
((DefaultAddressBuilder<?>) builder.requestAddress()).copyTo(publisherBuilder.addressBuilder());
4847
this.publisher = publisherBuilder.build();

src/main/java/com/rabbitmq/model/amqp/RpcSupport.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,6 @@ Function<Message, Object> correlationIdExtractor() {
7878

7979
@Override
8080
public RpcClient build() {
81-
if (this.requestAddressBuilder.address() == null) {
82-
throw new IllegalArgumentException("Request address cannot be null");
83-
}
8481
return new AmqpRpcClient(this);
8582
}
8683

src/test/java/com/rabbitmq/model/amqp/AddressFormatTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,14 @@ void exchangeKeyInToField(TestInfo info) {
179179

180180
CountDownLatch failedLatch = new CountDownLatch(2);
181181
publisher.publish(
182-
publisher.message().address().exchange(e).message(),
182+
publisher.message().toAddress().exchange(e).message(),
183183
ctx -> {
184184
if (ctx.status() == Publisher.Status.FAILED) {
185185
failedLatch.countDown();
186186
}
187187
});
188188
publisher.publish(
189-
publisher.message().address().exchange(e).key("foo").message(),
189+
publisher.message().toAddress().exchange(e).key("foo").message(),
190190
ctx -> {
191191
if (ctx.status() == Publisher.Status.FAILED) {
192192
failedLatch.countDown();
@@ -205,8 +205,8 @@ void exchangeKeyInToField(TestInfo info) {
205205
})
206206
.build();
207207

208-
publisher.publish(publisher.message().address().exchange(e).key(k).message(), ctx -> {});
209-
publisher.publish(publisher.message().address().queue(q).message(), ctx -> {});
208+
publisher.publish(publisher.message().toAddress().exchange(e).key(k).message(), ctx -> {});
209+
publisher.publish(publisher.message().toAddress().queue(q).message(), ctx -> {});
210210
assertThat(consumeLatch).completes();
211211
} finally {
212212
management.queueDeletion().delete(q);

src/test/java/com/rabbitmq/model/amqp/RpcTest.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,7 @@ void rpcWithDefaults() {
9292
() -> {
9393
String request = UUID.randomUUID().toString();
9494
CompletableFuture<Message> responseFuture =
95-
rpcClient.publish(
96-
rpcClient
97-
.message(request.getBytes(UTF_8))
98-
.messageId(UUID.randomUUID()));
95+
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
9996
Message response = responseFuture.get(10, TimeUnit.SECONDS);
10097
assertThat(response.body()).asString(UTF_8).isEqualTo(process(request));
10198
latch.countDown();
@@ -141,7 +138,7 @@ void rpcWithCustomSettings() {
141138
(ctx, request) -> {
142139
Message reply = HANDLER.handle(ctx, request);
143140
return reply
144-
.address()
141+
.toAddress()
145142
.queue(request.property("reply-to-queue").toString())
146143
.message();
147144
})
@@ -166,6 +163,54 @@ void rpcWithCustomSettings() {
166163
}
167164
}
168165

166+
@Test
167+
void rpcUseCorrelationIdRequestProperty() {
168+
try (Connection clientConnection = environment.connectionBuilder().build();
169+
Connection serverConnection = environment.connectionBuilder().build()) {
170+
171+
String requestQueue = serverConnection.management().queue().exclusive(true).declare().name();
172+
173+
String replyToQueue =
174+
clientConnection.management().queue().autoDelete(true).exclusive(true).declare().name();
175+
RpcClient rpcClient =
176+
clientConnection
177+
.rpcClientBuilder()
178+
.correlationIdSupplier(UUID::randomUUID)
179+
.requestPostProcessor(
180+
(msg, corrId) ->
181+
msg.correlationId(corrId).replyToAddress().queue(replyToQueue).message())
182+
.replyToQueue(replyToQueue)
183+
.requestAddress()
184+
.queue(requestQueue)
185+
.rpcClient()
186+
.build();
187+
188+
serverConnection
189+
.rpcServerBuilder()
190+
.correlationIdExtractor(Message::correlationId)
191+
.requestQueue(requestQueue)
192+
.handler(HANDLER)
193+
.build();
194+
195+
int requestCount = 100;
196+
CountDownLatch latch = new CountDownLatch(requestCount);
197+
IntStream.range(0, requestCount)
198+
.forEach(
199+
ignored ->
200+
executorService.submit(
201+
() -> {
202+
String request = UUID.randomUUID().toString();
203+
CompletableFuture<Message> responseFuture =
204+
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
205+
Message response = responseFuture.get(10, TimeUnit.SECONDS);
206+
assertThat(response.body()).asString(UTF_8).isEqualTo(process(request));
207+
latch.countDown();
208+
return null;
209+
}));
210+
TestUtils.assertThat(latch).completes();
211+
}
212+
}
213+
169214
@Test
170215
void rpcShouldRecoverAfterConnectionIsClosed()
171216
throws ExecutionException, InterruptedException, TimeoutException {

src/test/java/com/rabbitmq/model/docs/Api.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,15 +117,15 @@ void targetAddressNull() {
117117
.build(); // <1>
118118

119119
Message message1 = publisher.message()
120-
.address().exchange("foo").key("bar") // <2>
120+
.toAddress().exchange("foo").key("bar") // <2>
121121
.message();
122122

123123
Message message2 = publisher.message()
124-
.address().exchange("foo") // <3>
124+
.toAddress().exchange("foo") // <3>
125125
.message();
126126

127127
Message message3 = publisher.message()
128-
.address().queue("my-queue") // <4>
128+
.toAddress().queue("my-queue") // <4>
129129
.message();
130130
// end::target-address-null[]
131131
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.rabbitmq.model.docs;
2+
3+
import com.rabbitmq.model.Connection;
4+
import com.rabbitmq.model.Message;
5+
import com.rabbitmq.model.RpcClient;
6+
import com.rabbitmq.model.RpcServer;
7+
8+
import java.nio.charset.StandardCharsets;
9+
import java.util.UUID;
10+
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.TimeUnit;
12+
13+
import static java.nio.charset.StandardCharsets.UTF_8;
14+
15+
public class RpcApi {
16+
17+
void rpcWithDefaults() throws Exception {
18+
Connection connection = null;
19+
// tag::rpc-server-creation[]
20+
RpcServer rpcServer = connection.rpcServerBuilder() // <1>
21+
.requestQueue("rpc-server") // <2>
22+
.handler((ctx, req) -> { // <3>
23+
String in = new String(req.body(), UTF_8);
24+
String out = "*** " + in + " ***";
25+
return ctx.message(out.getBytes(UTF_8)); // <4>
26+
}).build();
27+
// end::rpc-server-creation[]
28+
29+
// tag::rpc-client-creation[]
30+
RpcClient rpcClient = connection.rpcClientBuilder() // <1>
31+
.requestAddress().queue("rpc-server") // <2>
32+
.rpcClient()
33+
.build();
34+
// end::rpc-client-creation[]
35+
36+
// tag::rpc-client-request[]
37+
Message request = rpcClient.message("hello".getBytes(UTF_8)); // <1>
38+
CompletableFuture<Message> replyFuture = rpcClient.publish(request); // <2>
39+
Message reply = replyFuture.get(10, TimeUnit.SECONDS); // <3>
40+
// end::rpc-client-request[]
41+
}
42+
43+
void rpcWithCustomSettings() throws Exception {
44+
Connection connection = null;
45+
// tag::rpc-custom-client-creation[]
46+
String replyToQueue = connection.management().queue()
47+
.autoDelete(true).exclusive(true)
48+
.declare().name(); // <1>
49+
RpcClient rpcClient = connection.rpcClientBuilder()
50+
.correlationIdSupplier(UUID::randomUUID) // <2>
51+
.requestPostProcessor((msg, corrId) ->
52+
msg.correlationId(corrId) // <3>
53+
.replyToAddress().queue(replyToQueue).message()) // <4>
54+
.replyToQueue(replyToQueue)
55+
.requestAddress().queue("rpc-server") // <5>
56+
.rpcClient()
57+
.build();
58+
// end::rpc-custom-client-creation[]
59+
60+
// tag::rpc-custom-server-creation[]
61+
RpcServer rpcServer = connection.rpcServerBuilder()
62+
.correlationIdExtractor(Message::correlationId) // <1>
63+
.requestQueue("rpc-server")
64+
.handler((ctx, req) -> {
65+
String in = new String(req.body(), UTF_8);
66+
String out = "*** " + in + " ***";
67+
return ctx.message(out.getBytes(UTF_8));
68+
}).build();
69+
// end::rpc-custom-server-creation[]
70+
}
71+
}

0 commit comments

Comments
 (0)