Skip to content

Commit e4611e6

Browse files
committed
Expose nowait version of BasicCancel
Part of a fix to #341
1 parent 313e1e8 commit e4611e6

File tree

5 files changed

+61
-3
lines changed

5 files changed

+61
-3
lines changed

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,12 @@ public interface IModel : IDisposable
193193
[AmqpMethodDoNotImplement(null)]
194194
void BasicCancel(string consumerTag);
195195

196+
/// <summary>
197+
/// Same as BasicCancel but sets nowait to true and returns void (as there
198+
/// will be no response from the server).
199+
/// </summary>
200+
void BasicCancelNoWait(string consumerTag);
201+
196202
/// <summary>Start a Basic content-class consumer.</summary>
197203
[AmqpMethodDoNotImplement(null)]
198204
string BasicConsume(

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,16 @@ public void BasicCancel(string consumerTag)
758758
_delegate.BasicCancel(consumerTag);
759759
}
760760

761+
public void BasicCancelNoWait(string consumerTag)
762+
{
763+
RecordedConsumer cons = _connection.DeleteRecordedConsumer(consumerTag);
764+
if (cons != null)
765+
{
766+
_connection.MaybeDeleteRecordedAutoDeleteQueue(cons.Queue);
767+
}
768+
_delegate.BasicCancelNoWait(consumerTag);
769+
}
770+
761771
public string BasicConsume(
762772
string queue,
763773
bool autoAck,

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,7 @@ public void BasicCancel(string consumerTag)
10131013
_Private_BasicCancel(consumerTag, false);
10141014
k.GetReply(ContinuationTimeout);
10151015
}
1016+
10161017
lock (m_consumers)
10171018
{
10181019
m_consumers.Remove(consumerTag);
@@ -1021,6 +1022,16 @@ public void BasicCancel(string consumerTag)
10211022
ModelShutdown -= k.m_consumer.HandleModelShutdown;
10221023
}
10231024

1025+
public void BasicCancelNoWait(string consumerTag)
1026+
{
1027+
_Private_BasicCancel(consumerTag, true);
1028+
1029+
lock (m_consumers)
1030+
{
1031+
m_consumers.Remove(consumerTag);
1032+
}
1033+
}
1034+
10241035
public string BasicConsume(string queue,
10251036
bool autoAck,
10261037
string consumerTag,
@@ -1224,7 +1235,7 @@ public void ExchangeDelete(string exchange,
12241235
public void ExchangeDeleteNoWait(string exchange,
12251236
bool ifUnused)
12261237
{
1227-
_Private_ExchangeDelete(exchange, ifUnused, false);
1238+
_Private_ExchangeDelete(exchange, ifUnused, true);
12281239
}
12291240

12301241
public void ExchangeUnbind(string destination,

projects/client/Unit/src/unit/APIApproval.Approve.approved.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ namespace RabbitMQ.Client
369369
void Abort(ushort replyCode, string replyText);
370370
void BasicAck(ulong deliveryTag, bool multiple);
371371
void BasicCancel(string consumerTag);
372+
void BasicCancelNoWait(string consumerTag);
372373
string BasicConsume(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer);
373374
RabbitMQ.Client.BasicGetResult BasicGet(string queue, bool autoAck);
374375
void BasicNack(ulong deliveryTag, bool multiple, bool requeue);

projects/client/Unit/src/unit/TestAsyncConsumer.cs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ namespace RabbitMQ.Client.Unit
5151
[TestFixture]
5252
public class TestAsyncConsumer
5353
{
54-
5554
[Test]
5655
public void TestBasicRoundtrip()
5756
{
@@ -81,6 +80,37 @@ public void TestBasicRoundtrip()
8180
Assert.IsFalse(waitResFalse);
8281
}
8382
}
83+
[Test]
84+
public void TestBasicRoundtripNoWait()
85+
{
86+
var cf = new ConnectionFactory{ DispatchConsumersAsync = true };
87+
using (IConnection c = cf.CreateConnection())
88+
{
89+
using (IModel m = c.CreateModel())
90+
{
91+
QueueDeclareOk q = m.QueueDeclare();
92+
IBasicProperties bp = m.CreateBasicProperties();
93+
byte[] body = System.Text.Encoding.UTF8.GetBytes("async-hi");
94+
m.BasicPublish("", q.QueueName, bp, body);
95+
var consumer = new AsyncEventingBasicConsumer(m);
96+
var are = new AutoResetEvent(false);
97+
consumer.Received += async (o, a) =>
98+
{
99+
are.Set();
100+
await Task.Yield();
101+
};
102+
string tag = m.BasicConsume(q.QueueName, true, consumer);
103+
// ensure we get a delivery
104+
bool waitRes = are.WaitOne(2000);
105+
Assert.IsTrue(waitRes);
106+
// unsubscribe and ensure no further deliveries
107+
m.BasicCancelNoWait(tag);
108+
m.BasicPublish("", q.QueueName, bp, body);
109+
bool waitResFalse = are.WaitOne(2000);
110+
Assert.IsFalse(waitResFalse);
111+
}
112+
}
113+
}
84114

85115
[Test]
86116
public void NonAsyncConsumerShouldThrowInvalidOperationException()
@@ -98,4 +128,4 @@ public void NonAsyncConsumerShouldThrowInvalidOperationException()
98128
}
99129
}
100130
}
101-
}
131+
}

0 commit comments

Comments
 (0)