Skip to content

First version for API documentation #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: rabbitmq-amqp-dotnet-client
name: Trigger build and test

on:
push:
Expand All @@ -8,4 +8,4 @@ on:

jobs:
call-build-test:
uses: ./.github/workflows/build-test.yaml
uses: ./.github/workflows/wf_build-and-test.yaml
9 changes: 9 additions & 0 deletions .github/workflows/trigger_gh-pages.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: Trigger github pages

on:
push:
branches: [ main ]

jobs:
call-gh-pages:
uses: ./.github/workflows/wf_gh-pages.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: publish rabbitmq-dotnet-client
name: Trigger publish the client to nuget

on:
release:
Expand All @@ -8,8 +8,8 @@ on:

jobs:
call-build-test:
uses: ./.github/workflows/build-test.yaml
uses: ./.github/workflows/wf_build-and-test.yaml
call-publish-nuget:
uses: ./.github/workflows/publish-nuget.yaml
uses: ./.github/workflows/wf_publish-nuget.yaml
needs: call-build-test
secrets: inherit
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Test against supported .NET
name: Workflow test against supported .NET

on:
- workflow_call
Expand Down Expand Up @@ -34,7 +34,7 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Test
timeout-minutes: 15
timeout-minutes: 20
run: dotnet test ${{ github.workspace }}\Build.csproj --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1
Expand Down
45 changes: 45 additions & 0 deletions .github/workflows/wf_gh-pages.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: Workflow github pages

# Your GitHub workflow file under .github/workflows/
# Trigger the action on push to main
on:
- workflow_call


# Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages
permissions:
actions: read
pages: write
id-token: write

# Allow only one concurrent deployment, skipping runs queued between the run in-progress and latest queued.
# However, do NOT cancel in-progress runs as we want to allow these production deployments to complete.
concurrency:
group: "pages"
cancel-in-progress: false

jobs:
publish-docs:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Dotnet Setup
uses: actions/setup-dotnet@v3
with:
dotnet-version: 8.x

- run: dotnet tool update -g docfx
- run: docfx docfx.json

- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
# Upload entire repository
path: '_site'
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: publish-nuget
name: Workflow publish the client to nuget

on:
workflow_call:
Expand Down
27 changes: 4 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# RabbitMQ AMQP 1.0 .NET Client

This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
This library is meant to be used with RabbitMQ 4.0.
Still work in progress suitable for testing in pre-production environments

## How to Run

Expand All @@ -16,26 +17,6 @@ You can find an example in: `docs/Examples/GettingStarted`

The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AMQP.Client/).

## TODO

- [x] Declare queues
- [x] Declare exchanges
- [x] Declare bindings
- [x] Simple Publish messages
- [x] Implement backpressure (it is implemented with MaxInflightMessages `MaxInFlight(2000).`)
- [x] Simple Consume messages
- [x] Recovery connection on connection lost
- [x] Recovery management on connection lost
- [x] Recovery queues on connection lost
- [x] Recovery publishers on connection lost
- [x] Recovery consumers on connection lost
- [x] Implement Environment to manage the connections
- [x] Complete the consumer part with `pause` and `unpause`
- [x] Complete the binding/unbinding with the special characters
- [x] Complete the queues/exchanges name with the special characters
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
- [x] Recovery exchanges on connection lost
- [x] Recovery bindings on connection lost
- [ ] Docker image to test in LRE [not mandatory]
- [ ] Check the TODO in the code
## Documentation

- [API](https://rabbitmq.github.io/rabbitmq-amqp-dotnet-client/api/RabbitMQ.AMQP.Client.html)
3 changes: 3 additions & 0 deletions RabbitMQ.AMQP.Client/IEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public interface IEnvironment
/// <returns>IConnection</returns>
public Task<IConnection> CreateConnectionAsync();

/// <summary>
/// Get all connections.
/// </summary>
public ReadOnlyCollection<IConnection> GetConnections();

/// <summary>
Expand Down
5 changes: 5 additions & 0 deletions RabbitMQ.AMQP.Client/IManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ public PreconditionFailedException(string message) : base(message)
}
}

