Skip to content

Commit eaa76c8

Browse files
committed
Add ExchangeBindAsync
1 parent 5a93720 commit eaa76c8

File tree

9 files changed

+160
-163
lines changed

9 files changed

+160
-163
lines changed

projects/OAuth2Test/TestOAuth2.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ private async Task Publish(IChannel publisher)
148148
private async ValueTask<IChannel> declareConsumer()
149149
{
150150
IChannel subscriber = _connection.CreateChannel();
151-
await subscriber.QueueDeclareAsync("testqueue", true, false, false, arguments: null);
151+
await subscriber.QueueDeclareAsync(queue: "testqueue", passive: false, true, false, false, arguments: null);
152152
subscriber.QueueBind("testqueue", Exchange, "hello");
153153
return subscriber;
154154
}

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,6 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
276276
/// </remarks>
277277
void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
278278

279-
/*
280-
* TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
281279
/// <summary>
282280
/// Asynchronously binds an exchange to an exchange.
283281
/// </summary>
@@ -287,7 +285,6 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
287285
/// </para>
288286
/// </remarks>
289287
ValueTask ExchangeBindAsync(string destination, string source, string routingKey, IDictionary<string, object> arguments);
290-
*/
291288

292289
/// <summary>
293290
/// Like ExchangeBind but sets nowait to true.
@@ -308,10 +305,10 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
308305

309306
/// <summary>Asynchronously declare an exchange.</summary>
310307
/// <remarks>
311-
/// The exchange is declared non-passive and non-internal.
312-
/// The "nowait" option is not exercised.
308+
/// The exchange is declared non-internal.
309+
/// The "nowait" option is not used.
313310
/// </remarks>
314-
ValueTask ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
311+
ValueTask ExchangeDeclareAsync(string exchange, string type, bool passive, bool durable, bool autoDelete, IDictionary<string, object> arguments);
315312

316313
/// <summary>
317314
/// Same as ExchangeDeclare but sets nowait to true and returns void (as there
@@ -403,11 +400,12 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
403400
/// Asynchronously declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
404401
/// </summary>
405402
/// <param name="queue">The name of the queue. Pass an empty string to make the server generate a name.</param>
403+
/// <param name="passive">Set to <code>true</code> to passively declare the queue (i.e. check for its existence)</param>
406404
/// <param name="durable">Should this queue will survive a broker restart?</param>
407405
/// <param name="exclusive">Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.</param>
408406
/// <param name="autoDelete">Should this queue be auto-deleted when its last consumer (if any) unsubscribes?</param>
409407
/// <param name="arguments">Optional; additional queue arguments, e.g. "x-queue-type"</param>
410-
ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
408+
ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
411409

412410
/// <summary>
413411
/// Declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.

projects/RabbitMQ.Client/client/api/QueueDeleteOk.cs

Lines changed: 0 additions & 55 deletions
This file was deleted.

projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout)
5454
{
5555
if (_tcs.TrySetCanceled())
5656
{
57-
// TODO LRB #1347
57+
// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
5858
// Cancellation was successful, does this mean we should set a TimeoutException
5959
// in the same manner as BlockingCell?
6060
}
@@ -107,7 +107,7 @@ public override void HandleCommand(in IncomingCommand cmd)
107107
else if (cmd.CommandId == ProtocolCommandId.ConnectionTune)
108108
{
109109
var tune = new ConnectionTune(cmd.MethodBytes.Span);
110-
// TODO LRB #1347
110+
// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
111111
// What to do if setting a result fails?
112112
_tcs.TrySetResult(new ConnectionSecureOrTune
113113
{
@@ -155,6 +155,13 @@ public override void HandleCommand(in IncomingCommand cmd)
155155
}
156156
}
157157

158+
internal class ExchangeBindAsyncRpcContinuation : SimpleAsyncRpcContinuation
159+
{
160+
public ExchangeBindAsyncRpcContinuation(TimeSpan continuationTimeout) : base(ProtocolCommandId.ExchangeBindOk, continuationTimeout)
161+
{
162+
}
163+
}
164+
158165
internal class ExchangeDeclareAsyncRpcContinuation : SimpleAsyncRpcContinuation
159166
{
160167
public ExchangeDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(ProtocolCommandId.ExchangeDeclareOk, continuationTimeout)
@@ -197,7 +204,7 @@ public override void HandleCommand(in IncomingCommand cmd)
197204
}
198205
}
199206

200-
internal class QueueDeleteAsyncRpcContinuation : AsyncRpcContinuation<QueueDeleteOk>
207+
internal class QueueDeleteAsyncRpcContinuation : AsyncRpcContinuation<uint>
201208
{
202209
public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(continuationTimeout)
203210
{
@@ -210,8 +217,7 @@ public override void HandleCommand(in IncomingCommand cmd)
210217
var method = new Client.Framing.Impl.QueueDeleteOk(cmd.MethodBytes.Span);
211218
if (cmd.CommandId == ProtocolCommandId.QueueDeleteOk)
212219
{
213-
var result = new QueueDeleteOk(method._messageCount);
214-
_tcs.TrySetResult(result);
220+
_tcs.TrySetResult(method._messageCount);
215221
}
216222
else
217223
{

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,13 @@ public void ExchangeBind(string destination, string source, string routingKey, I
362362
_innerChannel.ExchangeBind(destination, source, routingKey, arguments);
363363
}
364364

365+
public async ValueTask ExchangeBindAsync(string destination, string source, string routingKey, IDictionary<string, object> arguments)
366+
{
367+
ThrowIfDisposed();
368+
await _innerChannel.ExchangeBindAsync(destination, source, routingKey, arguments);
369+
_connection.RecordBinding(new RecordedBinding(false, destination, source, routingKey, arguments));
370+
}
371+
365372
public void ExchangeBindNoWait(string destination, string source, string routingKey, IDictionary<string, object> arguments)
366373
=> InnerChannel.ExchangeBindNoWait(destination, source, routingKey, arguments);
367374

@@ -372,11 +379,14 @@ public void ExchangeDeclare(string exchange, string type, bool durable, bool aut
372379
_connection.RecordExchange(new RecordedExchange(exchange, type, durable, autoDelete, arguments));
373380
}
374381

375-
public async ValueTask ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments)
382+
public async ValueTask ExchangeDeclareAsync(string exchange, string type, bool passive, bool durable, bool autoDelete, IDictionary<string, object> arguments)
376383
{
377384
ThrowIfDisposed();
378-
await _innerChannel.ExchangeDeclareAsync(exchange, type, durable, autoDelete, arguments);
379-
_connection.RecordExchange(new RecordedExchange(exchange, type, durable, autoDelete, arguments));
385+
await _innerChannel.ExchangeDeclareAsync(exchange, type, passive, durable, autoDelete, arguments);
386+
if (false == passive)
387+
{
388+
_connection.RecordExchange(new RecordedExchange(exchange, type, durable, autoDelete, arguments));
389+
}
380390
}
381391

382392
public void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments)
@@ -443,11 +453,14 @@ public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool
443453
_connection.RecordQueue(new RecordedQueue(queue, queue.Length == 0, durable, exclusive, autoDelete, arguments));
444454
}
445455

446-
public async ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
456+
public async ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
447457
{
448458
ThrowIfDisposed();
449-
QueueDeclareOk result = await _innerChannel.QueueDeclareAsync(queue, durable, exclusive, autoDelete, arguments);
450-
_connection.RecordQueue(new RecordedQueue(result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments));
459+
QueueDeclareOk result = await _innerChannel.QueueDeclareAsync(queue, passive, durable, exclusive, autoDelete, arguments);
460+
if (false == passive)
461+
{
462+
_connection.RecordQueue(new RecordedQueue(result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments));
463+
}
451464
return result;
452465
}
453466

0 commit comments

Comments
 (0)