Skip to content

Commit e56a08b

Browse files
committed
* Implement Unpause
* Finish `PauseShouldStopMessageArrivalUnpauseShouldResumeIt` test
1 parent 82a3fcd commit e56a08b

File tree

6 files changed

+85
-58
lines changed

6 files changed

+85
-58
lines changed

RabbitMQ.AMQP.Client/ILifeCycle.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public override string ToString()
2222

2323
public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
2424

25-
public interface ILifeCycle
25+
public interface ILifeCycle : IDisposable
2626
{
2727
Task CloseAsync();
2828

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ public class AmqpNotOpenException(string message) : Exception(message);
66

77
public abstract class AbstractLifeCycle : ILifeCycle
88
{
9+
private bool _disposedValue;
10+
911
public virtual Task OpenAsync()
1012
{
1113
OnNewStatus(State.Open, null);
@@ -16,6 +18,15 @@ public virtual Task OpenAsync()
1618

1719
public State State { get; internal set; } = State.Closed;
1820

21+
public event LifeCycleCallBack? ChangeState;
22+
23+
public void Dispose()
24+
{
25+
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
26+
Dispose(disposing: true);
27+
GC.SuppressFinalize(this);
28+
}
29+
1930
protected void ThrowIfClosed()
2031
{
2132
switch (State)
@@ -49,7 +60,27 @@ protected void OnNewStatus(State newState, Error? error)
4960
ChangeState?.Invoke(this, oldStatus, newState, error);
5061
}
5162

52-
public event LifeCycleCallBack? ChangeState;
63+
protected virtual void Dispose(bool disposing)
64+
{
65+
if (!_disposedValue)
66+
{
67+
if (disposing)
68+
{
69+
// TODO: dispose managed state (managed objects)
70+
}
71+
72+
// TODO: free unmanaged resources (unmanaged objects) and override finalizer
73+
// TODO: set large fields to null
74+
_disposedValue = true;
75+
}
76+
}
77+
78+
// // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
79+
// ~AbstractLifeCycle()
80+
// {
81+
// // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
82+
// Dispose(disposing: false);
83+
// }
5384
}
5485

5586
public abstract class AbstractReconnectLifeCycle : AbstractLifeCycle

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,6 @@ await ConnectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10
129129
OnNewStatus(State.Closed, null);
130130
}
131131

132-
public void Dispose()
133-
{
134-
// TODO probably more should happen in this method
135-
_semaphoreOpen.Dispose();
136-
_semaphoreClose.Dispose();
137-
}
138-
139132
public override string ToString()
140133
{
141134
string info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
@@ -147,6 +140,18 @@ public override string ToString()
147140
return _nativeConnection;
148141
}
149142

143+
protected override void Dispose(bool disposing)
144+
{
145+
if (disposing)
146+
{
147+
// TODO probably more should/could happen in this method
148+
_semaphoreOpen.Dispose();
149+
_semaphoreClose.Dispose();
150+
}
151+
152+
base.Dispose(disposing);
153+
}
154+
150155
/// <summary>
151156
/// Closes all the publishers. It is called when the connection is closed.
152157
/// </summary>

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ public void Pause()
115115
throw new InvalidOperationException("error transitioning from PAUSING -> PAUSED, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
116116
}
117117
}
118+
else
119+
{
120+
// TODO: log a warning that user tried to pause an already-paused consumer?
121+
}
118122
}
119123

120124
public long UnsettledMessageCount()
@@ -124,7 +128,21 @@ public long UnsettledMessageCount()
124128

125129
public void Unpause()
126130
{
127-
throw new NotImplementedException();
131+
if (_receiverLink is null)
132+
{
133+
// TODO create "internal bug" exception type?
134+
throw new InvalidOperationException("_receiverLink is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
135+
}
136+
137+
if ((int)PauseStatus.PAUSED == Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
138+
(int)PauseStatus.UNPAUSED, (int)PauseStatus.PAUSED))
139+
{
140+
_receiverLink.SetCredit(credit: _initialCredits);
141+
}
142+
else
143+
{
144+
// TODO: log a warning that user tried to unpause a not-paused consumer?
145+
}
128146
}
129147

