Skip to content

Commit c22949f

Browse files
committed
Revert "Make event handlers for basic.ack, basic.nack and basic.return async."
This reverts commit ccf0764.
1 parent ccf0764 commit c22949f

File tree

10 files changed

+50
-117
lines changed

10 files changed

+50
-117
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,10 @@ RabbitMQ.Client.IBasicProperties.UserId.get -> string
426426
RabbitMQ.Client.IBasicProperties.UserId.set -> void
427427
RabbitMQ.Client.IChannel
428428
RabbitMQ.Client.IChannel.BasicAckAsync(ulong deliveryTag, bool multiple, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
429+
RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler<RabbitMQ.Client.Events.BasicAckEventArgs>
429430
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
431+
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
432+
RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
430433
RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
431434
RabbitMQ.Client.IChannel.ChannelNumber.get -> int
432435
RabbitMQ.Client.IChannel.ChannelShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
@@ -892,7 +895,3 @@ RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, Syst
892895
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
893896
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
894897
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
895-
RabbitMQ.Client.IChannel.BasicAcksAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicAckEventArgs!>!
896-
RabbitMQ.Client.IChannel.BasicNacksAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicNackEventArgs!>!
897-
RabbitMQ.Client.IChannel.BasicReturnAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs!>!
898-
RabbitMQ.Client.IChannel.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs!>!

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,17 @@ public interface IChannel : IDisposable
106106
/// <summary>
107107
/// Signalled when a Basic.Ack command arrives from the broker.
108108
/// </summary>
109-
event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync;
109+
event EventHandler<BasicAckEventArgs> BasicAcks;
110110

111111
/// <summary>
112112
/// Signalled when a Basic.Nack command arrives from the broker.
113113
/// </summary>
114-
event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync;
114+
event EventHandler<BasicNackEventArgs> BasicNacks;
115115

116116
/// <summary>
117117
/// Signalled when a Basic.Return command arrives from the broker.
118118
/// </summary>
119-
event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync;
119+
event EventHandler<BasicReturnEventArgs> BasicReturn;
120120

121121
/// <summary>
122122
/// Signalled when an exception occurs in a callback invoked by the channel.
@@ -127,15 +127,6 @@ public interface IChannel : IDisposable
127127
/// </summary>
128128
event EventHandler<CallbackExceptionEventArgs> CallbackException;
129129

130-
/// <summary>
131-
/// Signalled when an exception occurs in an asynchronous callback invoked by the channel.
132-
/// Examples of cases where this event will be signalled
133-
/// include exceptions thrown in <see cref="IChannel.BasicAcksAsync"/>,
134-
/// <see cref="IChannel.BasicNacksAsync"/>,
135-
/// <see cref="IChannel.BasicReturnAsync"/> event handlers, etc.
136-
/// </summary>
137-
event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync;
138-
139130
event EventHandler<FlowControlEventArgs> FlowControl;
140131

141132
/// <summary>
@@ -193,7 +184,7 @@ Task<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
193184
/// <summary>
194185
/// Asynchronously retrieve an individual message, if
195186
/// one is available; returns null if the server answers that
196-
/// no messages are currently available. See also <see cref="IChannel.BasicAckAsync(ulong, bool, CancellationToken)"/>
187+
/// no messages are currently available. See also <see cref="IChannel.BasicAckAsync" />.
197188
/// </summary>
198189
/// <param name="queue">The queue.</param>
199190
/// <param name="autoAck">If set to <c>true</c>, automatically ack the message.</param>
@@ -277,9 +268,7 @@ Task CloseAsync(ShutdownEventArgs reason, bool abort,
277268
/// <summary>
278269
/// Asynchronously enable publisher confirmations.
279270
/// </summary>
280-
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via
281-
/// <see cref="IChannel.BasicAckAsync(ulong, bool, CancellationToken)"/> and
282-
/// <see cref="IChannel.BasicNackAsync(ulong, bool, bool, CancellationToken)"/> yourself.</param>
271+
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcks"/> and <see cref="BasicNacks"/> yourself.</param>
283272
/// <param name="cancellationToken">CancellationToken for this operation.</param>
284273
Task ConfirmSelectAsync(bool trackConfirmations = true,
285274
CancellationToken cancellationToken = default);

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -79,22 +79,22 @@ public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel
7979
_consumerDispatchConcurrency = consumerDispatchConcurrency;
8080
}
8181

82-
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
82+
public event EventHandler<BasicAckEventArgs> BasicAcks
8383
{
84-
add => InnerChannel.BasicAcksAsync += value;
85-
remove => InnerChannel.BasicAcksAsync -= value;
84+
add => InnerChannel.BasicAcks += value;
85+
remove => InnerChannel.BasicAcks -= value;
8686
}
8787

88-
public event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync
88+
public event EventHandler<BasicNackEventArgs> BasicNacks
8989
{
90-
add => InnerChannel.BasicNacksAsync += value;
91-
remove => InnerChannel.BasicNacksAsync -= value;
90+
add => InnerChannel.BasicNacks += value;
91+
remove => InnerChannel.BasicNacks -= value;
9292
}
9393

94-
public event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync
94+
public event EventHandler<BasicReturnEventArgs> BasicReturn
9595
{
96-
add => InnerChannel.BasicReturnAsync += value;
97-
remove => InnerChannel.BasicReturnAsync -= value;
96+
add => InnerChannel.BasicReturn += value;
97+
remove => InnerChannel.BasicReturn -= value;
9898
}
9999

100100
public event EventHandler<CallbackExceptionEventArgs> CallbackException
@@ -103,12 +103,6 @@ public event EventHandler<CallbackExceptionEventArgs> CallbackException
103103
remove => InnerChannel.CallbackException -= value;
104104
}
105105

