@@ -47,8 +47,10 @@ public TestExchangeDeclare(ITestOutputHelper output) : base(output)
47
47
{
48
48
}
49
49
50
- [ Fact ]
51
- public async Task TestConcurrentExchangeDeclareAndBind ( )
50
+ [ Theory ]
51
+ [ InlineData ( true ) ]
52
+ [ InlineData ( false ) ]
53
+ public async Task TestConcurrentExchangeDeclareAndBind ( bool useDedicatedChannelPerTask )
52
54
{
53
55
var exchangeNames = new ConcurrentBag < ExchangeName > ( ) ;
54
56
var tasks = new List < Task > ( ) ;
@@ -61,27 +63,35 @@ public async Task TestConcurrentExchangeDeclareAndBind()
61
63
{
62
64
async Task f ( )
63
65
{
64
- using ( IChannel ch = await _conn . CreateChannelAsync ( ) )
66
+ IChannel ch = _channel ;
67
+ if ( useDedicatedChannelPerTask )
65
68
{
66
- try
67
- {
68
- await Task . Delay ( S_Random . Next ( 5 , 50 ) ) ;
69
- ExchangeName exchangeName = GenerateExchangeName ( ) ;
70
- await ch . ExchangeDeclareAsync ( exchange : exchangeName , type : ExchangeType . Fanout , false , false ) ;
71
- await ch . ExchangeBindAsync ( destination : ex_destination , source : exchangeName ,
72
- routingKey : new RoutingKey ( "unused" ) ) ;
73
- exchangeNames . Add ( exchangeName ) ;
74
- }
75
- catch ( NotSupportedException e )
76
- {
77
- nse = e ;
78
- }
79
- finally
69
+ ch = await _conn . CreateChannelAsync ( ) ;
70
+ }
71
+
72
+ try
73
+ {
74
+ await Task . Delay ( S_Random . Next ( 5 , 50 ) ) ;
75
+ ExchangeName exchangeName = GenerateExchangeName ( ) ;
76
+ await ch . ExchangeDeclareAsync ( exchange : exchangeName , type : ExchangeType . Fanout , false , false ) ;
77
+ await ch . ExchangeBindAsync ( destination : ex_destination , source : exchangeName ,
78
+ routingKey : new RoutingKey ( "unused" ) ) ;
79
+ exchangeNames . Add ( exchangeName ) ;
80
+ }
81
+ catch ( NotSupportedException e )
82
+ {
83
+ nse = e ;
84
+ }
85
+ finally
86
+ {
87
+ if ( useDedicatedChannelPerTask )
80
88
{
81
89
await ch . CloseAsync ( ) ;
90
+ ch . Dispose ( ) ;
82
91
}
83
92
}
84
93
}
94
+
85
95
var t = Task . Run ( f ) ;
86
96
tasks . Add ( t ) ;
87
97
}
@@ -94,22 +104,29 @@ await ch.ExchangeBindAsync(destination: ex_destination, source: exchangeName,
94
104
{
95
105
async Task f ( )
96
106
{
97
- using ( IChannel ch = await _conn . CreateChannelAsync ( ) )
107
+ IChannel ch = _channel ;
108
+ if ( useDedicatedChannelPerTask )
98
109
{
99
- try
100
- {
101
- await Task . Delay ( S_Random . Next ( 5 , 50 ) ) ;
102
- await _channel . ExchangeUnbindAsync ( destination : ex_destination , source : exchangeName , routingKey : ( RoutingKey ) "unused" ,
103
- noWait : false , arguments : null ) ;
104
- await _channel . ExchangeDeleteAsync ( exchange : exchangeName , ifUnused : false ) ;
105
- }
106
- catch ( NotSupportedException e )
107
- {
108
- nse = e ;
109
- }
110
- finally
110
+ ch = await _conn . CreateChannelAsync ( ) ;
111
+ }
112
+
113
+ try
114
+ {
115
+ await Task . Delay ( S_Random . Next ( 5 , 50 ) ) ;
116
+ await _channel . ExchangeUnbindAsync ( destination : ex_destination , source : exchangeName , routingKey : ( RoutingKey ) "unused" ,
117
+ noWait : false , arguments : null ) ;
118
+ await _channel . ExchangeDeleteAsync ( exchange : exchangeName , ifUnused : false ) ;
119
+ }
120
+ catch ( NotSupportedException e )
121
+ {
122
+ nse = e ;
123
+ }
124
+ finally
125
+ {
126
+ if ( useDedicatedChannelPerTask )
111
127
{
112
128
await ch . CloseAsync ( ) ;
129
+ ch . Dispose ( ) ;
113
130
}
114
131
}
115
132
}
@@ -122,8 +139,10 @@ await _channel.ExchangeUnbindAsync(destination: ex_destination, source: exchange
122
139
await _channel . ExchangeDeleteAsync ( exchange : ex_destination ) ;
123
140
}
124
141
125
- [ Fact ]
126
- public async Task TestConcurrentExchangeDeclareAndDelete ( )
142
+ [ Theory ]
143
+ [ InlineData ( true ) ]
144
+ [ InlineData ( false ) ]
145
+ public async Task TestConcurrentExchangeDeclareAndDelete ( bool useDedicatedChannelPerTask )
127
146
{
128
147
var exchangeNames = new ConcurrentBag < ExchangeName > ( ) ;
129
148
var tasks = new List < Task > ( ) ;
@@ -132,24 +151,31 @@ public async Task TestConcurrentExchangeDeclareAndDelete()
132
151
{
133
152
var t = Task . Run ( async ( ) =>
134
153
{
135
- using ( IChannel ch = await _conn . CreateChannelAsync ( ) )
154
+ IChannel ch = _channel ;
155
+ if ( useDedicatedChannelPerTask )
136
156
{
137
- try
138
- {
139
- // sleep for a random amount of time to increase the chances
140
- // of thread interleaving. MK.
141
- await Task . Delay ( _rnd . Next ( 5 , 500 ) ) ;
142
- ExchangeName exchangeName = GenerateExchangeName ( ) ;
143
- await ch . ExchangeDeclareAsync ( exchange : exchangeName , ExchangeType . Fanout , false , false ) ;
144
- exchangeNames . Add ( exchangeName ) ;
145
- }
146
- catch ( NotSupportedException e )
147
- {
148
- nse = e ;
149
- }
150
- finally
157
+ ch = await _conn . CreateChannelAsync ( ) ;
158
+ }
159
+
160
+ try
161
+ {
162
+ // sleep for a random amount of time to increase the chances
163
+ // of thread interleaving. MK.
164
+ await Task . Delay ( _rnd . Next ( 5 , 500 ) ) ;
165
+ ExchangeName exchangeName = GenerateExchangeName ( ) ;
166
+ await ch . ExchangeDeclareAsync ( exchange : exchangeName , ExchangeType . Fanout , false , false ) ;
167
+ exchangeNames . Add ( exchangeName ) ;
168
+ }
169
+ catch ( NotSupportedException e )
170
+ {
171
+ nse = e ;
172
+ }
173
+ finally
174
+ {
175
+ if ( useDedicatedChannelPerTask )
151
176
{
152
177
await ch . CloseAsync ( ) ;
178
+ ch . Dispose ( ) ;
153
179
}
154
180
}
155
181
} ) ;
@@ -166,22 +192,29 @@ public async Task TestConcurrentExchangeDeclareAndDelete()
166
192
ExchangeName ex = exchangeName ;
167
193
var t = Task . Run ( async ( ) =>
168
194
{
169
- using ( IChannel ch = await _conn . CreateChannelAsync ( ) )
195
+ IChannel ch = _channel ;
196
+ if ( useDedicatedChannelPerTask )
170
197
{
171
- try
172
- {
173
- // sleep for a random amount of time to increase the chances
174
- // of thread interleaving. MK.
175
- await Task . Delay ( _rnd . Next ( 5 , 500 ) ) ;
176
- await ch . ExchangeDeleteAsync ( ex ) ;
177
- }
178
- catch ( NotSupportedException e )
179
- {
180
- nse = e ;
181
- }
182
- finally
198
+ ch = await _conn . CreateChannelAsync ( ) ;
199
+ }
200
+
201
+ try
202
+ {
203
+ // sleep for a random amount of time to increase the chances
204
+ // of thread interleaving. MK.
205
+ await Task . Delay ( _rnd . Next ( 5 , 500 ) ) ;
206
+ await ch . ExchangeDeleteAsync ( ex ) ;
207
+ }
208
+ catch ( NotSupportedException e )
209
+ {
210
+ nse = e ;
211
+ }
212
+ finally
213
+ {
214
+ if ( useDedicatedChannelPerTask )
183
215
{
184
216
await ch . CloseAsync ( ) ;
217
+ ch . Dispose ( ) ;
185
218
}
186
219
}
187
220
} ) ;
0 commit comments