130148
public override async Task CloseAsync()

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ RabbitMQ.AMQP.Client.IMessageHandler.Handle(Amqp.Listener.Context! context, Rabb
163163
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle
164164
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.AbstractLifeCycle() -> void
165165
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.ChangeState -> RabbitMQ.AMQP.Client.LifeCycleCallBack?
166+
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.Dispose() -> void
166167
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.OnNewStatus(RabbitMQ.AMQP.Client.State newState, RabbitMQ.AMQP.Client.Error? error) -> void
167168
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.State.get -> RabbitMQ.AMQP.Client.State
168169
RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.ThrowIfClosed() -> void
@@ -192,7 +193,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpClassicSpecification.Queue() -> RabbitMQ.AMQP.Clie
192193
RabbitMQ.AMQP.Client.Impl.AmqpClassicSpecification.Version(RabbitMQ.AMQP.Client.ClassicQueueVersion version) -> RabbitMQ.AMQP.Client.IClassicQueueSpecification!
193194
RabbitMQ.AMQP.Client.Impl.AmqpConnection
194195
RabbitMQ.AMQP.Client.Impl.AmqpConnection.ConsumerBuilder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
195-
RabbitMQ.AMQP.Client.Impl.AmqpConnection.Dispose() -> void
196196
RabbitMQ.AMQP.Client.Impl.AmqpConnection.GetConsumers() -> System.Collections.ObjectModel.ReadOnlyCollection<RabbitMQ.AMQP.Client.IConsumer!>!
197197
RabbitMQ.AMQP.Client.Impl.AmqpConnection.GetPublishers() -> System.Collections.ObjectModel.ReadOnlyCollection<RabbitMQ.AMQP.Client.IPublisher!>!
198198
RabbitMQ.AMQP.Client.Impl.AmqpConnection.Id.get -> long
@@ -581,5 +581,6 @@ static RabbitMQ.AMQP.Client.Impl.Utils.ValidatePositive(string! label, long valu
581581
static RabbitMQ.AMQP.Client.Impl.Utils.ValidatePositive(string! label, long value, long max) -> void
582582
static readonly RabbitMQ.AMQP.Client.SaslMechanism.External -> RabbitMQ.AMQP.Client.SaslMechanism!
583583
static readonly RabbitMQ.AMQP.Client.SaslMechanism.Plain -> RabbitMQ.AMQP.Client.SaslMechanism!
584+
virtual RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.Dispose(bool disposing) -> void
584585
virtual RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.OpenAsync() -> System.Threading.Tasks.Task!
585586
virtual RabbitMQ.AMQP.Client.Impl.AmqpManagement.InternalSendAsync(Amqp.Message! message, System.TimeSpan timeout) -> System.Threading.Tasks.Task!

Tests/Consumer/ConsumerPauseTests.cs

Lines changed: 19 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,20 @@ public async Task PauseShouldStopMessageArrivalUnpauseShouldResumeIt()
5757

5858
var messageContexts = new ConcurrentBag<IContext>();
5959

60-
var consumeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
61-
6260
IConsumer consumer = await connection.ConsumerBuilder()
6361
.Queue(declaredQueueInfo.Name())
6462
.InitialCredits(initialCredits)
6563
.MessageHandler((IContext ctx, IMessage msg) =>
6664
{
6765
messageContexts.Add(ctx);
68-
if (messageContexts.Count == initialCredits)
69-
{
70-
consumeTcs.SetResult();
71-
}
7266
}).BuildAsync();
7367

74-
await consumeTcs.Task;
68+
Task<bool> WaitForMessageContextCountAtLeast(int expectedCount)
69+
{
70+
return Task.FromResult(messageContexts.Count >= expectedCount);
71+
}
72+
73+
await SystemUtils.WaitUntilAsync(() => WaitForMessageContextCountAtLeast(initialCredits));
7574

7675
IQueueInfo queueInfo = await management.GetQueueInfoAsync(declaredQueueInfo.Name());
7776
ulong expectedMessageCount = messageCount - initialCredits;
@@ -103,51 +102,24 @@ async Task<bool> MessagesUnacknowledgedIsZero()
103102
}
104103

105104
await SystemUtils.WaitUntilAsync(MessagesUnacknowledgedIsZero);
105+
106+
Assert.Equal(initialCredits, messageContexts.Count);
107+
108+
consumer.Unpause();
109+
110+
await SystemUtils.WaitUntilAsync(() => WaitForMessageContextCountAtLeast(initialCredits * 2));
111+
112+
consumer.Pause();
113+
114+
foreach (IContext ctx in messageContexts)
115+
{
116+
ctx.Accept();
117+
}
106118
}
107119
finally
108120
{
109121
await management.CloseAsync();
110122
await connection.CloseAsync();
111123
}
112124
}
113-
114-
/*
115-
// void pauseShouldStopMessageArrivalUnpauseShouldResumeIt() {
116-
// String q = connection.management().queue().exclusive(true).declare().name();
117-
// Publisher publisher = connection.publisherBuilder().queue(q).build();
118-
// int messageCount = 100;
119-
// CountDownLatch publishLatch = new CountDownLatch(messageCount);
120-
// Publisher.Callback callback = ctx -> publishLatch.countDown();
121-
// range(0, messageCount).forEach(ignored -> publisher.publish(publisher.message(), callback));
122-
123-
// assertThat(publishLatch).completes();
124-
125-
// int initialCredits = 10;
126-
// Set<com.rabbitmq.client.amqp.Consumer.Context> messageContexts = ConcurrentHashMap.newKeySet();
127-
// com.rabbitmq.client.amqp.Consumer consumer =
128-
// connection
129-
// .consumerBuilder()
130-
// .queue(q)
131-
// .initialCredits(initialCredits)
132-
// .messageHandler((ctx, msg) -> messageContexts.add(ctx))
133-
// .build();
134-
135-
// waitAtMost(() -> messageContexts.size() == initialCredits);
136-
137-
// assertThat(connection.management().queueInfo(q)).hasMessageCount(messageCount - initialCredits);
138-
139-
// assertThat(Cli.queueInfo(q).unackedMessageCount()).isEqualTo(initialCredits);
140-
141-
// consumer.pause();
142-
// new ArrayList<>(messageContexts).forEach(com.rabbitmq.client.amqp.Consumer.Context::accept);
143-
144-
// waitAtMost(() -> Cli.queueInfo(q).unackedMessageCount() == 0);
145-
146-
waitAtMost(() -> messageContexts.size() == initialCredits);
147-
consumer.unpause();
148-
waitAtMost(() -> messageContexts.size() == initialCredits * 2);
149-
consumer.pause();
150-
messageContexts.forEach(com.rabbitmq.client.amqp.Consumer.Context::accept);
151-
}
152-
*/
153125
}

0 commit comments

Comments
 (0)