Skip to content

Commit ef8dc73

Browse files
committed
Add support for pausing and unpausing consumers
Fixes #37 * Begin by copying test from Java AMQP client. * Continue fleshing out `ConsumerPauseTests` * Use async/await to ensure management links are started up in order. * Add method to get `IQueueInfo` * Rename some async methods to end with `Async` suffix * Finish implementation of `GetQueueInfoAsync` * Bump up retries * Do not `stopOnFail` when running tests * More work on `PauseShouldStopMessageArrivalUnpauseShouldResumeIt` test * Move HTTP API code that uses `EasyNetQ.Management.Client` to its own class * Make publisher and consumer builders OpenAsync * Implement `IConsumer.Pause` * Add `Pause` to test and it appears to work correctly, huzzah * Renaming methods that return `Task` or `Task<T>` to end with `Async` suffic * Re-ordering methods in `AmqpConnection` so that they are in public, internal, private order * Enable semaphores in `AmqpConnection` and add `IDisposable` * Implement `Unpause` * Finish `PauseShouldStopMessageArrivalUnpauseShouldResumeIt` test
1 parent 4be8beb commit ef8dc73

36 files changed

+1103
-724
lines changed

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
namespace RabbitMQ.AMQP.Client;
22

3-
43
public enum StreamOffsetSpecification
54
{
65
First,
@@ -18,7 +17,7 @@ public interface IConsumerBuilder
1817

1918
IStreamOptions Stream();
2019

21-
IConsumer Build();
20+
Task<IConsumer> BuildAsync(CancellationToken cancellationToken = default);
2221

2322
public interface IStreamOptions
2423
{
@@ -36,6 +35,4 @@ public interface IStreamOptions
3635

3736
IConsumerBuilder Builder();
3837
}
39-
40-
4138
}

RabbitMQ.AMQP.Client/IEntities.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public interface IEntityInfo
1010
/// <typeparam name="T"></typeparam>
1111
public interface IEntityInfoDeclaration<T> where T : IEntityInfo
1212
{
13+
// TODO this really should be named DeclareAsync
1314
Task<T> Declare();
1415
}
1516

@@ -18,6 +19,7 @@ public interface IEntityInfoDeclaration<T> where T : IEntityInfo
1819
/// </summary>
1920
public interface IEntityDeclaration
2021
{
22+
// TODO this really should be named DeclareAsync
2123
Task Declare();
2224
}
2325

@@ -49,7 +51,6 @@ public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
4951
IQueueSpecification Type(QueueType type);
5052
public QueueType Type();
5153

52-
5354
IQueueSpecification DeadLetterExchange(string dlx);
5455

5556
IQueueSpecification DeadLetterRoutingKey(string dlrk);
@@ -60,7 +61,6 @@ public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
6061

6162
IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer);
6263

63-
6464
IQueueSpecification Expires(TimeSpan expiration);
6565

6666
IStreamSpecification Stream();
@@ -69,10 +69,8 @@ public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
6969

7070
IClassicQueueSpecification Classic();
7171

72-
7372
IQueueSpecification MaxLength(long maxLength);
7473

75-
7674
IQueueSpecification MessageTtl(TimeSpan ttl);
7775
}
7876

@@ -135,6 +133,7 @@ public interface IClassicQueueSpecification
135133
public interface IQueueDeletion
136134
{
137135
// TODO consider returning a QueueStatus object with some info after deletion
136+
// TODO should be named DeleteAsync and take CancellationToken
138137
Task<IEntityInfo> Delete(string name);
139138
}
140139

@@ -154,6 +153,7 @@ public interface IExchangeSpecification : IEntityDeclaration
154153
public interface IExchangeDeletion
155154
{
156155
// TODO consider returning a ExchangeStatus object with some info after deletion
156+
// TODO should be named DeleteAsync and take CancellationToken
157157
Task Delete(string name);
158158
}
159159

RabbitMQ.AMQP.Client/IEntitiesInfo.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public enum QueueType
99

1010
public interface IQueueInfo : IEntityInfo
1111
{
12+
// TODO these should be properties, not methods
1213
string Name();
1314

1415
bool Durable();
@@ -19,10 +20,12 @@ public interface IQueueInfo : IEntityInfo
1920

2021
QueueType Type();
2122

23+
// TODO IDictionary
2224
Dictionary<string, object> Arguments();
2325

2426
string Leader();
2527

28+
// TODO IEnumerable? ICollection?
2629
List<string> Replicas();
2730

2831
ulong MessageCount();
@@ -38,4 +41,3 @@ public enum ExchangeType
3841
TOPIC,
3942
HEADERS
4043
}
41-

RabbitMQ.AMQP.Client/ILifeCycle.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public override string ToString()
2222

2323
public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
2424

