Skip to content

Add documentation #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
# RabbitMQ AMQP 1.0 .NET Client

This library is meant to be used with RabbitMQ 4.0.
Still work in progress suitable for testing in pre-production environments
Suitable for testing in pre-production environments

## How to Run

- Start the broker with `./.ci/ubuntu/one-node/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
- Run the tests with ` dotnet test ./Build.csproj --logger "console;verbosity=detailed"`
- Stop RabbitMQ with `./.ci/ubuntu/one-node/gha-setup.sh stop`
## Install

## Getting Started
The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AMQP.Client/).

You can find an example in: `docs/Examples/GettingStarted`
## Examples

## Install
Inside the `docs/Examples` directory you can find examples of how to use the client.

The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AMQP.Client/).

## Documentation

- [Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries)
- [API](https://rabbitmq.github.io/rabbitmq-amqp-dotnet-client/api/RabbitMQ.AMQP.Client.html)


## How to Run

- Start the broker with `./.ci/ubuntu/one-node/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
- Run the tests with ` dotnet test ./Build.csproj --logger "console;verbosity=detailed"`
- Stop RabbitMQ with `./.ci/ubuntu/one-node/gha-setup.sh stop`
66 changes: 66 additions & 0 deletions RabbitMQ.AMQP.Client/IRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,89 @@ public interface IRpcClientAddressBuilder : IAddressBuilder<IRpcClientAddressBui
IRpcClientBuilder RpcClient();
}

/// <summary>
/// IRpcClientBuilder is the interface for creating an RPC client.
/// See also <seealso cref="IRpcClient"/> and <seealso cref="IRpcServerBuilder"/>
/// </summary>
public interface IRpcClientBuilder
{
/// <summary>
/// Request address where the client sends requests.
/// The server consumes requests from this address.
/// </summary>
/// <returns></returns>
IRpcClientAddressBuilder RequestAddress();

/// <summary>
/// The queue from which requests are consumed.
/// if not set the client will create a temporary queue.
/// </summary>
/// <param name="replyToQueueName"> The queue name</param>
/// <returns></returns>
IRpcClientBuilder ReplyToQueue(string replyToQueueName);

IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue);

/// <summary>
/// Extracts the correlation id from the request message.
/// each message has a correlation id that is used to match the request with the response.
/// There are default implementations for the correlation id extractor.
/// With this method, you can provide a custom implementation.
/// </summary>
/// <param name="correlationIdExtractor"></param>
/// <returns></returns>
IRpcClientBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);

/// <summary>
/// Post processes the reply message before sending it to the server.
/// The object parameter is the correlation id extracted from the request message.
/// There are default implementations for the reply post processor that use the correlationId() field
/// to set the correlation id of the reply message.
/// With this method, you can provide a custom implementation.
/// </summary>
/// <param name="requestPostProcessor"></param>
/// <returns></returns>
IRpcClientBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);

/// <summary>
/// Client and Server must agree on the correlation id.
/// The client will provide the correlation id to send to the server.
/// If the default correlation id is not suitable, you can provide a custom correlation id supplier.
/// Be careful to provide a unique correlation id for each request.
/// </summary>
/// <param name="correlationIdSupplier"></param>
/// <returns></returns>

IRpcClientBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);

/// <summary>
/// The time to wait for a reply from the server.
/// </summary>
/// <param name="timeout"></param>
/// <returns></returns>
IRpcClientBuilder Timeout(TimeSpan timeout);
/// <summary>
/// Build and return the RPC client.
/// </summary>
/// <returns></returns>
Task<IRpcClient> BuildAsync();
}

/// <summary>
/// IRpcClient is the interface for an RPC client.
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
/// </summary>
public interface IRpcClient : ILifeCycle
{
/// <summary>
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
/// The PublishAsync is thread-safe and can be called from multiple threads.
/// The Function returns the response message.
/// If the server does not respond within the timeout, the function throws a TimeoutException.
/// </summary>
/// <param name="message"> The request message</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default);
}
}
22 changes: 21 additions & 1 deletion RabbitMQ.AMQP.Client/IRpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@

namespace RabbitMQ.AMQP.Client
{
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);

/// <summary>
/// IRpcServerBuilder is the interface for creating an RPC server.
/// The RPC server consumes requests from a queue and sends replies to a reply queue.
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
/// </summary>
public interface IRpcServerBuilder
{
/// <summary>
Expand Down Expand Up @@ -45,9 +49,25 @@ public interface IRpcServerBuilder
/// <returns></returns>
IRpcServerBuilder Handler(RpcHandler handler);

/// <summary>
/// Build and return the RPC server.
/// </summary>
/// <returns></returns>
Task<IRpcServer> BuildAsync();
}

/// <summary>
/// Event handler for handling RPC requests.
/// </summary>
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);

/// <summary>
/// IRpcServer interface for creating an RPC server.
/// The RPC is simulated by sending a request message and receiving a reply message.
/// Where the client sends the queue where wants to receive the reply.
/// RPC client ---> request queue ---> RPC server ---> reply queue ---> RPC client
/// See also <seealso cref="IRpcClient"/>
/// </summary>
public interface IRpcServer : ILifeCycle
{

Expand Down
6 changes: 0 additions & 6 deletions RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,6 @@ public override async Task CloseAsync()
}
}

/// <summary>
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
/// </summary>
/// <param name="message"> The request message</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
public async Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ await Utils.WaitWithBackOffUntilFuncAsync(async () =>
{
Trace.WriteLine(TraceLevel.Error, $"Failed to send reply, retrying in {span}");
}
}, 3).ConfigureAwait(false);
}, 5).ConfigureAwait(false);
}
})
.Queue(_configuration.RequestQueue).BuildAndStartAsync()
Expand Down
36 changes: 36 additions & 0 deletions Tests/Rpc/RpcServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,41 @@ public async Task RpcClientMultiThreadShouldBeSafe()
await rpcServer.CloseAsync();
await rpcClient.CloseAsync();
}

/// <summary>
/// The RPC client `PublishAsync` should raise a timeout exception if the server does not reply within the timeout
/// </summary>
[Fact]
public async Task RpcClientShouldRaiseTimeoutError()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
object millisecondsToWait = request.ApplicationProperty("wait");
Thread.Sleep(TimeSpan.FromMilliseconds((int)millisecondsToWait));
return Task.FromResult(reply);
}).RequestQueue(_queueName).BuildAsync();
Assert.NotNull(rpcServer);

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
.RpcClient()
.Timeout(TimeSpan.FromMilliseconds(300))
.BuildAsync();

IMessage reply = await rpcClient.PublishAsync(
new AmqpMessage("ping").ApplicationProperty("wait", 1));
Assert.Equal("pong", reply.Body());

await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
new AmqpMessage("ping").ApplicationProperty("wait", 700)));

await rpcClient.CloseAsync();
await rpcServer.CloseAsync();
}
}
}
8 changes: 8 additions & 0 deletions docs/Examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# RabbitMQ AMQP 1.0 .NET Client examples

This directory contains examples of how to use the RabbitMQ AMQP 1.0 .NET client.

- Getting Started with the Client [here](./GettingStarted/)
- RPC Server and Client [here](./Rpc/)
- How to write a reliable client [here](./HAClient/)
- Performance Test [here](./PerformanceTest/). You can tune some parameters in the `Program.cs` file.