Skip to content

Commit 49f5f16

Browse files
authored
Improve the RPC feature (#78)
* Improve the RPC feature * Add thread-safe publish * Add an example * Improve the RPC reconnection * Add server-side retry --- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 7de9f93 commit 49f5f16

14 files changed

+497
-69
lines changed

RabbitMQ.AMQP.Client/IMessage.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System.Collections.Generic;
6+
57
namespace RabbitMQ.AMQP.Client
68
{
79
public interface IMessage
@@ -32,6 +34,15 @@ public interface IMessage
3234
IMessage GroupId(string groupId);
3335
string GroupId();
3436

37+
// Application properties
38+
39+
public IMessage ApplicationProperty(string key, object value);
40+
41+
public object ApplicationProperty(string key);
42+
43+
public IDictionary<object, object> ApplicationProperties();
44+
45+
// Message annotations
3546
public IMessage Annotation(string key, object value);
3647

3748
public object Annotation(string key);

RabbitMQ.AMQP.Client/IRpcClient.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ public interface IRpcClientAddressBuilder : IAddressBuilder<IRpcClientAddressBui
1313
public interface IRpcClientBuilder
1414
{
1515
IRpcClientAddressBuilder RequestAddress();
16-
IRpcClientBuilder ReplyToQueue(string replyToQueue);
16+
IRpcClientBuilder ReplyToQueue(string replyToQueueName);
17+
18+
IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue);
1719
IRpcClientBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
1820

1921
IRpcClientBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);

RabbitMQ.AMQP.Client/IRpcServer.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,42 @@ namespace RabbitMQ.AMQP.Client
77

88
public interface IRpcServerBuilder
99
{
10+
/// <summary>
11+
/// The queue from which requests are consumed.
12+
/// The client sends requests to this queue and the server consumes them.
13+
/// </summary>
14+
/// <param name="requestQueue"></param>
15+
/// <returns></returns>
1016
IRpcServerBuilder RequestQueue(string requestQueue);
1117
IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue);
18+
19+
/// <summary>
20+
/// Extracts the correlation id from the request message.
21+
/// each message has a correlation id that is used to match the request with the response.
22+
/// There are default implementations for the correlation id extractor.
23+
/// With this method, you can provide a custom implementation.
24+
/// </summary>
25+
/// <param name="correlationIdExtractor"></param>
26+
/// <returns></returns>
27+
1228
IRpcServerBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
1329

30+
/// <summary>
31+
/// Post processes the reply message before sending it to the client.
32+
/// The object parameter is the correlation id extracted from the request message.
33+
/// There are default implementations for the reply post processor that use the correlationId() field
34+
/// to set the correlation id of the reply message.
35+
/// With this method, you can provide a custom implementation.
36+
/// </summary>
37+
/// <param name="replyPostProcessor"></param>
38+
/// <returns></returns>
1439
IRpcServerBuilder ReplyPostProcessor(Func<IMessage, object, IMessage>? replyPostProcessor);
1540

41+
/// <summary>
42+
/// Handle the request message and return the reply message.
43+
/// </summary>
44+
/// <param name="handler"></param>
45+
/// <returns></returns>
1646
IRpcServerBuilder Handler(RpcHandler handler);
1747

1848
Task<IRpcServer> BuildAsync();

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6+
using System.Collections.Generic;
67
using Amqp;
78
using Amqp.Framing;
89
using Amqp.Types;
@@ -58,6 +59,19 @@ private void EnsureAnnotations()
5859
NativeMessage.MessageAnnotations ??= new MessageAnnotations();
5960
}
6061

62+
private void ThrowIfApplicationPropertiesNotSet()
63+
{
64+
if (NativeMessage.ApplicationProperties == null)
65+
{
66+
throw new FieldNotSetException();
67+
}
68+
}
69+
70+
private void EnsureApplicationProperties()
71+
{
72+
NativeMessage.ApplicationProperties ??= new ApplicationProperties();
73+
}
74+
6175
public object Body()
6276
{
6377
// TODO do we need to do anything with NativeMessage.BodySection?
@@ -156,6 +170,25 @@ public string GroupId()
156170
return NativeMessage.Properties.GroupId;
157171
}
158172

173+
public IMessage ApplicationProperty(string key, object value)
174+
{
175+
EnsureApplicationProperties();
176+
NativeMessage.ApplicationProperties[key] = value;
177+
return this;
178+
}
179+
180+
public object ApplicationProperty(string key)
181+
{
182+
ThrowIfApplicationPropertiesNotSet();
183+
return NativeMessage.ApplicationProperties[key];
184+
}
185+
186+
public IDictionary<object, object> ApplicationProperties()
187+
{
188+
ThrowIfApplicationPropertiesNotSet();
189+
return NativeMessage.ApplicationProperties.Map;
190+
}
191+
159192
// Annotations
160193

