Skip to content

Implement RPC server and client #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RabbitMQ.AMQP.Client/IAddressBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ public interface IAddressBuilder<out T>

T Key(string key);
}

public interface IMessageAddressBuilder : IAddressBuilder<IMessageAddressBuilder>
{

IMessage Build();
}
}
4 changes: 4 additions & 0 deletions RabbitMQ.AMQP.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public interface IConnection : ILifeCycle

IConsumerBuilder ConsumerBuilder();

IRpcServerBuilder RpcServerBuilder();

IRpcClientBuilder RpcClientBuilder();

public ReadOnlyCollection<IPublisher> GetPublishers();

public ReadOnlyCollection<IConsumer> GetConsumers();
Expand Down
14 changes: 12 additions & 2 deletions RabbitMQ.AMQP.Client/IMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,30 @@ public interface IMessage
public object Body();

// properties
string MessageId();
IMessage MessageId(string id);

string CorrelationId();
IMessage MessageId(object id);
object MessageId();

object CorrelationId();
IMessage CorrelationId(string id);
IMessage CorrelationId(object id);

string ReplyTo();
IMessage ReplyTo(string id);

string To();
IMessage To(string id);

string Subject();
IMessage Subject(string subject);
IMessage GroupId(string groupId);
string GroupId();

public IMessage Annotation(string key, object value);

public object Annotation(string key);

IMessageAddressBuilder ToAddress();
}
}
30 changes: 30 additions & 0 deletions RabbitMQ.AMQP.Client/IRpcClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.AMQP.Client
{

public interface IRpcClientAddressBuilder : IAddressBuilder<IRpcClientAddressBuilder>
{
IRpcClientBuilder RpcClient();
}

public interface IRpcClientBuilder
{
IRpcClientAddressBuilder RequestAddress();
IRpcClientBuilder ReplyToQueue(string replyToQueue);
IRpcClientBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);

IRpcClientBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);

IRpcClientBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);
IRpcClientBuilder Timeout(TimeSpan timeout);
Task<IRpcClient> BuildAsync();
}

public interface IRpcClient : ILifeCycle
{
Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default);
}
}
29 changes: 29 additions & 0 deletions RabbitMQ.AMQP.Client/IRpcServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Threading.Tasks;

namespace RabbitMQ.AMQP.Client
{
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);

public interface IRpcServerBuilder
{
IRpcServerBuilder RequestQueue(string requestQueue);
IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue);
IRpcServerBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);

IRpcServerBuilder ReplyPostProcessor(Func<IMessage, object, IMessage>? replyPostProcessor);

IRpcServerBuilder Handler(RpcHandler handler);

Task<IRpcServer> BuildAsync();
}

public interface IRpcServer : ILifeCycle
{

public interface IContext
{
IMessage Message(object body);
}
}
}
137 changes: 137 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System;

namespace RabbitMQ.AMQP.Client.Impl
{
public abstract class DefaultAddressBuilder<T> : IAddressBuilder<T>
{
private string? _exchange = null;
private string? _queue = null;
private string? _key = null;
protected T? _owner = default;

public T Exchange(IExchangeSpecification exchangeSpec)
{
return Exchange(exchangeSpec.ExchangeName);
}

public T Exchange(string? exchangeName)
{
_exchange = exchangeName;
if (_owner == null)
{
throw new InvalidOperationException("Owner is null");
}

return _owner;
}

public T Queue(IQueueSpecification queueSpec) => Queue(queueSpec.QueueName);

public T Queue(string? queueName)
{
_queue = queueName;
if (_owner == null)
{
throw new InvalidOperationException("Owner is null");
}

return _owner;
}

public T Key(string? key)
{
_key = key;
if (_owner == null)
{
throw new InvalidOperationException("Owner is null");
}

return _owner;
}

public string Address()
{
if (_exchange == null && _queue == null)
{
throw new InvalidAddressException("Exchange or Queue must be set");
}

if (_exchange != null && _queue != null)
{
throw new InvalidAddressException("Exchange and Queue cannot be set together");
}

if (_exchange != null)
{
if (string.IsNullOrEmpty(_exchange))
{
throw new InvalidAddressException("Exchange must be set");
}

if (_key != null && false == string.IsNullOrEmpty(_key))
{
return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}/{Utils.EncodePathSegment(_key)}";
}

return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}";
}

if (_queue == null)
{
return "";
}

if (string.IsNullOrEmpty(_queue))
{
throw new InvalidAddressException("Queue must be set");
}

return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}";
}
}

