Skip to content

Commit 290bdf2

Browse files
stebetlukebakken
authored andcommitted
Adding fully asynchronous versions of connect and publish.
1 parent e909e1f commit 290bdf2

16 files changed

+325
-81
lines changed

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,26 @@ void BasicPublish<TProperties>(string exchange, string routingKey, in TPropertie
208208
/// </remarks>
209209
void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
210210
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
211+
/// <summary>
212+
/// Asynchronously publishes a message.
213+
/// </summary>
214+
/// <remarks>
215+
/// <para>
216+
/// Routing key must be shorter than 255 bytes.
217+
/// </para>
218+
/// </remarks>
219+
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
220+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
221+
/// <summary>
222+
/// Asynchronously publishes a message.
223+
/// </summary>
224+
/// <remarks>
225+
/// <para>
226+
/// Routing key must be shorter than 255 bytes.
227+
/// </para>
228+
/// </remarks>
229+
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
230+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
211231
#nullable disable
212232

213233
/// <summary>

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading.Tasks;
3435
using RabbitMQ.Client.client.impl;
3536

3637
namespace RabbitMQ.Client
@@ -93,6 +94,12 @@ public static void BasicPublish(this IChannel channel, string exchange, string r
9394

9495
public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
9596
=> channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
97+
98+
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
99+
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
100+
101+
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
102+
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
96103
#nullable disable
97104

98105
/// <summary>

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System.Collections.Generic;
33+
using System.Threading.Tasks;
3334
using RabbitMQ.Client.client.framing;
3435
using RabbitMQ.Client.Impl;
3536

@@ -108,6 +109,11 @@ public override void _Private_ConnectionOpen(string virtualHost)
108109
{
109110
ChannelSend(new ConnectionOpen(virtualHost));
110111
}
112+
113+
public override ValueTask _Private_ConnectionOpenAsync(string virtualHost)
114+
{
115+
return ModelSendAsync(new ConnectionOpen(virtualHost));
116+
}
111117

112118
public override void _Private_ConnectionSecureOk(byte[] response)
113119
{

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,14 @@ public void BasicPublish<TProperties>(string exchange, string routingKey, in TPr
317317
public void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
318318
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
319319
=> InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory);
320+
321+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
322+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
323+
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
324+
325+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
326+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
327+
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
320328

321329
public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
322330
{

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 89 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,11 @@ namespace RabbitMQ.Client.Impl
4949
internal abstract class ChannelBase : IChannel, IRecoverable
5050
{
5151
///<summary>Only used to kick-start a connection open
52-
///sequence. See <see cref="Connection.Open"/> </summary>
53-
internal BlockingCell<ConnectionStartDetails> m_connectionStartCell;
52+
///sequence. See <see cref="Connection.OpenAsync"/> </summary>
53+
internal TaskCompletionSource<ConnectionStartDetails> m_connectionStartCell;
5454

55+
// AMQP only allows one RPC operation to be active at a time.
56+
private readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
5557
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
5658
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);
5759

@@ -258,13 +260,19 @@ internal void ConnectionOpen(string virtualHost)
258260
k.GetReply(HandshakeContinuationTimeout);
259261
}
260262
}
263+
264+
internal ValueTask ConnectionOpenAsync(string virtualHost)
265+
{
266+
return _Private_ConnectionOpenAsync(virtualHost);
267+
}
261268

262-
internal ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
269+
internal async ValueTask<ConnectionSecureOrTune> ConnectionSecureOkAsync(byte[] response)
263270
{
264-
var k = new ConnectionStartRpcContinuation();
265-
lock (_rpcLock)
271+
var k = new ConnectionSecureOrTuneContinuation();
272+
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
273+
Enqueue(k);
274+
try
266275
{
267-
Enqueue(k);
268276
try
269277
{
270278
_Private_ConnectionSecureOk(response);
@@ -275,31 +283,40 @@ internal ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
275283
// which is a much more suitable exception before connection
276284
// negotiation finishes
277285
}
278-
k.GetReply(HandshakeContinuationTimeout);
286+
287+
return await k;
288+
}
289+
finally
290+
{
291+
_rpcSemaphore.Release();
279292
}
280-
return k.m_result;
281293
}
282294

283-
internal ConnectionSecureOrTune ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale)
295+
internal async ValueTask<ConnectionSecureOrTune> ConnectionStartOkAsync(IDictionary<string, object> clientProperties, string mechanism, byte[] response,
296+
string locale)
284297
{
285-
var k = new ConnectionStartRpcContinuation();
286-
lock (_rpcLock)
298+
var k = new ConnectionSecureOrTuneContinuation();
299+
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
300+
Enqueue(k);
301+
try
287302
{
288-
Enqueue(k);
289303
try
290304
{
291-
_Private_ConnectionStartOk(clientProperties, mechanism,
292-
response, locale);
305+
_Private_ConnectionStartOk(clientProperties, mechanism, response, locale);
293306
}
294307
catch (AlreadyClosedException)
295308
{
296309
// let continuation throw OperationInterruptedException,
297310
// which is a much more suitable exception before connection
298311
// negotiation finishes
299312
}
300-
k.GetReply(HandshakeContinuationTimeout);
313+
314+
return await k;
315+
}
316+
finally
317+
{
318+
_rpcSemaphore.Release();
301319
}
302-
return k.m_result;
303320
}
304321

305322
protected abstract bool DispatchAsynchronous(in IncomingCommand cmd);
@@ -324,7 +341,7 @@ internal void FinishClose()
324341
Session.Close(reason);
325342
}
326343

