Skip to content

Commit 8672d5f

Browse files
authored
Implement RPC server and client (#76)
* Implement the RPC client/ server feature * Implement anonymous publisher --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 3606164 commit 8672d5f

20 files changed

+1129
-112
lines changed

RabbitMQ.AMQP.Client/IAddressBuilder.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,10 @@ public interface IAddressBuilder<out T>
2323

2424
T Key(string key);
2525
}
26+
27+
public interface IMessageAddressBuilder : IAddressBuilder<IMessageAddressBuilder>
28+
{
29+
30+
IMessage Build();
31+
}
2632
}

RabbitMQ.AMQP.Client/IConnection.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ public interface IConnection : ILifeCycle
2626

2727
IConsumerBuilder ConsumerBuilder();
2828

29+
IRpcServerBuilder RpcServerBuilder();
30+
31+
IRpcClientBuilder RpcClientBuilder();
32+
2933
public ReadOnlyCollection<IPublisher> GetPublishers();
3034

3135
public ReadOnlyCollection<IConsumer> GetConsumers();

RabbitMQ.AMQP.Client/IMessage.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,30 @@ public interface IMessage
1212
public object Body();
1313

1414
// properties
15-
string MessageId();
1615
IMessage MessageId(string id);
1716

18-
string CorrelationId();
17+
IMessage MessageId(object id);
18+
object MessageId();
19+
20+
object CorrelationId();
1921
IMessage CorrelationId(string id);
22+
IMessage CorrelationId(object id);
2023

2124
string ReplyTo();
2225
IMessage ReplyTo(string id);
2326

27+
string To();
28+
IMessage To(string id);
29+
2430
string Subject();
2531
IMessage Subject(string subject);
32+
IMessage GroupId(string groupId);
33+
string GroupId();
2634

2735
public IMessage Annotation(string key, object value);
2836

2937
public object Annotation(string key);
38+
39+
IMessageAddressBuilder ToAddress();
3040
}
3141
}

RabbitMQ.AMQP.Client/IRpcClient.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace RabbitMQ.AMQP.Client
6+
{
7+
8+
public interface IRpcClientAddressBuilder : IAddressBuilder<IRpcClientAddressBuilder>
9+
{
10+
IRpcClientBuilder RpcClient();
11+
}
12+
13+
public interface IRpcClientBuilder
14+
{
15+
IRpcClientAddressBuilder RequestAddress();
16+
IRpcClientBuilder ReplyToQueue(string replyToQueue);
17+
IRpcClientBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
18+
19+
IRpcClientBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);
20+
21+
IRpcClientBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);
22+
IRpcClientBuilder Timeout(TimeSpan timeout);
23+
Task<IRpcClient> BuildAsync();
24+
}
25+
26+
public interface IRpcClient : ILifeCycle
27+
{
28+
Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default);
29+
}
30+
}