106-
public event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync
107-
{
108-
add => InnerChannel.CallbackExceptionAsync += value;
109-
remove => InnerChannel.CallbackExceptionAsync -= value;
110-
}
111-
112106
public event EventHandler<FlowControlEventArgs> FlowControl
113107
{
114108
add { InnerChannel.FlowControl += value; }

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,11 @@ protected ChannelBase(ConnectionConfig config, ISession session,
7979
ContinuationTimeout = config.ContinuationTimeout;
8080
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
8181
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
82-
83-
void onException(Exception exception, string context) =>
82+
Action<Exception, string> onException = (exception, context) =>
8483
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
85-
86-
Task onExceptionAsync(Exception exception, string context) =>
87-
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context));
88-
89-
_basicAcksWrapper = new AsyncEventingWrapper<BasicAckEventArgs>("OnBasicAck", onExceptionAsync);
90-
_basicNacksWrapper = new AsyncEventingWrapper<BasicNackEventArgs>("OnBasicNack", onExceptionAsync);
91-
_basicReturnWrapper = new AsyncEventingWrapper<BasicReturnEventArgs>("OnBasicReturn", onExceptionAsync);
84+
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
85+
_basicNacksWrapper = new EventingWrapper<BasicNackEventArgs>("OnBasicNack", onException);
86+
_basicReturnWrapper = new EventingWrapper<BasicReturnEventArgs>("OnBasicReturn", onException);
9287
_callbackExceptionWrapper =
9388
new EventingWrapper<CallbackExceptionEventArgs>(string.Empty, (exception, context) => { });
9489
_flowControlWrapper = new EventingWrapper<FlowControlEventArgs>("OnFlowControl", onException);
@@ -102,46 +97,44 @@ Task onExceptionAsync(Exception exception, string context) =>
10297
internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);
10398
public TimeSpan ContinuationTimeout { get; set; }
10499

105-
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
100+
public event EventHandler<BasicAckEventArgs> BasicAcks
106101
{
107102
add => _basicAcksWrapper.AddHandler(value);
108103
remove => _basicAcksWrapper.RemoveHandler(value);
109104
}
110-
private AsyncEventingWrapper<BasicAckEventArgs> _basicAcksWrapper;
111105

112-
public event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync
106+
private EventingWrapper<BasicAckEventArgs> _basicAcksWrapper;
107+
108+
public event EventHandler<BasicNackEventArgs> BasicNacks
113109
{
114110
add => _basicNacksWrapper.AddHandler(value);
115111
remove => _basicNacksWrapper.RemoveHandler(value);
116112
}
117-
private AsyncEventingWrapper<BasicNackEventArgs> _basicNacksWrapper;
118113

119-
public event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync
114+
private EventingWrapper<BasicNackEventArgs> _basicNacksWrapper;
115+
116+
public event EventHandler<BasicReturnEventArgs> BasicReturn
120117
{
121118
add => _basicReturnWrapper.AddHandler(value);
122119
remove => _basicReturnWrapper.RemoveHandler(value);
123120
}
124-
private AsyncEventingWrapper<BasicReturnEventArgs> _basicReturnWrapper;
121+
122+
private EventingWrapper<BasicReturnEventArgs> _basicReturnWrapper;
125123

