Skip to content

Commit 7cfdcbb

Browse files
authored
Add documentation (#79)
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 49f5f16 commit 7cfdcbb

File tree

7 files changed

+145
-17
lines changed

7 files changed

+145
-17
lines changed

README.md

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
11
# RabbitMQ AMQP 1.0 .NET Client
22

33
This library is meant to be used with RabbitMQ 4.0.
4-
Still work in progress suitable for testing in pre-production environments
4+
Suitable for testing in pre-production environments
55

6-
## How to Run
76

8-
- Start the broker with `./.ci/ubuntu/one-node/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
9-
- Run the tests with ` dotnet test ./Build.csproj --logger "console;verbosity=detailed"`
10-
- Stop RabbitMQ with `./.ci/ubuntu/one-node/gha-setup.sh stop`
7+
## Install
118

12-
## Getting Started
9+
The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AMQP.Client/).
1310

14-
You can find an example in: `docs/Examples/GettingStarted`
11+
## Examples
1512

16-
## Install
13+
Inside the `docs/Examples` directory you can find examples of how to use the client.
1714

18-
The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AMQP.Client/).
1915

2016
## Documentation
2117

18+
- [Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries)
2219
- [API](https://rabbitmq.github.io/rabbitmq-amqp-dotnet-client/api/RabbitMQ.AMQP.Client.html)
20+
21+
22+
## How to Run
23+
24+
- Start the broker with `./.ci/ubuntu/one-node/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
25+
- Run the tests with ` dotnet test ./Build.csproj --logger "console;verbosity=detailed"`
26+
- Stop RabbitMQ with `./.ci/ubuntu/one-node/gha-setup.sh stop`

RabbitMQ.AMQP.Client/IRpcClient.cs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,89 @@ public interface IRpcClientAddressBuilder : IAddressBuilder<IRpcClientAddressBui
1010
IRpcClientBuilder RpcClient();
1111
}
1212

13+
/// <summary>
14+
/// IRpcClientBuilder is the interface for creating an RPC client.
15+
/// See also <seealso cref="IRpcClient"/> and <seealso cref="IRpcServerBuilder"/>
16+
/// </summary>
1317
public interface IRpcClientBuilder
1418
{
19+
/// <summary>
20+
/// Request address where the client sends requests.
21+
/// The server consumes requests from this address.
22+
/// </summary>
23+
/// <returns></returns>
1524
IRpcClientAddressBuilder RequestAddress();
25+
26+
/// <summary>
27+
/// The queue from which requests are consumed.
28+
/// if not set the client will create a temporary queue.
29+
/// </summary>
30+
/// <param name="replyToQueueName"> The queue name</param>
31+
/// <returns></returns>
1632
IRpcClientBuilder ReplyToQueue(string replyToQueueName);
1733

1834
IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue);
35+
36+
/// <summary>
37+
/// Extracts the correlation id from the request message.
38+
/// each message has a correlation id that is used to match the request with the response.
39+
/// There are default implementations for the correlation id extractor.
40+
/// With this method, you can provide a custom implementation.
41+
/// </summary>
42+
/// <param name="correlationIdExtractor"></param>
43+
/// <returns></returns>
1944
IRpcClientBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
2045

46+
/// <summary>
47+
/// Post processes the reply message before sending it to the server.
48+
/// The object parameter is the correlation id extracted from the request message.
49+
/// There are default implementations for the reply post processor that use the correlationId() field
50+
/// to set the correlation id of the reply message.
51+
/// With this method, you can provide a custom implementation.
52+
/// </summary>
53+
/// <param name="requestPostProcessor"></param>
54+
/// <returns></returns>
2155
IRpcClientBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);
2256

57+
/// <summary>
58+
/// Client and Server must agree on the correlation id.
59+
/// The client will provide the correlation id to send to the server.
60+
/// If the default correlation id is not suitable, you can provide a custom correlation id supplier.
61+
/// Be careful to provide a unique correlation id for each request.
62+
/// </summary>
63+
/// <param name="correlationIdSupplier"></param>
64+
/// <returns></returns>
65+
2366
IRpcClientBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);
67+
68+
/// <summary>
69+
/// The time to wait for a reply from the server.
70+
/// </summary>
71+
/// <param name="timeout"></param>
72+
/// <returns></returns>
2473
IRpcClientBuilder Timeout(TimeSpan timeout);
74+
/// <summary>
75+
/// Build and return the RPC client.
76+
/// </summary>
77+
/// <returns></returns>
2578
Task<IRpcClient> BuildAsync();
2679
}
2780

