Skip to content

Commit 56c149e

Browse files
committed
Make RPC mechanics configurable
1 parent 3496f4d commit 56c149e

File tree

8 files changed

+213
-34
lines changed

8 files changed

+213
-34
lines changed

src/main/java/com/rabbitmq/model/Consumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public interface Consumer extends AutoCloseable {
2222
@Override
2323
void close();
2424

25+
@FunctionalInterface
2526
interface MessageHandler {
2627

2728
void handle(Context context, Message message);

src/main/java/com/rabbitmq/model/RpcClientBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,22 @@
1717
1818
package com.rabbitmq.model;
1919

20+
import java.util.function.BiFunction;
21+
import java.util.function.Function;
22+
import java.util.function.Supplier;
23+
2024
public interface RpcClientBuilder {
2125

2226
RpcClientAddressBuilder requestAddress();
2327

2428
RpcClientBuilder replyToQueue(String replyToQueue);
2529

30+
RpcClientBuilder correlationIdSupplier(Supplier<Object> correlationIdSupplier);
31+
32+
RpcClientBuilder requestPostProcessor(BiFunction<Message, Object, Message> requestPostProcessor);
33+
34+
RpcClientBuilder correlationIdExtractor(Function<Message, Object> correlationIdExtractor);
35+
2636
RpcClient build();
2737

2838
interface RpcClientAddressBuilder extends AddressBuilder<RpcClientAddressBuilder> {

src/main/java/com/rabbitmq/model/RpcServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
public interface RpcServer extends AutoCloseable {
2121

22+
@FunctionalInterface
2223
interface Handler {
2324

2425
Message handle(Context ctx, Message request);

src/main/java/com/rabbitmq/model/RpcServerBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
1818
package com.rabbitmq.model;
1919

20+
import java.util.function.BiFunction;
21+
import java.util.function.Function;
22+
2023
public interface RpcServerBuilder {
2124

2225
RpcServerBuilder requestQueue(String requestQueue);
@@ -25,6 +28,10 @@ public interface RpcServerBuilder {
2528

2629
RpcServerAddressBuilder replyToAddress();
2730

31+
RpcServerBuilder correlationIdExtractor(Function<Message, Object> correlationIdExtractor);
32+
33+
RpcServerBuilder replyPostProcessor(BiFunction<Message, Object, Message> replyPostProcessor);
34+
2835
RpcServer build();
2936

3037
interface RpcServerAddressBuilder extends AddressBuilder<RpcServerAddressBuilder> {

src/main/java/com/rabbitmq/model/amqp/AmqpRpcClient.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.atomic.AtomicLong;
2626
import java.util.function.BiFunction;
27+
import java.util.function.Function;
2728
import java.util.function.Supplier;
2829

2930
class AmqpRpcClient implements RpcClient {
@@ -32,11 +33,11 @@ class AmqpRpcClient implements RpcClient {
3233

3334
private final Publisher publisher;
3435
private final Consumer consumer;
35-
private final String replyToAddress;
3636
private final Map<Object, CompletableFuture<Message>> outstandingRequests =
3737
new ConcurrentHashMap<>();
3838
private final Supplier<Object> correlationIdSupplier;
3939
private final BiFunction<Message, Object, Message> requestPostProcessor;
40+
private final Function<Message, Object> correlationIdExtractor;
4041

4142
AmqpRpcClient(RpcSupport.AmqpRpcClientBuilder builder) {
4243
AmqpConnection connection = builder.connection();
@@ -52,6 +53,11 @@ class AmqpRpcClient implements RpcClient {
5253
connection.management().queue().exclusive(true).autoDelete(true).declare();
5354
replyTo = queueInfo.name();
5455
}
56+
if (builder.correlationIdExtractor() == null) {
57+
this.correlationIdExtractor = Message::correlationId;
58+
} else {
59+
this.correlationIdExtractor = builder.correlationIdExtractor();
60+
}
5561
this.consumer =
5662
connection
5763
.consumerBuilder()
@@ -60,22 +66,31 @@ class AmqpRpcClient implements RpcClient {
6066
(ctx, msg) -> {
6167
ctx.accept();
6268
CompletableFuture<Message> result =
63-
this.outstandingRequests.remove(msg.correlationId());
69+
this.outstandingRequests.remove(this.correlationIdExtractor.apply(msg));
6470
if (result != null) {
6571
result.complete(msg);
6672
}
6773
})
6874
.build();
69-
DefaultAddressBuilder<?> addressBuilder = new DefaultAddressBuilder<>(null) {};
70-
addressBuilder.queue(replyTo);
71-
this.replyToAddress = addressBuilder.address();
7275

73-
String correlationIdPrefix = UUID.randomUUID().toString();
74-
AtomicLong correlationIdSequence = new AtomicLong();
75-
this.correlationIdSupplier =
76-
() -> correlationIdPrefix + "-" + correlationIdSequence.getAndIncrement();
77-
this.requestPostProcessor =
78-
(request, correlationId) -> request.replyTo(this.replyToAddress).messageId(correlationId);
76+
if (builder.correlationIdSupplier() == null) {
77+
String correlationIdPrefix = UUID.randomUUID().toString();
78+
AtomicLong correlationIdSequence = new AtomicLong();
79+
this.correlationIdSupplier =
80+
() -> correlationIdPrefix + "-" + correlationIdSequence.getAndIncrement();
81+
} else {
82+
this.correlationIdSupplier = builder.correlationIdSupplier();
83+
}
84+
85+
if (builder.requestPostProcessor() == null) {
86+
DefaultAddressBuilder<?> addressBuilder = new DefaultAddressBuilder<>(null) {};
87+
addressBuilder.queue(replyTo);
88+
String replyToAddress = addressBuilder.address();
89+
this.requestPostProcessor =
90+
(request, correlationId) -> request.replyTo(replyToAddress).messageId(correlationId);
91+
} else {
92+
this.requestPostProcessor = builder.requestPostProcessor();
93+
}
7994
}
8095

8196
@Override
@@ -93,7 +108,7 @@ public CompletableFuture<Message> publish(Message message) {
93108
Object correlationId = this.correlationIdSupplier.get();
94109
message = this.requestPostProcessor.apply(message, correlationId);
95110
CompletableFuture<Message> result = new CompletableFuture<>();
96-
this.outstandingRequests.put(message.messageId(), result);
111+
this.outstandingRequests.put(correlationId, result);
97112
this.publisher.publish(message, NO_OP_CALLBACK);
98113
return result;
99114
}

src/main/java/com/rabbitmq/model/amqp/AmqpRpcServer.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,16 @@ public Message message(byte[] body) {
5151
return publisher.message(body);
5252
}
5353
};
54-
this.correlationIdExtractor = Message::messageId;
55-
this.replyPostProcessor = Message::correlationId;
54+
if (builder.correlationIdExtractor() == null) {
55+
this.correlationIdExtractor = Message::messageId;
56+
} else {
57+
this.correlationIdExtractor = builder.correlationIdExtractor();
58+
}
59+
if (builder.replyPostProcessor() == null) {
60+
this.replyPostProcessor = Message::correlationId;
61+
} else {
62+
this.replyPostProcessor = builder.replyPostProcessor();
63+
}
5664
this.consumer =
5765
connection
5866
.consumerBuilder()
@@ -61,7 +69,9 @@ public Message message(byte[] body) {
6169
(ctx, msg) -> {
6270
ctx.accept();
6371
Message reply = handler.handle(context, msg);
64-
reply.to(msg.replyTo());
72+
if (msg.replyTo() != null) {
73+
reply.to(msg.replyTo());
74+
}
6575
Object correlationId = correlationIdExtractor.apply(msg);
6676
reply = replyPostProcessor.apply(reply, correlationId);
6777
this.publisher.publish(reply, NO_OP_CALLBACK);

src/main/java/com/rabbitmq/model/amqp/RpcSupport.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package com.rabbitmq.model.amqp;
1919

2020
import com.rabbitmq.model.*;
21+
import java.util.function.BiFunction;
22+
import java.util.function.Function;
23+
import java.util.function.Supplier;
2124

2225
abstract class RpcSupport {
2326

@@ -30,6 +33,9 @@ static class AmqpRpcClientBuilder implements RpcClientBuilder {
3033
private final DefaultRpcClientAddressBuilder requestAddressBuilder =
3134
new DefaultRpcClientAddressBuilder(this);
3235
private String replyToQueue;
36+
private Supplier<Object> correlationIdSupplier;
37+
private BiFunction<Message, Object, Message> requestPostProcessor;
38+
private Function<Message, Object> correlationIdExtractor;
3339

3440
AmqpRpcClientBuilder(AmqpConnection connection) {
3541
this.connection = connection;
@@ -46,8 +52,35 @@ public RpcClientBuilder replyToQueue(String replyToQueue) {
4652
return this;
4753
}
4854

55+
@Override
56+
public RpcClientBuilder correlationIdSupplier(Supplier<Object> correlationIdSupplier) {
57+
this.correlationIdSupplier = correlationIdSupplier;
58+
return this;
59+
}
60+
61+
@Override
62+
public RpcClientBuilder requestPostProcessor(
63+
BiFunction<Message, Object, Message> requestPostProcessor) {
64+
this.requestPostProcessor = requestPostProcessor;
65+
return this;
66+
}
67+
68+
@Override
69+
public RpcClientBuilder correlationIdExtractor(
70+
Function<Message, Object> correlationIdExtractor) {
71+
this.correlationIdExtractor = correlationIdExtractor;
72+
return this;
73+
}
74+
75+
Function<Message, Object> correlationIdExtractor() {
76+
return correlationIdExtractor;
77+
}
78+
4979
@Override
5080
public RpcClient build() {
81+
if (this.requestAddressBuilder.address() == null) {
82+
throw new IllegalArgumentException("Request address cannot be null");
83+
}
5184
return new AmqpRpcClient(this);
5285
}
5386

@@ -58,6 +91,14 @@ AmqpConnection connection() {
5891
String replyToQueue() {
5992
return this.replyToQueue;
6093
}
94+
95+
Supplier<Object> correlationIdSupplier() {
96+
return this.correlationIdSupplier;
97+
}
98+
99+
BiFunction<Message, Object, Message> requestPostProcessor() {
100+
return this.requestPostProcessor;
101+
}
61102
}
62103

63104
private static class DefaultRpcClientAddressBuilder
@@ -90,6 +131,8 @@ static class AmqpRpcServerBuilder implements RpcServerBuilder {
90131
private RpcServer.Handler handler;
91132
private final DefaultRpcServerAddressBuilder replyToAddressBuilder =
92133
new DefaultRpcServerAddressBuilder(this);
134+
private Function<Message, Object> correlationIdExtractor;
135+
private BiFunction<Message, Object, Message> replyPostProcessor;
93136

94137
AmqpRpcServerBuilder(AmqpConnection connection) {
95138
this.connection = connection;
@@ -112,6 +155,20 @@ public RpcServerAddressBuilder replyToAddress() {
112155
return this.replyToAddressBuilder;
113156
}
114157

158+
@Override
159+
public RpcServerBuilder correlationIdExtractor(
160+
Function<Message, Object> correlationIdExtractor) {
161+
this.correlationIdExtractor = correlationIdExtractor;
162+
return this;
163+
}
164+
165+
@Override
166+
public RpcServerBuilder replyPostProcessor(
167+
BiFunction<Message, Object, Message> replyPostProcessor) {
168+
this.replyPostProcessor = replyPostProcessor;
169+
return this;
170+
}
171+
115172
@Override
116173
public RpcServer build() {
117174
return new AmqpRpcServer(this);
@@ -128,6 +185,14 @@ String requestQueue() {
128185
RpcServer.Handler handler() {
129186
return this.handler;
130187
}
188+
189+
Function<Message, Object> correlationIdExtractor() {
190+
return this.correlationIdExtractor;
191+
}
192+
193+
BiFunction<Message, Object, Message> replyPostProcessor() {
194+
return this.replyPostProcessor;
195+
}
131196
}
132197

133198
private static class DefaultRpcServerAddressBuilder

0 commit comments

Comments
 (0)