25-
public interface ILifeCycle
25+
public interface ILifeCycle : IDisposable
2626
{
2727
Task CloseAsync();
2828

RabbitMQ.AMQP.Client/IManagement.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ public interface IManagement : ILifeCycle
99
IQueueSpecification Queue();
1010
IQueueSpecification Queue(string name);
1111

12+
Task<IQueueInfo> GetQueueInfoAsync(string queueName,
13+
CancellationToken cancellationToken = default);
14+
1215
IQueueDeletion QueueDeletion();
1316

1417
IExchangeSpecification Exchange();

RabbitMQ.AMQP.Client/IPublisher.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class OutcomeDescriptor(ulong code, string description, OutcomeState stat
2121

2222
public interface IPublisher : ILifeCycle
2323
{
24+
// TODO this should be named PublishAsync
2425
Task Publish(IMessage message,
2526
OutcomeDescriptorCallback outcomeCallback); // TODO: Add CancellationToken and callBack
2627
}

RabbitMQ.AMQP.Client/IPublisherBuilder.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ public interface IPublisherBuilder : IAddressBuilder<IPublisherBuilder>
55
IPublisherBuilder PublishTimeout(TimeSpan timeout);
66

77
IPublisherBuilder MaxInflightMessages(int maxInFlight);
8-
IPublisher Build();
8+
9+
Task<IPublisher> BuildAsync(CancellationToken cancellationToken = default);
910
}

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ public class AmqpNotOpenException(string message) : Exception(message);
66

77
public abstract class AbstractLifeCycle : ILifeCycle
88
{
9-
protected virtual Task OpenAsync()
9+
private bool _disposedValue;
10+
11+
public virtual Task OpenAsync()
1012
{
1113
OnNewStatus(State.Open, null);
1214
return Task.CompletedTask;
@@ -16,6 +18,15 @@ protected virtual Task OpenAsync()
1618

1719
public State State { get; internal set; } = State.Closed;
1820

21+
public event LifeCycleCallBack? ChangeState;
22+
23+
public void Dispose()
24+
{
25+
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
26+
Dispose(disposing: true);
27+
GC.SuppressFinalize(this);
28+
}
29+
1930
protected void ThrowIfClosed()
2031
{
2132
switch (State)
@@ -49,7 +60,27 @@ protected void OnNewStatus(State newState, Error? error)
4960
ChangeState?.Invoke(this, oldStatus, newState, error);
5061
}
5162

52-
public event LifeCycleCallBack? ChangeState;
63+
protected virtual void Dispose(bool disposing)
64+
{
65+
if (!_disposedValue)
66+
{
67+
if (disposing)
68+
{
69+
// TODO: dispose managed state (managed objects)
70+
}
71+
72+
// TODO: free unmanaged resources (unmanaged objects) and override finalizer
73+
// TODO: set large fields to null
74+
_disposedValue = true;
75+
}
76+
}
77+
78+
// // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
79+
// ~AbstractLifeCycle()
80+
// {
81+
// // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
82+
// Dispose(disposing: false);
83+
// }
5384
}
5485

5586
public abstract class AbstractReconnectLifeCycle : AbstractLifeCycle
@@ -61,7 +92,7 @@ internal void ChangeStatus(State newState, Error? error)
6192
OnNewStatus(newState, error);
6293
}
6394

64-
internal async Task Reconnect()
95+
internal async Task ReconnectAsync()
6596
{
6697
try
6798
{
@@ -84,7 +115,7 @@ internal async Task Reconnect()
84115
await Task.Delay(delay).ConfigureAwait(false);
85116
if (_backOffDelayPolicy.IsActive())
86117
{
87-
await Reconnect().ConfigureAwait(false);
118+
await ReconnectAsync().ConfigureAwait(false);
88119
}
89120
}
90121
}

RabbitMQ.AMQP.Client/Impl/AmqpBindingSpecification.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public async Task Bind()
3737
{ ToQueue ? "destination_queue" : "destination_exchange", Destination }
3838
};
3939

40-
await Management.Request(kv, $"/{Consts.Bindings}",
40+
await Management.RequestAsync(kv, $"/{Consts.Bindings}",
4141
AmqpManagement.Post,
4242
[
4343
AmqpManagement.Code204,
@@ -54,7 +54,7 @@ public async Task Unbind()
5454
$"{($"{destinationCharacter}={Utils.EncodePathSegment(Destination)};" +
5555
$"key={Utils.EncodePathSegment(RoutingKey)};args=")}";
5656

57-
await Management.Request(
57+
await Management.RequestAsync(
5858
null, target,
5959
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
6060
}
@@ -65,7 +65,7 @@ await Management.Request(
6565
string? uri = MatchBinding(bindings, RoutingKey, ArgsToMap());
6666
if (uri != null)
6767
{
68-
await Management.Request(
68+
await Management.RequestAsync(
6969
null, uri,
7070
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
7171
}
@@ -125,7 +125,7 @@ private string BindingsTarget(
125125

126126
private async Task<List<Map>> GetBindings(string path)
127127
{
128-
var result = await Management.Request(
128+
Amqp.Message result = await Management.RequestAsync(
129129
null, path,
130130
AmqpManagement.Get, new[] { AmqpManagement.Code200 }).ConfigureAwait(false);
131131

@@ -134,7 +134,6 @@ private async Task<List<Map>> GetBindings(string path)
134134
return [];
135135
}
136136

137-
138137
var l = new List<Map>() { };
139138
foreach (object o in list)
140139
{

0 commit comments

Comments
 (0)