126124
public event EventHandler<CallbackExceptionEventArgs> CallbackException
127125
{
128126
add => _callbackExceptionWrapper.AddHandler(value);
129127
remove => _callbackExceptionWrapper.RemoveHandler(value);
130128
}
131-
private EventingWrapper<CallbackExceptionEventArgs> _callbackExceptionWrapper;
132129

133-
public event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync
134-
{
135-
add => _callbackExceptionAsyncWrapper.AddHandler(value);
136-
remove => _callbackExceptionAsyncWrapper.RemoveHandler(value);
137-
}
138-
private AsyncEventingWrapper<CallbackExceptionEventArgs> _callbackExceptionAsyncWrapper;
130+
private EventingWrapper<CallbackExceptionEventArgs> _callbackExceptionWrapper;
139131

140132
public event EventHandler<FlowControlEventArgs> FlowControl
141133
{
142134
add => _flowControlWrapper.AddHandler(value);
143135
remove => _flowControlWrapper.RemoveHandler(value);
144136
}
137+
145138
private EventingWrapper<FlowControlEventArgs> _flowControlWrapper;
146139

147140
public event EventHandler<ShutdownEventArgs> ChannelShutdown
@@ -159,13 +152,15 @@ public event EventHandler<ShutdownEventArgs> ChannelShutdown
159152
}
160153
remove => _channelShutdownWrapper.RemoveHandler(value);
161154
}
155+
162156
private EventingWrapper<ShutdownEventArgs> _channelShutdownWrapper;
163157

164158
public event EventHandler<EventArgs> Recovery
165159
{
166160
add => _recoveryWrapper.AddHandler(value);
167161
remove => _recoveryWrapper.RemoveHandler(value);
168162
}
163+
169164
private EventingWrapper<EventArgs> _recoveryWrapper;
170165

171166
internal void RunRecoveryEventHandlers(object sender)
@@ -472,11 +467,6 @@ internal void OnCallbackException(CallbackExceptionEventArgs args)
472467
_callbackExceptionWrapper.Invoke(this, args);
473468
}
474469

475-
internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args)
476-
{
477-
return _callbackExceptionAsyncWrapper.InvokeAsync(this, args);
478-
}
479-
480470
///<summary>Broadcasts notification of the final shutdown of the channel.</summary>
481471
///<remarks>
482472
///<para>
@@ -582,8 +572,7 @@ protected async Task<bool> HandleBasicAck(IncomingCommand cmd, CancellationToken
582572
if (!_basicAcksWrapper.IsEmpty)
583573
{
584574
var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple);
585-
await _basicAcksWrapper.InvokeAsync(this, args)
586-
.ConfigureAwait(false);
575+
_basicAcksWrapper.Invoke(this, args);
587576
}
588577

589578
await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken)
@@ -598,8 +587,7 @@ protected async Task<bool> HandleBasicNack(IncomingCommand cmd, CancellationToke
598587
{
599588
var args = new BasicNackEventArgs(
600589
nack._deliveryTag, nack._multiple, nack._requeue);
601-
await _basicNacksWrapper.InvokeAsync(this, args)
602-
.ConfigureAwait(false);
590+
_basicNacksWrapper.Invoke(this, args);
603591
}
604592

605593
await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken)
@@ -640,19 +628,15 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)
640628
return deliveryTag;
641629
}
642630

643-
protected Task HandleBasicReturn(IncomingCommand cmd)
631+
protected void HandleBasicReturn(IncomingCommand cmd)
644632
{
645633
if (!_basicReturnWrapper.IsEmpty)
646634
{
647635
var basicReturn = new BasicReturn(cmd.MethodSpan);
648636
var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText,
649637
basicReturn._exchange, basicReturn._routingKey,
650638
new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory);
651-
return _basicReturnWrapper.InvokeAsync(this, e);
652-
}
653-
else
654-
{
655-
return Task.CompletedTask;
639+
_basicReturnWrapper.Invoke(this, e);
656640
}
657641
}
658642

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,15 @@ async Task HandlePublishConfirmsAsynchronously()
112112
var allMessagesConfirmedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
113113
var outstandingConfirms = new LinkedList<ulong>();
114114
var semaphore = new SemaphoreSlim(1, 1);
115-
async Task CleanOutstandingConfirmsAsync(ulong deliveryTag, bool multiple)
115+
void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
116116
{
117117
if (debug)
118118
{
119119
Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})",
120120
DateTime.Now, deliveryTag, multiple);
121121
}
122122