RabbitMQ.AMQP.Client/IRpcServer.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace RabbitMQ.AMQP.Client
5+
{
6+
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);
7+
8+
public interface IRpcServerBuilder
9+
{
10+
IRpcServerBuilder RequestQueue(string requestQueue);
11+
IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue);
12+
IRpcServerBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
13+
14+
IRpcServerBuilder ReplyPostProcessor(Func<IMessage, object, IMessage>? replyPostProcessor);
15+
16+
IRpcServerBuilder Handler(RpcHandler handler);
17+
18+
Task<IRpcServer> BuildAsync();
19+
}
20+
21+
public interface IRpcServer : ILifeCycle
22+
{
23+
24+
public interface IContext
25+
{
26+
IMessage Message(object body);
27+
}
28+
}
29+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
using System;
2+
3+
namespace RabbitMQ.AMQP.Client.Impl
4+
{
5+
public abstract class DefaultAddressBuilder<T> : IAddressBuilder<T>
6+
{
7+
private string? _exchange = null;
8+
private string? _queue = null;
9+
private string? _key = null;
10+
protected T? _owner = default;
11+
12+
public T Exchange(IExchangeSpecification exchangeSpec)
13+
{
14+
return Exchange(exchangeSpec.ExchangeName);
15+
}
16+
17+
public T Exchange(string? exchangeName)
18+
{
19+
_exchange = exchangeName;
20+
if (_owner == null)
21+
{
22+
throw new InvalidOperationException("Owner is null");
23+
}
24+
25+
return _owner;
26+
}
27+
28+
public T Queue(IQueueSpecification queueSpec) => Queue(queueSpec.QueueName);
29+
30+
public T Queue(string? queueName)
31+
{
32+
_queue = queueName;
33+
if (_owner == null)
34+
{
35+
throw new InvalidOperationException("Owner is null");
36+
}
37+
38+
return _owner;
39+
}
40+
41+
public T Key(string? key)
42+
{
43+
_key = key;
44+
if (_owner == null)
45+
{
46+
throw new InvalidOperationException("Owner is null");
47+
}
48+
49+
return _owner;
50+
}
51+
52+
public string Address()
53+
{
54+
if (_exchange == null && _queue == null)
55+
{
56+
throw new InvalidAddressException("Exchange or Queue must be set");
57+
}
58+
59+
if (_exchange != null && _queue != null)
60+
{
61+
throw new InvalidAddressException("Exchange and Queue cannot be set together");
62+
}
63+
64+
if (_exchange != null)
65+
{
66+
if (string.IsNullOrEmpty(_exchange))
67+
{
68+
throw new InvalidAddressException("Exchange must be set");
69+
}
70+
71+
if (_key != null && false == string.IsNullOrEmpty(_key))
72+
{
73+
return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}/{Utils.EncodePathSegment(_key)}";
74+
}
75+
76+
return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}";
77+
}
78+
79+
if (_queue == null)
80+
{
81+
return "";
82+
}
83+
84+
if (string.IsNullOrEmpty(_queue))
85+
{
86+
throw new InvalidAddressException("Queue must be set");
87+
}
88+
89+
return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}";
90+
}
91+
}
92+
93+
public class AddressBuilder : DefaultAddressBuilder<AddressBuilder>
94+
{
95+
public AddressBuilder()
96+
{
97+
_owner = this;
98+
}
99+
}
100+
101+
public static class AddressBuilderHelper
102+
{
103+
public static AddressBuilder AddressBuilder() => new();
104+
}
105+
106+
public class MessageAddressBuilder : DefaultAddressBuilder<IMessageAddressBuilder>, IMessageAddressBuilder
107+
{
108+
private readonly IMessage _message;
109+
110+
public MessageAddressBuilder(IMessage message)
111+
{
112+
_message = message;
113+
_owner = this;
114+
}
115+
116+
public IMessage Build()
117+
{
118+
_message.To(Address());
119+
return _message;
120+
}
121+
}
122+
123+
public class RpcClientAddressBuilder : DefaultAddressBuilder<IRpcClientAddressBuilder>, IRpcClientAddressBuilder
124+
{
125+
readonly AmqpRpcClientBuilder _builder;
126+
public RpcClientAddressBuilder(AmqpRpcClientBuilder builder)
127+
{
128+
_builder = builder;
129+
_owner = this;
130+
}
131+
132+
public IRpcClientBuilder RpcClient()
133+
{
134+
return _builder;
135+
}
136+
}
137+
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,17 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
5050
private readonly TaskCompletionSource<bool> _connectionClosedTcs =
5151
new(TaskCreationOptions.RunContinuationsAsynchronously);
5252

53+
public IRpcServerBuilder RpcServerBuilder()
54+
{
55+
return new AmqpRpcServerBuilder(this);
56+
}
57+
58+
public IRpcClientBuilder RpcClientBuilder()
59+
{
60+
61+
return new AmqpRpcClientBuilder(this);
62+
}
63+
5364
/// <summary>
5465
/// Read-only collection of publishers.
5566
/// See <see cref="IPublisher"/>

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public IConsumerBuilder Queue(IQueueSpecification queueSpec)
4343