81+
/// <summary>
82+
/// IRpcClient is the interface for an RPC client.
83+
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
84+
/// </summary>
2885
public interface IRpcClient : ILifeCycle
2986
{
87+
/// <summary>
88+
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
89+
/// The PublishAsync is thread-safe and can be called from multiple threads.
90+
/// The Function returns the response message.
91+
/// If the server does not respond within the timeout, the function throws a TimeoutException.
92+
/// </summary>
93+
/// <param name="message"> The request message</param>
94+
/// <param name="cancellationToken">Cancellation token</param>
95+
/// <returns></returns>
3096
Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default);
3197
}
3298
}

RabbitMQ.AMQP.Client/IRpcServer.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33

44
namespace RabbitMQ.AMQP.Client
55
{
6-
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);
76

7+
/// <summary>
8+
/// IRpcServerBuilder is the interface for creating an RPC server.
9+
/// The RPC server consumes requests from a queue and sends replies to a reply queue.
10+
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
11+
/// </summary>
812
public interface IRpcServerBuilder
913
{
1014
/// <summary>
@@ -45,9 +49,25 @@ public interface IRpcServerBuilder
4549
/// <returns></returns>
4650
IRpcServerBuilder Handler(RpcHandler handler);
4751

52+
/// <summary>
53+
/// Build and return the RPC server.
54+
/// </summary>
55+
/// <returns></returns>
4856
Task<IRpcServer> BuildAsync();
4957
}
5058

59+
/// <summary>
60+
/// Event handler for handling RPC requests.
61+
/// </summary>
62+
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);
63+
64+
/// <summary>
65+
/// IRpcServer interface for creating an RPC server.
66+
/// The RPC is simulated by sending a request message and receiving a reply message.
67+
/// Where the client sends the queue where wants to receive the reply.
68+
/// RPC client ---> request queue ---> RPC server ---> reply queue ---> RPC client
69+
/// See also <seealso cref="IRpcClient"/>
70+
/// </summary>
5171
public interface IRpcServer : ILifeCycle
5272
{
5373

RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,6 @@ public override async Task CloseAsync()
188188
}
189189
}
190190

191-
/// <summary>
192-
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
193-
/// </summary>
194-
/// <param name="message"> The request message</param>
195-
/// <param name="cancellationToken">Cancellation token</param>
196-
/// <returns></returns>
197191
public async Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
198192
{
199193
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ await Utils.WaitWithBackOffUntilFuncAsync(async () =>
151151
{
152152
Trace.WriteLine(TraceLevel.Error, $"Failed to send reply, retrying in {span}");
153153
}
154-
}, 3).ConfigureAwait(false);
154+
}, 5).ConfigureAwait(false);
155155
}
156156
})
157157
.Queue(_configuration.RequestQueue).BuildAndStartAsync()

Tests/Rpc/RpcServerTests.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,5 +318,41 @@ public async Task RpcClientMultiThreadShouldBeSafe()
318318
await rpcServer.CloseAsync();
319319
await rpcClient.CloseAsync();
320320
}
321+
322+
/// <summary>
323+
/// The RPC client `PublishAsync` should raise a timeout exception if the server does not reply within the timeout
324+
/// </summary>
325+
[Fact]
326+
public async Task RpcClientShouldRaiseTimeoutError()
327+
{
328+
Assert.NotNull(_connection);
329+
Assert.NotNull(_management);
330+
string requestQueue = _queueName;
331+
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
332+
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
333+
{
334+
var reply = context.Message("pong");
335+
object millisecondsToWait = request.ApplicationProperty("wait");
336+
Thread.Sleep(TimeSpan.FromMilliseconds((int)millisecondsToWait));
337+
return Task.FromResult(reply);
338+
}).RequestQueue(_queueName).BuildAsync();
339+
Assert.NotNull(rpcServer);
340+
341+
IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
342+
.Queue(requestQueue)
343+
.RpcClient()
344+
.Timeout(TimeSpan.FromMilliseconds(300))
345+
.BuildAsync();
346+
347+
IMessage reply = await rpcClient.PublishAsync(
348+
new AmqpMessage("ping").ApplicationProperty("wait", 1));
349+
Assert.Equal("pong", reply.Body());
350+
351+
await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
352+
new AmqpMessage("ping").ApplicationProperty("wait", 700)));
353+
354+
await rpcClient.CloseAsync();
355+
await rpcServer.CloseAsync();
356+
}
321357
}
322358
}

docs/Examples/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# RabbitMQ AMQP 1.0 .NET Client examples
2+
3+
This directory contains examples of how to use the RabbitMQ AMQP 1.0 .NET client.
4+
5+
- Getting Started with the Client [here](./GettingStarted/)
6+
- RPC Server and Client [here](./Rpc/)
7+
- How to write a reliable client [here](./HAClient/)
8+
- Performance Test [here](./PerformanceTest/). You can tune some parameters in the `Program.cs` file.

0 commit comments

Comments
 (0)