public class AddressBuilder : DefaultAddressBuilder<AddressBuilder>
{
public AddressBuilder()
{
_owner = this;
}
}

public static class AddressBuilderHelper
{
public static AddressBuilder AddressBuilder() => new();
}

public class MessageAddressBuilder : DefaultAddressBuilder<IMessageAddressBuilder>, IMessageAddressBuilder
{
private readonly IMessage _message;

public MessageAddressBuilder(IMessage message)
{
_message = message;
_owner = this;
}

public IMessage Build()
{
_message.To(Address());
return _message;
}
}

public class RpcClientAddressBuilder : DefaultAddressBuilder<IRpcClientAddressBuilder>, IRpcClientAddressBuilder
{
readonly AmqpRpcClientBuilder _builder;
public RpcClientAddressBuilder(AmqpRpcClientBuilder builder)
{
_builder = builder;
_owner = this;
}

public IRpcClientBuilder RpcClient()
{
return _builder;
}
}
}
11 changes: 11 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly TaskCompletionSource<bool> _connectionClosedTcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);

public IRpcServerBuilder RpcServerBuilder()
{
return new AmqpRpcServerBuilder(this);
}

public IRpcClientBuilder RpcClientBuilder()
{

return new AmqpRpcClientBuilder(this);
}

/// <summary>
/// Read-only collection of publishers.
/// See <see cref="IPublisher"/>
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public IConsumerBuilder Queue(IQueueSpecification queueSpec)

public IConsumerBuilder Queue(string queueName)
{
string address = new AddressBuilder().Queue(queueName).Address();
string address = AddressBuilderHelper.AddressBuilder().Queue(queueName).Address();
_configuration.Address = address;
return this;
}
Expand Down
55 changes: 50 additions & 5 deletions RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,27 @@ public object Body()
return NativeMessage.Body;
}

public string MessageId()
public object MessageId()
{
ThrowIfPropertiesNotSet();
return NativeMessage.Properties.MessageId;
return NativeMessage.Properties.GetMessageId();
}

public IMessage MessageId(string id)
{
EnsureProperties();
NativeMessage.Properties.MessageId = id;
NativeMessage.Properties.SetMessageId(id);
return this;
}

public string CorrelationId()
public IMessage MessageId(object id)
{
EnsureProperties();
NativeMessage.Properties.SetMessageId(id);
return this;
}

public object CorrelationId()
{
ThrowIfPropertiesNotSet();
return NativeMessage.Properties.CorrelationId;
Expand All @@ -86,7 +93,14 @@ public string CorrelationId()
public IMessage CorrelationId(string id)
{
EnsureProperties();
NativeMessage.Properties.CorrelationId = id;
NativeMessage.Properties.SetCorrelationId(id);
return this;
}

public IMessage CorrelationId(object id)
{
EnsureProperties();
NativeMessage.Properties.SetCorrelationId(id);
return this;
}

Expand All @@ -103,6 +117,19 @@ public IMessage ReplyTo(string id)
return this;
}

public string To()
{
ThrowIfPropertiesNotSet();
return NativeMessage.Properties.To;
}

public IMessage To(string id)
{
EnsureProperties();
NativeMessage.Properties.To = id;
return this;
}

public string Subject()
{
ThrowIfPropertiesNotSet();
Expand All @@ -116,6 +143,19 @@ public IMessage Subject(string subject)
return this;
}

public IMessage GroupId(string groupId)
{
EnsureProperties();
NativeMessage.Properties.GroupId = groupId;
return this;
}

public string GroupId()
{
ThrowIfPropertiesNotSet();
return NativeMessage.Properties.GroupId;
}

// Annotations

public IMessage Annotation(string key, object value)
Expand All @@ -130,5 +170,10 @@ public object Annotation(string key)
ThrowIfAnnotationsNotSet();
return NativeMessage.MessageAnnotations[new Symbol(key)];
}

public IMessageAddressBuilder ToAddress()
{
return new MessageAddressBuilder(this);
}
}
}
4 changes: 2 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher
{
private readonly AmqpConnection _connection;
private readonly TimeSpan _timeout;
private readonly string _address;
private readonly string? _address;
private readonly Guid _id = Guid.NewGuid();

private SenderLink? _senderLink = null;

public AmqpPublisher(AmqpConnection connection, string address, TimeSpan timeout)
public AmqpPublisher(AmqpConnection connection, string? address, TimeSpan timeout)
{
_connection = connection;
_address = address;
Expand Down
Loading
Loading