@@ -40,20 +40,89 @@ namespace RabbitMQ.Client.Unit
40
40
[ TestFixture ]
41
41
public class TestAsyncConsumerExceptions : IntegrationFixture
42
42
{
43
+ protected void TestExceptionHandlingWith ( IBasicConsumer consumer ,
44
+ Action < IModel , string , IBasicConsumer , string > action )
45
+ {
46
+ object o = new object ( ) ;
47
+ bool notified = false ;
48
+ string q = _model . QueueDeclare ( ) ;
49
+
50
+
51
+ _model . CallbackException += ( m , evt ) =>
52
+ {
53
+ notified = true ;
54
+ Monitor . PulseAll ( o ) ;
55
+ } ;
56
+
57
+ string tag = _model . BasicConsume ( q , true , consumer ) ;
58
+ action ( _model , q , consumer , tag ) ;
59
+ WaitOn ( o ) ;
60
+
61
+ Assert . IsTrue ( notified ) ;
62
+ }
63
+
64
+ [ SetUp ]
65
+ public override void Init ( )
66
+ {
67
+ _connFactory = new ConnectionFactory
68
+ {
69
+ DispatchConsumersAsync = true
70
+ } ;
71
+
72
+ _conn = _connFactory . CreateConnection ( ) ;
73
+ _model = _conn . CreateModel ( ) ;
74
+ }
75
+
76
+ [ Test ]
77
+ public void TestCancelNotificationExceptionHandling ( )
78
+ {
79
+ IBasicConsumer consumer = new ConsumerFailingOnCancel ( _model ) ;
80
+ TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => m . QueueDelete ( q ) ) ;
81
+ }
82
+
83
+ [ Test ]
84
+ public void TestConsumerCancelOkExceptionHandling ( )
85
+ {
86
+ IBasicConsumer consumer = new ConsumerFailingOnCancelOk ( _model ) ;
87
+ TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => m . BasicCancel ( ct ) ) ;
88
+ }
89
+
90
+ [ Test ]
91
+ public void TestConsumerConsumeOkExceptionHandling ( )
92
+ {
93
+ IBasicConsumer consumer = new ConsumerFailingOnConsumeOk ( _model ) ;
94
+ TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => { } ) ;
95
+ }
96
+
97
+ [ Test ]
98
+ public void TestConsumerShutdownExceptionHandling ( )
99
+ {
100
+ IBasicConsumer consumer = new ConsumerFailingOnShutdown ( _model ) ;
101
+ TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => m . Close ( ) ) ;
102
+ }
103
+
104
+ [ Test ]
105
+ public void TestDeliveryExceptionHandling ( )
106
+ {
107
+ IBasicConsumer consumer = new ConsumerFailingOnDelivery ( _model ) ;
108
+ TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => m . BasicPublish ( "" , q , null , _encoding . GetBytes ( "msg" ) ) ) ;
109
+ }
110
+
43
111
private class ConsumerFailingOnDelivery : AsyncEventingBasicConsumer
44
112
{
45
113
public ConsumerFailingOnDelivery ( IModel model ) : base ( model )
46
114
{
47
115
}
48
116
49
- public override Task HandleBasicDeliver ( string consumerTag ,
117
+ public override async Task HandleBasicDeliver ( string consumerTag ,
50
118
ulong deliveryTag ,
51
119
bool redelivered ,
52
120
string exchange ,
53
121
string routingKey ,
54
122
IBasicProperties properties ,
55
123
ReadOnlyMemory < byte > body )
56
124
{
125
+ await Task . Delay ( 0 ) ;
57
126
throw new Exception ( "oops" ) ;
58
127
}
59
128
}
@@ -64,8 +133,9 @@ public ConsumerFailingOnCancel(IModel model) : base(model)
64
133
{
65
134
}
66
135
67
- public override Task HandleBasicCancel ( string consumerTag )
136
+ public override async Task HandleBasicCancel ( string consumerTag )
68
137
{
138
+ await Task . Delay ( 0 ) ;
69
139
throw new Exception ( "oops" ) ;
70
140
}
71
141
}
@@ -76,8 +146,9 @@ public ConsumerFailingOnShutdown(IModel model) : base(model)
76
146
{
77
147
}
78
148
79
- public override Task HandleModelShutdown ( object model , ShutdownEventArgs reason )
149
+ public override async Task HandleModelShutdown ( object model , ShutdownEventArgs reason )
80
150
{
151
+ await Task . Delay ( 0 ) ;
81
152
throw new Exception ( "oops" ) ;
82
153
}
83
154
}
@@ -88,8 +159,9 @@ public ConsumerFailingOnConsumeOk(IModel model) : base(model)
88
159
{
89
160
}
90
161
91
- public override Task HandleBasicConsumeOk ( string consumerTag )
162
+ public override async Task HandleBasicConsumeOk ( string consumerTag )
92
163
{
164
+ await Task . Delay ( 0 ) ;
93
165
throw new Exception ( "oops" ) ;
94
166
}
95
167
}
@@ -100,66 +172,11 @@ public ConsumerFailingOnCancelOk(IModel model) : base(model)
100
172
{
101
173
}
102
174
103
- public override Task HandleBasicCancelOk ( string consumerTag )
175
+ public override async Task HandleBasicCancelOk ( string consumerTag )
104
176
{
177
+ await Task . Delay ( 0 ) ;
105
178
throw new Exception ( "oops" ) ;
106
179
}
107
180
}
108
-
109
- protected void TestExceptionHandlingWith ( IBasicConsumer consumer ,
110
- Action < IModel , string , IBasicConsumer , string > action )
111
- {
112
- object o = new object ( ) ;
113
- bool notified = false ;
114
- string q = _model . QueueDeclare ( ) ;
115
-
116
-
117
- _model . CallbackException += ( m , evt ) =>
118
- {
119
- notified = true ;
120
- Monitor . PulseAll ( o ) ;
121
- } ;
122
-
123
- string tag = _model . BasicConsume ( q , true , consumer ) ;
124
- action ( _model , q , consumer , tag ) ;
125
- WaitOn ( o ) ;
126
-
127
- Assert . IsTrue ( notified ) ;
128
- }
129
-
130
- [ Test ]
131
- public void TestCancelNotificationExceptionHandling ( )
132
- {
133
- IBasicConsumer consumer = new ConsumerFailingOnCancel ( _model ) ;
134
- TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => m . QueueDelete ( q ) ) ;
135
- }
136
-
137
- [ Test ]
138
- public void TestConsumerCancelOkExceptionHandling ( )
139
- {
140
- IBasicConsumer consumer = new ConsumerFailingOnCancelOk ( _model ) ;
141
- TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => m . BasicCancel ( ct ) ) ;
142
- }
143
-
144
- [ Test ]
145
- public void TestConsumerConsumeOkExceptionHandling ( )
146
- {
147
- IBasicConsumer consumer = new ConsumerFailingOnConsumeOk ( _model ) ;
148
- TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => { } ) ;
149
- }
150
-
151
- [ Test ]
152
- public void TestConsumerShutdownExceptionHandling ( )
153
- {
154
- IBasicConsumer consumer = new ConsumerFailingOnShutdown ( _model ) ;
155
- TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => m . Close ( ) ) ;
156
- }
157
-
158
- [ Test ]
159
- public void TestDeliveryExceptionHandling ( )
160
- {
161
- IBasicConsumer consumer = new ConsumerFailingOnDelivery ( _model ) ;
162
- TestExceptionHandlingWith ( consumer , ( m , q , c , ct ) => m . BasicPublish ( "" , q , null , _encoding . GetBytes ( "msg" ) ) ) ;
163
- }
164
181
}
165
182
}
0 commit comments