161194
public IMessage Annotation(string key, object value)

RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
34
using System.Threading;
45
using System.Threading.Tasks;
@@ -37,9 +38,15 @@ public IRpcClientAddressBuilder RequestAddress()
3738
return _addressBuilder;
3839
}
3940

40-
public IRpcClientBuilder ReplyToQueue(string replyToQueue)
41+
public IRpcClientBuilder ReplyToQueue(string replyToQueueName)
4142
{
42-
_configuration.ReplyToQueue = replyToQueue;
43+
_configuration.ReplyToQueue = replyToQueueName;
44+
return this;
45+
}
46+
47+
public IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue)
48+
{
49+
_configuration.ReplyToQueue = replyToQueue.QueueName;
4350
return this;
4451
}
4552

@@ -77,13 +84,24 @@ public async Task<IRpcClient> BuildAsync()
7784
}
7885
}
7986

87+
/// <summary>
88+
/// AmqpRpcClient is an implementation of <see cref="IRpcClient"/>.
89+
/// It is a wrapper around <see cref="IPublisher"/> and <see cref="IConsumer"/> to create an RPC client over AMQP 1.0.
90+
/// even the PublishAsync is async the RPClient blocks the thread until the response is received.
91+
/// within the timeout.
92+
///
93+
/// The PublishAsync is thread-safe and can be called from multiple threads.
94+
///
95+
/// See also the server side <see cref="IRpcServer"/>.
96+
/// </summary>
8097
public class AmqpRpcClient : AbstractLifeCycle, IRpcClient
8198
{
8299
private readonly RpcClientConfiguration _configuration;
83100
private IConsumer? _consumer = null;
84101
private IPublisher? _publisher = null;
85-
private readonly Dictionary<object, TaskCompletionSource<IMessage>> _pendingRequests = new();
102+
private readonly ConcurrentDictionary<object, TaskCompletionSource<IMessage>> _pendingRequests = new();
86103
private readonly string _correlationId = Guid.NewGuid().ToString();
104+
private readonly SemaphoreSlim _semaphore = new(1, 1);
87105
private int _nextCorrelationId = 0;
88106

89107
private object CorrelationIdSupplier()
@@ -170,27 +188,42 @@ public override async Task CloseAsync()
170188
}
171189
}
172190

191+
/// <summary>
192+
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
193+
/// </summary>
194+
/// <param name="message"> The request message</param>
195+
/// <param name="cancellationToken">Cancellation token</param>
196+
/// <returns></returns>
173197
public async Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
174198
{
175-
object correlationId = CorrelationIdSupplier();
176-
message = RequestPostProcess(message, correlationId);
177-
_pendingRequests.Add(correlationId, new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously));
178-
if (_publisher != null)
199+
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
200+
try
179201
{
180-
PublishResult pr = await _publisher.PublishAsync(
181-
message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false);
182-
183-
if (pr.Outcome.State != OutcomeState.Accepted)
202+
object correlationId = CorrelationIdSupplier();
203+
message = RequestPostProcess(message, correlationId);
204+
_pendingRequests.TryAdd(correlationId,
205+
new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously));
206+
if (_publisher != null)
184207
{
185-
_pendingRequests[correlationId]
186-
.SetException(new Exception($"Failed to send request state: {pr.Outcome.State}"));
208+
PublishResult pr = await _publisher.PublishAsync(
209+
message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false);
210+
211+
if (pr.Outcome.State != OutcomeState.Accepted)
212+
{
213+
_pendingRequests[correlationId]
214+
.SetException(new Exception($"Failed to send request state: {pr.Outcome.State}"));
215+
}
187216
}
188-
}
189217

190-
await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout)
191-
.ConfigureAwait(false);
218+
await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout)
219+
.ConfigureAwait(false);
192220

193-
return await _pendingRequests[correlationId].Task.ConfigureAwait(false);
221+
return await _pendingRequests[correlationId].Task.ConfigureAwait(false);
222+
}
223+
finally
224+
{
225+
_semaphore.Release();
226+
}
194227
}
195228
}
196229
}

RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ public class RpcConfiguration
1313
public Func<IMessage, object, IMessage>? ReplyPostProcessor { get; set; }
1414
}
1515

16+
/// <summary>
17+
/// AmqpRpcServerBuilder is a builder for creating an AMQP RPC server.
18+
/// </summary>
1619
public class AmqpRpcServerBuilder : IRpcServerBuilder
1720
{
18-
readonly RpcConfiguration _configuration = new RpcConfiguration();
21+
readonly RpcConfiguration _configuration = new();
1922

2023
public AmqpRpcServerBuilder(AmqpConnection connection)
2124
{
@@ -60,6 +63,10 @@ public async Task<IRpcServer> BuildAsync()
6063
}
6164
}
6265