/// <summary>
/// IManagement interface and is responsible for managing the AMQP resources.
/// RabbitMQ uses AMQP end point: "/management" to manage the resources like queues, exchanges, and bindings.
/// The management endpoint works like an HTTP RPC endpoint where the client sends a request to the server.
/// </summary>
public interface IManagement : ILifeCycle
{
IQueueSpecification Queue();
Expand Down
15 changes: 14 additions & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace RabbitMQ.AMQP.Client.Impl
{
/// <summary>
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
/// It is a wrapper around the Microsoft AMQP.Net Lite <see cref="Connection"/> class
/// </summary>
public class AmqpConnection : AbstractLifeCycle, IConnection
{
Expand Down Expand Up @@ -50,11 +50,23 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly TaskCompletionSource<bool> _connectionClosedTcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);


/// <summary>
/// Read-only collection of publishers.
/// See <see cref="IPublisher"/>
/// </summary>
/// <returns> All the active Publishers </returns>
public ReadOnlyCollection<IPublisher> GetPublishers()
{
return Publishers.Values.ToList().AsReadOnly();
}


/// <summary>
/// Read-only collection of consumers.
/// See <see cref="IConsumer"/>
/// </summary>
/// <returns> All the active Consumers </returns>
public ReadOnlyCollection<IConsumer> GetConsumers()
{
return Consumers.Values.ToList().AsReadOnly();
Expand All @@ -77,6 +89,7 @@ await connection.OpenAsync()
return connection;
}


public IManagement Management()
{
return _management;
Expand Down
64 changes: 50 additions & 14 deletions RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
namespace RabbitMQ.AMQP.Client.Impl
{
/// <summary>
/// AmqpManagement implements the IManagement interface and is responsible for managing the AMQP resources.
/// RabbitMQ uses AMQP end point: "/management" to manage the resources like queues, exchanges, and bindings.
/// The management endpoint works like an HTTP RPC endpoint where the client sends a request to the server
/// AmqpManagement implements the IManagement
/// See <see cref="IManagement"/> for more information
/// </summary>
public class AmqpManagement : AbstractLifeCycle, IManagement, IManagementTopology
{
Expand Down Expand Up @@ -55,23 +54,45 @@ internal AmqpManagement(AmqpManagementParameters amqpManagementParameters)
_amqpManagementParameters = amqpManagementParameters;
}

/// <summary>
/// Create a new queue specification
/// See <see cref="IQueueSpecification"/> for more information
/// </summary>
/// <returns> A builder for IQueueSpecification </returns>
public IQueueSpecification Queue()
{
ThrowIfClosed();
return new AmqpQueueSpecification(this);
}

/// <summary>
/// Create a new queue specification with the given name
/// See <see cref="IQueueSpecification"/> for more information
/// </summary>
/// <returns>A builder for IQueueSpecification </returns>
public IQueueSpecification Queue(string name)
{
return Queue().Name(name);
}


/// <summary>
/// Get the queue info for the given queue specification
/// See <see cref="IQueueInfo"/> for more information
/// </summary>
/// <returns> Queue Information</returns>
public Task<IQueueInfo> GetQueueInfoAsync(IQueueSpecification queueSpec,
CancellationToken cancellationToken = default)
{
return GetQueueInfoAsync(queueSpec.QueueName, cancellationToken);
}


/// <summary>
/// Get the queue info for the given queue name
/// See <see cref="IQueueInfo"/> for more information
/// </summary>
/// <returns> Queue Information</returns>
public async Task<IQueueInfo> GetQueueInfoAsync(string queueName,
CancellationToken cancellationToken = default)
{
Expand All @@ -86,6 +107,7 @@ public async Task<IQueueInfo> GetQueueInfoAsync(string queueName,
return new DefaultQueueInfo((Map)response.Body);
}


internal IQueueSpecification Queue(QueueSpec spec)
{
return Queue().Name(spec.QueueName)
Expand All @@ -94,12 +116,25 @@ internal IQueueSpecification Queue(QueueSpec spec)
.Arguments(spec.QueueArguments);
}


/// <summary>
/// Create a new AMQPExchange specification
/// See <see cref="IExchangeSpecification"/> for more information
/// </summary>
/// <returns>A builder for IExchangeSpecification</returns>
public IExchangeSpecification Exchange()
{
ThrowIfClosed();
return new AmqpExchangeSpecification(this);
}


/// <summary>
///
/// Create a new AMQPExchange specification with the given name
/// See <see cref="IExchangeSpecification"/> for more information
/// </summary>
/// <returns> A builder for IExchangeSpecification</returns>
public IExchangeSpecification Exchange(string name)
{
return Exchange().Name(name);
Expand Down Expand Up @@ -133,6 +168,9 @@ ITopologyListener IManagementTopology.TopologyListener()
return _amqpManagementParameters.TopologyListener();
}

/// <summary>
/// Open the management session to RabbitMQ
/// </summary>
public override async Task OpenAsync()
{
if (State == State.Open)
Expand Down Expand Up @@ -195,7 +233,8 @@ private async Task ProcessResponses()
// this is not a problem, it is just a timeout.
// the timeout is set to 60 seconds.
// For the moment I'd trace it at some point we can remove it
Trace.WriteLine(TraceLevel.Verbose, $"Management:Timeout {timeout.Seconds} s.. waiting for message.");
Trace.WriteLine(TraceLevel.Verbose,
$"Management:Timeout {timeout.Seconds} s.. waiting for message.");
continue;
}

Expand Down Expand Up @@ -228,9 +267,11 @@ private async Task EnsureReceiverLinkAsync()
RcvSettleMode = ReceiverSettleMode.First,
Properties = new Fields { { new Symbol("paired"), true } },
LinkName = LinkPairName,
Source = new Source() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("LINK_DETACH"), },
Source =
new Source() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("LINK_DETACH"), },
Handle = 1,
Target = new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), },
Target =
new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), },
};

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -334,13 +375,7 @@ internal Task<Message> RequestAsync(string id, object? body, string path, string
{
var message = new Message(body)
{
Properties = new Properties
{
MessageId = id,
To = path,
Subject = method,
ReplyTo = ReplyTo
}
Properties = new Properties { MessageId = id, To = path, Subject = method, ReplyTo = ReplyTo }
};

return RequestAsync(message, expectedResponseCodes, timeout, cancellationToken);
Expand Down Expand Up @@ -462,7 +497,8 @@ protected virtual Task InternalSendAsync(Message message, TimeSpan timeout)
if (_senderLink is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException("_senderLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
throw new InvalidOperationException(
"_senderLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}

return _senderLink.SendAsync(message, timeout);
Expand Down
Loading