327-
m_connectionStartCell?.ContinueWithValue(null);
344+
m_connectionStartCell?.TrySetResult(null);
328345
}
329346

330347
private void HandleCommand(in IncomingCommand cmd)
@@ -384,6 +401,12 @@ protected void ChannelSend<T>(in T method) where T : struct, IOutgoingAmqpMethod
384401
{
385402
Session.Transmit(in method);
386403
}
404+
405+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
406+
protected ValueTask ModelSendAsync<T>(in T method) where T : struct, IOutgoingAmqpMethod
407+
{
408+
return Session.TransmitAsync(in method);
409+
}
387410

388411
[MethodImpl(MethodImplOptions.AggressiveInlining)]
389412
protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
@@ -396,6 +419,19 @@ protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader heade
396419
}
397420
Session.Transmit(in method, in header, body);
398421
}
422+
423+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
424+
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
425+
where TMethod : struct, IOutgoingAmqpMethod
426+
where THeader : IAmqpHeader
427+
{
428+
if (!_flowControlBlock.IsSet)
429+
{
430+
_flowControlBlock.Wait();
431+
}
432+
433+
return Session.TransmitAsync(in method, in header, body);
434+
}
399435

400436
internal void OnCallbackException(CallbackExceptionEventArgs args)
401437
{
@@ -730,13 +766,7 @@ protected void HandleConnectionClose(in IncomingCommand cmd)
730766

731767
protected void HandleConnectionSecure(in IncomingCommand cmd)
732768
{
733-
var challenge = new ConnectionSecure(cmd.MethodBytes.Span)._challenge;
734-
cmd.ReturnMethodBuffer();
735-
var k = (ConnectionStartRpcContinuation)_continuationQueue.Next();
736-
k.m_result = new ConnectionSecureOrTune
737-
{
738-
m_challenge = challenge
739-
};
769+
var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next();
740770
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
741771
}
742772

@@ -758,25 +788,14 @@ protected void HandleConnectionStart(in IncomingCommand cmd)
758788
m_mechanisms = method._mechanisms,
759789
m_locales = method._locales
760790
};
761-
m_connectionStartCell.ContinueWithValue(details);
791+
m_connectionStartCell?.SetResult(details);
762792
m_connectionStartCell = null;
763793
}
764794

765795
protected void HandleConnectionTune(in IncomingCommand cmd)
766796
{
767-
var connectionTune = new ConnectionTune(cmd.MethodBytes.Span);
768-
cmd.ReturnMethodBuffer();
769-
var k = (ConnectionStartRpcContinuation)_continuationQueue.Next();
770-
k.m_result = new ConnectionSecureOrTune
771-
{
772-
m_tuneDetails =
773-
{
774-
m_channelMax = connectionTune._channelMax,
775-
m_frameMax = connectionTune._frameMax,
776-
m_heartbeatInSeconds = connectionTune._heartbeat
777-
}
778-
};
779-
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
797+
var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next();
798+
k.HandleCommand(cmd); // release the continuation.
780799
}
781800

782801
protected void HandleConnectionUnblocked()
@@ -815,6 +834,8 @@ protected void HandleQueueDeclareOk(in IncomingCommand cmd)
815834

816835
public abstract void _Private_ConnectionOpen(string virtualHost);
817836

837+
public abstract ValueTask _Private_ConnectionOpenAsync(string virtualHost);
838+
818839
public abstract void _Private_ConnectionSecureOk(byte[] response);
819840