66+
/// <summary>
67+
/// AmqpRpcServer implements the <see cref="IRpcServer"/> interface.
68+
/// With the RpcClient you can create an RPC communication over AMQP 1.0.
69+
/// </summary>
6370
public class AmqpRpcServer : AbstractLifeCycle, IRpcServer
6471
{
6572
private readonly RpcConfiguration _configuration;
@@ -91,7 +98,9 @@ private object ExtractCorrelationId(IMessage message)
9198

9299
private IMessage ReplyPostProcessor(IMessage reply, object correlationId)
93100
{
94-
return _configuration.ReplyPostProcessor != null ? _configuration.ReplyPostProcessor(reply, correlationId) : reply.CorrelationId(correlationId);
101+
return _configuration.ReplyPostProcessor != null
102+
? _configuration.ReplyPostProcessor(reply, correlationId)
103+
: reply.CorrelationId(correlationId);
95104
}
96105

97106
public AmqpRpcServer(RpcConfiguration configuration)
@@ -117,12 +126,32 @@ public override async Task OpenAsync()
117126
}
118127
else
119128
{
120-
Trace.WriteLine(TraceLevel.Error, "No reply-to address in request");
129+
Trace.WriteLine(TraceLevel.Error, "[RPC server] No reply-to address in request");
121130
}
122131

123132
object correlationId = ExtractCorrelationId(request);
124133
reply = ReplyPostProcessor(reply, correlationId);
125-
await SendReply(reply).ConfigureAwait(false);
134+
await Utils.WaitWithBackOffUntilFuncAsync(async () =>
135+
{
136+
try
137+
{
138+
await SendReply(reply).ConfigureAwait(false);
139+
return true;
140+
}
141+
catch (Exception e)
142+
{
143+
Trace.WriteLine(TraceLevel.Error,
144+
$"[RPC server] Failed to send reply: {e.Message}");
145+
return false;
146+
}
147+
},
148+
(success, span) =>
149+
{
150+
if (!success)
151+
{
152+
Trace.WriteLine(TraceLevel.Error, $"Failed to send reply, retrying in {span}");
153+
}
154+
}, 3).ConfigureAwait(false);
126155
}
127156
})
128157
.Queue(_configuration.RequestQueue).BuildAndStartAsync()

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ RabbitMQ.AMQP.Client.IManagement.Queue(string! name) -> RabbitMQ.AMQP.Client.IQu
181181
RabbitMQ.AMQP.Client.IMessage
182182
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key) -> object!
183183
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
184+
RabbitMQ.AMQP.Client.IMessage.ApplicationProperties() -> System.Collections.Generic.IDictionary<object!, object!>!
185+
RabbitMQ.AMQP.Client.IMessage.ApplicationProperty(string! key) -> object!
186+
RabbitMQ.AMQP.Client.IMessage.ApplicationProperty(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
184187
RabbitMQ.AMQP.Client.IMessage.Body() -> object!
185188
RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> object!
186189
RabbitMQ.AMQP.Client.IMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage!
@@ -294,6 +297,9 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -
294297
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(object! body) -> void
295298
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key) -> object!
296299
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
300+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ApplicationProperties() -> System.Collections.Generic.IDictionary<object!, object!>!
301+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ApplicationProperty(string! key) -> object!
302+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ApplicationProperty(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
297303
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> object!
298304
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> object!
299305
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage!
@@ -365,7 +371,8 @@ RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.AmqpRpcClientBuilder(RabbitMQ.AMQ
365371
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IRpcClient!>!
366372
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdExtractor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!>? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
367373
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdSupplier(System.Func<object!>? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
368-
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
374+
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(RabbitMQ.AMQP.Client.IQueueSpecification! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
375+
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueueName) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
369376
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder!
370377
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestPostProcessor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!, RabbitMQ.AMQP.Client.IMessage!>? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
371378
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
@@ -594,7 +601,8 @@ RabbitMQ.AMQP.Client.IRpcClientBuilder
594601
RabbitMQ.AMQP.Client.IRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IRpcClient!>!
595602
RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdExtractor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!>? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
596603
RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdSupplier(System.Func<object!>? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
597-
RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
604+
RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(RabbitMQ.AMQP.Client.IQueueSpecification! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
605+
RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueueName) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
598606
RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder!
599607
RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestPostProcessor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!, RabbitMQ.AMQP.Client.IMessage!>? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
600608
RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!

0 commit comments

Comments
 (0)