4444
public IConsumerBuilder Queue(string queueName)
4545
{
46-
string address = new AddressBuilder().Queue(queueName).Address();
46+
string address = AddressBuilderHelper.AddressBuilder().Queue(queueName).Address();
4747
_configuration.Address = address;
4848
return this;
4949
}

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,27 @@ public object Body()
6464
return NativeMessage.Body;
6565
}
6666

67-
public string MessageId()
67+
public object MessageId()
6868
{
6969
ThrowIfPropertiesNotSet();
70-
return NativeMessage.Properties.MessageId;
70+
return NativeMessage.Properties.GetMessageId();
7171
}
7272

7373
public IMessage MessageId(string id)
7474
{
7575
EnsureProperties();
76-
NativeMessage.Properties.MessageId = id;
76+
NativeMessage.Properties.SetMessageId(id);
7777
return this;
7878
}
7979

80-
public string CorrelationId()
80+
public IMessage MessageId(object id)
81+
{
82+
EnsureProperties();
83+
NativeMessage.Properties.SetMessageId(id);
84+
return this;
85+
}
86+
87+
public object CorrelationId()
8188
{
8289
ThrowIfPropertiesNotSet();
8390
return NativeMessage.Properties.CorrelationId;
@@ -86,7 +93,14 @@ public string CorrelationId()
8693
public IMessage CorrelationId(string id)
8794
{
8895
EnsureProperties();
89-
NativeMessage.Properties.CorrelationId = id;
96+
NativeMessage.Properties.SetCorrelationId(id);
97+
return this;
98+
}
99+
100+
public IMessage CorrelationId(object id)
101+
{
102+
EnsureProperties();
103+
NativeMessage.Properties.SetCorrelationId(id);
90104
return this;
91105
}
92106

@@ -103,6 +117,19 @@ public IMessage ReplyTo(string id)
103117
return this;
104118
}
105119

120+
public string To()
121+
{
122+
ThrowIfPropertiesNotSet();
123+
return NativeMessage.Properties.To;
124+
}
125+
126+
public IMessage To(string id)
127+
{
128+
EnsureProperties();
129+
NativeMessage.Properties.To = id;
130+
return this;
131+
}
132+
106133
public string Subject()
107134
{
108135
ThrowIfPropertiesNotSet();
@@ -116,6 +143,19 @@ public IMessage Subject(string subject)
116143
return this;
117144
}
118145

146+
public IMessage GroupId(string groupId)
147+
{
148+
EnsureProperties();
149+
NativeMessage.Properties.GroupId = groupId;
150+
return this;
151+
}
152+
153+
public string GroupId()
154+
{
155+
ThrowIfPropertiesNotSet();
156+
return NativeMessage.Properties.GroupId;
157+
}
158+
119159
// Annotations
120160

121161
public IMessage Annotation(string key, object value)
@@ -130,5 +170,10 @@ public object Annotation(string key)
130170
ThrowIfAnnotationsNotSet();
131171
return NativeMessage.MessageAnnotations[new Symbol(key)];
132172
}
173+
174+
public IMessageAddressBuilder ToAddress()
175+
{
176+
return new MessageAddressBuilder(this);
177+
}
133178
}
134179
}

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher
1616
{
1717
private readonly AmqpConnection _connection;
1818
private readonly TimeSpan _timeout;
19-
private readonly string _address;
19+
private readonly string? _address;
2020
private readonly Guid _id = Guid.NewGuid();
2121

2222
private SenderLink? _senderLink = null;
2323

24-
public AmqpPublisher(AmqpConnection connection, string address, TimeSpan timeout)
24+
public AmqpPublisher(AmqpConnection connection, string? address, TimeSpan timeout)
2525
{
2626
_connection = connection;
2727
_address = address;

0 commit comments

Comments
 (0)