820841
public abstract void _Private_ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale);
@@ -929,6 +950,36 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
929950
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
930951
ChannelSend(in cmd, in basicProperties, body);
931952
}
953+
954+
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
955+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
956+
{
957+
if (NextPublishSeqNo > 0)
958+
{
959+
lock (_confirmLock)
960+
{
961+
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
962+
}
963+
}
964+
965+
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
966+
return ModelSendAsync(in cmd, in basicProperties, body);
967+
}
968+
969+
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
970+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
971+
{
972+
if (NextPublishSeqNo > 0)
973+
{
974+
lock (_confirmLock)
975+
{
976+
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
977+
}
978+
}
979+
980+
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
981+
return ModelSendAsync(in cmd, in basicProperties, body);
982+
}
932983

933984
public void UpdateSecret(string newSecret, string reason)
934985
{

projects/RabbitMQ.Client/client/impl/Connection.Commands.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
using System;
3333
using System.IO;
3434
using System.Text;
35+
using System.Threading.Tasks;
3536
using RabbitMQ.Client.Events;
3637
using RabbitMQ.Client.Exceptions;
3738
using RabbitMQ.Client.Impl;
@@ -70,22 +71,21 @@ internal void HandleConnectionUnblocked()
7071
}
7172
}
7273

73-
private void Open()
74+
private async ValueTask OpenAsync()
7475
{
7576
RabbitMqClientEventSource.Log.ConnectionOpened();
76-
StartAndTune();
77-
_channel0.ConnectionOpen(_config.VirtualHost);
77+
await StartAndTune().ConfigureAwait(false);
78+
await _channel0.ConnectionOpenAsync(_config.VirtualHost);
7879
}
7980

80-
private void StartAndTune()
81+
private async ValueTask StartAndTune()
8182
{
82-
var connectionStartCell = new BlockingCell<ConnectionStartDetails>();
83+
var connectionStartCell = new TaskCompletionSource<ConnectionStartDetails>(TaskCreationOptions.RunContinuationsAsynchronously);
8384
_channel0.m_connectionStartCell = connectionStartCell;
8485
_channel0.HandshakeContinuationTimeout = _config.HandshakeContinuationTimeout;
8586
_frameHandler.ReadTimeout = _config.HandshakeContinuationTimeout;
86-
_frameHandler.SendHeader();
87-
88-
ConnectionStartDetails connectionStart = connectionStartCell.WaitForValue();
87+
await _frameHandler.SendHeader().ConfigureAwait(false);
88+
ConnectionStartDetails connectionStart = await connectionStartCell.Task.ConfigureAwait(false);
8989

9090
if (connectionStart is null)
9191
{
@@ -117,14 +117,14 @@ private void StartAndTune()
117117
ConnectionSecureOrTune res;
118118
if (challenge is null)
119119
{
120-
res = _channel0.ConnectionStartOk(ClientProperties,
120+
res = await _channel0.ConnectionStartOkAsync(ClientProperties,
121121
mechanismFactory.Name,
122122
response,
123-
"en_US");
123+
"en_US").ConfigureAwait(false);
124124
}
125125
else
126126
{
127-
res = _channel0.ConnectionSecureOk(response);
127+
res = await _channel0.ConnectionSecureOkAsync(response).ConfigureAwait(false);
128128
}
129129

130130
if (res.m_challenge is null)

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public Connection(ConnectionConfig config, IFrameHandler frameHandler)
8282
_mainLoopTask = Task.Run(MainLoop);
8383
try
8484
{
85-
Open();
85+
OpenAsync().AsTask().GetAwaiter().GetResult();
8686
}
8787
catch
8888
{
@@ -402,7 +402,16 @@ internal void OnCallbackException(CallbackExceptionEventArgs args)
402402

403403
internal void Write(ReadOnlyMemory<byte> memory)
404404
{
405-
_frameHandler.Write(memory);
405+
var task = _frameHandler.WriteAsync(memory);
406+
if (!task.IsCompletedSuccessfully)
407+
{
408+
task.AsTask().GetAwaiter().GetResult();
409+
}
410+
}
411+
412+
internal ValueTask WriteAsync(ReadOnlyMemory<byte> memory)
413+
{
414+
return _frameHandler.WriteAsync(memory);
406415
}
407416

408417
public void Dispose()

projects/RabbitMQ.Client/client/impl/IFrameHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ internal interface IFrameHandler
6565
///</summary>
6666
bool TryReadFrame(out InboundFrame frame);
6767

68-
void SendHeader();
68+
ValueTask SendHeader();
6969

70-
void Write(ReadOnlyMemory<byte> memory);
70+
ValueTask WriteAsync(ReadOnlyMemory<byte> memory);
7171
}
7272
}

0 commit comments

Comments
 (0)