@@ -46,6 +46,8 @@ namespace Test.Integration
46
46
public class TestToxiproxy : IntegrationFixture
47
47
{
48
48
private readonly TimeSpan _heartbeatTimeout = TimeSpan . FromSeconds ( 1 ) ;
49
+ private ToxiproxyManager _toxiproxyManager ;
50
+ private int _proxyPort ;
49
51
50
52
public TestToxiproxy ( ITestOutputHelper output ) : base ( output )
51
53
{
@@ -59,7 +61,15 @@ public override Task InitializeAsync()
59
61
Assert . Null ( _conn ) ;
60
62
Assert . Null ( _channel ) ;
61
63
62
- return Task . CompletedTask ;
64
+ _toxiproxyManager = new ToxiproxyManager ( _testDisplayName , IsRunningInCI , IsWindows ) ;
65
+ _proxyPort = _toxiproxyManager . ProxyPort ;
66
+ return _toxiproxyManager . InitializeAsync ( ) ;
67
+ }
68
+
69
+ public override async Task DisposeAsync ( )
70
+ {
71
+ await _toxiproxyManager . DisposeAsync ( ) ;
72
+ await base . DisposeAsync ( ) ;
63
73
}
64
74
65
75
[ SkippableFact ]
@@ -68,11 +78,8 @@ public async Task TestCloseConnection()
68
78
{
69
79
Skip . IfNot ( AreToxiproxyTestsEnabled , "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test" ) ;
70
80
71
- using var pm = new ToxiproxyManager ( _testDisplayName , IsRunningInCI , IsWindows ) ;
72
- await pm . InitializeAsync ( ) ;
73
-
74
81
ConnectionFactory cf = CreateConnectionFactory ( ) ;
75
- cf . Port = pm . ProxyPort ;
82
+ cf . Port = _proxyPort ;
76
83
cf . AutomaticRecoveryEnabled = true ;
77
84
cf . NetworkRecoveryInterval = TimeSpan . FromSeconds ( 1 ) ;
78
85
cf . RequestedHeartbeat = TimeSpan . FromSeconds ( 1 ) ;
@@ -172,11 +179,11 @@ async Task PublishLoop()
172
179
173
180
Assert . True ( await messagePublishedTcs . Task ) ;
174
181
175
- Task disableProxyTask = pm . DisableAsync ( ) ;
182
+ Task disableProxyTask = _toxiproxyManager . DisableAsync ( ) ;
176
183
177
184
await Task . WhenAll ( disableProxyTask , connectionShutdownTcs . Task ) ;
178
185
179
- Task enableProxyTask = pm . EnableAsync ( ) ;
186
+ Task enableProxyTask = _toxiproxyManager . EnableAsync ( ) ;
180
187
181
188
Task whenAllTask = Task . WhenAll ( enableProxyTask , recoverySucceededTcs . Task ) ;
182
189
await whenAllTask . WaitAsync ( TimeSpan . FromSeconds ( 15 ) ) ;
@@ -193,11 +200,8 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
193
200
{
194
201
Skip . IfNot ( AreToxiproxyTestsEnabled , "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test" ) ;
195
202
196
- using var pm = new ToxiproxyManager ( _testDisplayName , IsRunningInCI , IsWindows ) ;
197
- await pm . InitializeAsync ( ) ;
198
-
199
203
ConnectionFactory cf = CreateConnectionFactory ( ) ;
200
- cf . Port = pm . ProxyPort ;
204
+ cf . Port = _proxyPort ;
201
205
cf . RequestedHeartbeat = _heartbeatTimeout ;
202
206
cf . AutomaticRecoveryEnabled = false ;
203
207
@@ -229,7 +233,7 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
229
233
timeoutToxic . Attributes . Timeout = 0 ;
230
234
timeoutToxic . Toxicity = 1.0 ;
231
235
232
- Task < TimeoutToxic > addToxicTask = pm . AddToxicAsync ( timeoutToxic ) ;
236
+ Task < TimeoutToxic > addToxicTask = _toxiproxyManager . AddToxicAsync ( timeoutToxic ) ;
233
237
234
238
await Assert . ThrowsAsync < AlreadyClosedException > ( ( ) =>
235
239
{
@@ -243,11 +247,8 @@ public async Task TestTcpReset_GH1464()
243
247
{
244
248
Skip . IfNot ( AreToxiproxyTestsEnabled , "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test" ) ;
245
249
246
- using var pm = new ToxiproxyManager ( _testDisplayName , IsRunningInCI , IsWindows ) ;
247
- await pm . InitializeAsync ( ) ;
248
-
249
250
ConnectionFactory cf = CreateConnectionFactory ( ) ;
250
- cf . Endpoint = new AmqpTcpEndpoint ( IPAddress . Loopback . ToString ( ) , pm . ProxyPort ) ;
251
+ cf . Endpoint = new AmqpTcpEndpoint ( IPAddress . Loopback . ToString ( ) , _proxyPort ) ;
251
252
cf . RequestedHeartbeat = TimeSpan . FromSeconds ( 5 ) ;
252
253
cf . AutomaticRecoveryEnabled = true ;
253
254
@@ -283,11 +284,11 @@ public async Task TestTcpReset_GH1464()
283
284
resetPeerToxic . Attributes . Timeout = 500 ;
284
285
resetPeerToxic . Toxicity = 1.0 ;
285
286
286
- Task < ResetPeerToxic > addToxicTask = pm . AddToxicAsync ( resetPeerToxic ) ;
287
+ Task < ResetPeerToxic > addToxicTask = _toxiproxyManager . AddToxicAsync ( resetPeerToxic ) ;
287
288
288
289
await Task . WhenAll ( addToxicTask , connectionShutdownTcs . Task ) ;
289
290
290
- await pm . RemoveToxicAsync ( toxicName ) ;
291
+ await _toxiproxyManager . RemoveToxicAsync ( toxicName ) ;
291
292
292
293
await recoveryTask ;
293
294
}
@@ -302,11 +303,8 @@ public async Task TestPublisherConfirmationThrottling()
302
303
const int MaxOutstandingConfirms = 8 ;
303
304
const int BatchSize = MaxOutstandingConfirms * 2 ;
304
305
305
- using var pm = new ToxiproxyManager ( _testDisplayName , IsRunningInCI , IsWindows ) ;
306
- await pm . InitializeAsync ( ) ;
307
-
308
306
ConnectionFactory cf = CreateConnectionFactory ( ) ;
309
- cf . Endpoint = new AmqpTcpEndpoint ( IPAddress . Loopback . ToString ( ) , pm . ProxyPort ) ;
307
+ cf . Endpoint = new AmqpTcpEndpoint ( IPAddress . Loopback . ToString ( ) , _proxyPort ) ;
310
308
cf . RequestedHeartbeat = TimeSpan . FromSeconds ( 5 ) ;
311
309
cf . AutomaticRecoveryEnabled = true ;
312
310
@@ -371,7 +369,7 @@ public async Task TestPublisherConfirmationThrottling()
371
369
372
370
await Task . Delay ( TimeSpan . FromSeconds ( 1 ) ) ;
373
371
374
- Task < BandwidthToxic > addToxicTask = pm . AddToxicAsync ( bandwidthToxic ) ;
372
+ Task < BandwidthToxic > addToxicTask = _toxiproxyManager . AddToxicAsync ( bandwidthToxic ) ;
375
373
376
374
while ( true )
377
375
{
@@ -387,7 +385,7 @@ public async Task TestPublisherConfirmationThrottling()
387
385
}
388
386
389
387
await addToxicTask . WaitAsync ( WaitSpan ) ;
390
- await pm . RemoveToxicAsync ( toxicName ) . WaitAsync ( WaitSpan ) ;
388
+ await _toxiproxyManager . RemoveToxicAsync ( toxicName ) . WaitAsync ( WaitSpan ) ;
391
389
392
390
await messagesPublishedTcs . Task . WaitAsync ( WaitSpan ) ;
393
391
await publishTask . WaitAsync ( WaitSpan ) ;
0 commit comments