123-
await semaphore.WaitAsync();
123+
semaphore.Wait();
124124
try
125125
{
126126
if (multiple)
@@ -158,13 +158,11 @@ async Task CleanOutstandingConfirmsAsync(ulong deliveryTag, bool multiple)
158158
}
159159
}
160160

161-
channel.BasicAcksAsync += (sender, ea) =>
162-
CleanOutstandingConfirmsAsync(ea.DeliveryTag, ea.Multiple);
163-
164-
channel.BasicNacksAsync += (sender, ea) =>
161+
channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
162+
channel.BasicNacks += (sender, ea) =>
165163
{
166164
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})");
167-
return CleanOutstandingConfirmsAsync(ea.DeliveryTag, ea.Multiple);
165+
CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
168166
};
169167

170168
var sw = new Stopwatch();

projects/Test/Common/IntegrationFixture.cs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -269,22 +269,6 @@ protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel)
269269
{
270270
}
271271
};
272-
273-
channel.CallbackExceptionAsync += (o, ea) =>
274-
{
275-
_channelCallbackException = ea.Exception;
276-
277-
try
278-
{
279-
_output.WriteLine("{0} channel callback exception: {1}",
280-
_testDisplayName, _channelCallbackException);
281-
}
282-
catch (InvalidOperationException)
283-
{
284-
}
285-
286-
return Task.CompletedTask;
287-
};
288272
}
289273
}
290274

projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -156,18 +156,8 @@ public async Task TestBasicAckEventHandlerRecovery()
156156
{
157157
await _channel.ConfirmSelectAsync();
158158
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
159-
160-
((AutorecoveringChannel)_channel).BasicAcksAsync += (m, args) =>
161-
{
162-
tcs.SetResult(true);
163-
return Task.CompletedTask;
164-
};
165-
166-
((AutorecoveringChannel)_channel).BasicNacksAsync += (m, args) =>
167-
{
168-
tcs.SetResult(true);
169-
return Task.CompletedTask;
170-
};
159+
((AutorecoveringChannel)_channel).BasicAcks += (m, args) => tcs.SetResult(true);
160+
((AutorecoveringChannel)_channel).BasicNacks += (m, args) => tcs.SetResult(true);
171161

172162
await CloseAndWaitForRecoveryAsync();
173163
await CloseAndWaitForRecoveryAsync();

projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,14 @@ public async Task ConcurrentPublishSingleChannel()
5656
{
5757
int publishAckCount = 0;
5858

59-
_channel.BasicAcksAsync += (object sender, BasicAckEventArgs e) =>
59+
_channel.BasicAcks += (object sender, BasicAckEventArgs e) =>
6060
{
6161
Interlocked.Increment(ref publishAckCount);
62-
return Task.CompletedTask;
6362
};
6463

65-
_channel.BasicNacksAsync += (object sender, BasicNackEventArgs e) =>
64+
_channel.BasicNacks += (object sender, BasicNackEventArgs e) =>
6665
{
6766
_output.WriteLine($"channel #{_channel.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
68-
return Task.CompletedTask;
6967
};
7068

7169
await _channel.ConfirmSelectAsync(trackConfirmations: false);

projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,20 +125,18 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in
125125

126126
await ch.ConfirmSelectAsync(trackConfirmations: false);
127127

128-
ch.BasicAcksAsync += (object sender, BasicAckEventArgs e) =>
128+
ch.BasicAcks += (object sender, BasicAckEventArgs e) =>
129129
{
130130
if (e.DeliveryTag >= _messageCount)
131131
{
132132
tcs.SetResult(true);
133133
}
134-
return Task.CompletedTask;
135134
};
136135

137-
ch.BasicNacksAsync += (object sender, BasicNackEventArgs e) =>
136+
ch.BasicNacks += (object sender, BasicNackEventArgs e) =>
138137
{
139138
tcs.SetResult(false);
140139
_output.WriteLine($"channel #{ch.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
141-
return Task.CompletedTask;
142140
};
143141

144142
QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null);

0 commit comments

Comments
 (0)