Skip to content

Commit a5fd6ec

Browse files
committed
Add support for RPC
With RPC client and server API.
1 parent 68a5195 commit a5fd6ec

14 files changed

+674
-3
lines changed

src/main/java/com/rabbitmq/model/Connection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ public interface Connection extends Closeable, Resource {
2727

2828
ConsumerBuilder consumerBuilder();
2929

30+
RpcClientBuilder rpcClientBuilder();
31+
32+
RpcServerBuilder rpcServerBuilder();
33+
3034
@Override
3135
void close();
3236
}

src/main/java/com/rabbitmq/model/Message.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ public interface Message {
3737

3838
UUID messageIdAsUuid();
3939

40+
Object correlationId();
41+
42+
String correlationIdAsString();
43+
44+
long correlationIdAsLong();
45+
46+
byte[] correlationIdAsBinary();
47+
48+
UUID correlationIdAsUuid();
49+
4050
byte[] userId();
4151

4252
String to();
@@ -45,6 +55,8 @@ public interface Message {
4555

4656
String replyTo();
4757

58+
Message messageId(Object id);
59+
4860
Message messageId(String id);
4961

5062
Message messageId(long id);
@@ -53,6 +65,16 @@ public interface Message {
5365

5466
Message messageId(UUID id);
5567

68+
Message correlationId(Object correlationId);
69+
70+
Message correlationId(String correlationId);
71+
72+
Message correlationId(long correlationId);
73+
74+
Message correlationId(byte[] correlationId);
75+
76+
Message correlationId(UUID correlationId);
77+
5678
Message userId(byte[] userId);
5779

5880
Message to(String address);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.model;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
22+
public interface RpcClient extends AutoCloseable {
23+
24+
Message message();
25+
26+
Message message(byte[] body);
27+
28+
CompletableFuture<Message> publish(Message message);
29+
30+
@Override
31+
void close();
32+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.model;
19+
20+
public interface RpcClientBuilder {
21+
22+
RpcClientAddressBuilder requestAddress();
23+
24+
RpcClientBuilder replyToQueue(String replyToQueue);
25+
26+
RpcClient build();
27+
28+
interface RpcClientAddressBuilder extends AddressBuilder<RpcClientAddressBuilder> {
29+
30+
RpcClientBuilder rpcClient();
31+
}
32+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.model;
19+
20+
public interface RpcServer extends AutoCloseable {
21+
22+
interface Handler {
23+
24+
Message handle(Context ctx, Message request);
25+
}
26+
27+
interface Context {
28+
29+
Message message();
30+
31+
Message message(byte[] body);
32+
}
33+
34+
@Override
35+
void close();
36+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.model;
19+
20+
public interface RpcServerBuilder {
21+
22+
RpcServerBuilder requestQueue(String requestQueue);
23+
24+
RpcServerBuilder handler(RpcServer.Handler handler);
25+
26+
RpcServerAddressBuilder replyToAddress();
27+
28+
RpcServer build();
29+
30+
interface RpcServerAddressBuilder extends AddressBuilder<RpcServerAddressBuilder> {
31+
32+
RpcServerBuilder rpcServer();
33+
}
34+
}

src/main/java/com/rabbitmq/model/amqp/AmqpConnection.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ public ConsumerBuilder consumerBuilder() {
152152
return new AmqpConsumerBuilder(this);
153153
}
154154

155+
@Override
156+
public RpcClientBuilder rpcClientBuilder() {
157+
return new RpcSupport.AmqpRpcClientBuilder(this);
158+
}
159+
160+
@Override
161+
public RpcServerBuilder rpcServerBuilder() {
162+
return new RpcSupport.AmqpRpcServerBuilder(this);
163+
}
164+
155165
@Override
156166
public void close() {
157167
if (this.closed.compareAndSet(false, true)) {

src/main/java/com/rabbitmq/model/amqp/AmqpMessage.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,31 @@ public UUID messageIdAsUuid() {
8787
return returnFromDelegate(m -> (UUID) m.messageId());
8888
}
8989

90+
@Override
91+
public Object correlationId() {
92+
return returnFromDelegate(org.apache.qpid.protonj2.client.Message::correlationId);
93+
}
94+
95+
@Override
96+
public String correlationIdAsString() {
97+
return returnFromDelegate(m -> (String) m.correlationId());
98+
}
99+
100+
@Override
101+
public long correlationIdAsLong() {
102+
return returnFromDelegate(m -> ((UnsignedLong) m.correlationId())).longValue();
103+
}
104+
105+
@Override
106+
public byte[] correlationIdAsBinary() {
107+
return returnFromDelegate(m -> ((Binary) m.correlationId()).asByteArray());
108+
}
109+
110+
@Override
111+
public UUID correlationIdAsUuid() {
112+
return returnFromDelegate(m -> (UUID) m.correlationId());
113+
}
114+
90115
@Override
91116
public byte[] userId() {
92117
return returnFromDelegate(org.apache.qpid.protonj2.client.Message::userId);
@@ -107,6 +132,12 @@ public String replyTo() {
107132
return returnFromDelegate(org.apache.qpid.protonj2.client.Message::replyTo);
108133
}
109134

135+
@Override
136+
public Message messageId(Object id) {
137+
callOnDelegate(m -> m.messageId(id));
138+
return this;
139+
}
140+
110141
@Override
111142
public Message messageId(String id) {
112143
callOnDelegate(m -> m.messageId(id));
@@ -131,6 +162,36 @@ public Message messageId(UUID id) {
131162
return this;
132163
}
133164

165+
@Override
166+
public Message correlationId(Object correlationId) {
167+
callOnDelegate(m -> m.correlationId(correlationId));
168+
return this;
169+
}
170+
171+
@Override
172+
public Message correlationId(String correlationId) {
173+
callOnDelegate(m -> m.correlationId(correlationId));
174+
return this;
175+
}
176+
177+
@Override
178+
public Message correlationId(long correlationId) {
179+
callOnDelegate(m -> m.correlationId(correlationId));
180+
return this;
181+
}
182+
183+
@Override
184+
public Message correlationId(byte[] correlationId) {
185+
callOnDelegate(m -> m.correlationId(correlationId));
186+
return this;
187+
}
188+
189+
@Override
190+
public Message correlationId(UUID correlationId) {
191+
callOnDelegate(m -> m.correlationId(correlationId));
192+
return this;
193+
}
194+
134195
@Override
135196
public Message userId(byte[] userId) {
136197
callOnDelegate(m -> m.userId(userId));
@@ -297,10 +358,10 @@ public Object removeProperty(String key) {
297358

298359
@Override
299360
public MessageAddressBuilder address() {
300-
return new DefaultMessageAddressBuilder<>(this);
361+
return new DefaultMessageAddressBuilder(this);
301362
}
302363

303-
private static class DefaultMessageAddressBuilder<T>
364+
private static class DefaultMessageAddressBuilder
304365
extends DefaultAddressBuilder<MessageAddressBuilder> implements MessageAddressBuilder {
305366

306367
private final Message message;

src/main/java/com/rabbitmq/model/amqp/AmqpPublisherBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
1818
package com.rabbitmq.model.amqp;
1919

20+
import com.rabbitmq.model.AddressBuilder;
2021
import com.rabbitmq.model.Publisher;
2122
import com.rabbitmq.model.PublisherBuilder;
2223
import com.rabbitmq.model.Resource;
@@ -72,6 +73,10 @@ List<Resource.StateListener> listeners() {
7273
return listeners;
7374
}
7475

76+
AddressBuilder<?> addressBuilder() {
77+
return this.addressBuilder;
78+
}
79+
7580
String address() {
7681
return this.addressBuilder.address();
7782
}

0 commit comments

Comments
 (0)