@@ -51,82 +51,87 @@ namespace RabbitMQ.Client.Unit
51
51
[ TestFixture ]
52
52
public class TestAsyncConsumer
53
53
{
54
+ private static readonly byte [ ] body = System . Text . Encoding . UTF8 . GetBytes ( "async-hi" ) ;
55
+ private static readonly ConnectionFactory cf = new ConnectionFactory { DispatchConsumersAsync = true } ;
56
+
54
57
[ Test ]
55
58
public void TestBasicRoundtrip ( )
56
59
{
57
- var cf = new ConnectionFactory { DispatchConsumersAsync = true } ;
58
- using ( IConnection c = cf . CreateConnection ( ) )
59
- using ( IModel m = c . CreateModel ( ) )
60
+ using ( AutoResetEvent are = new AutoResetEvent ( false ) )
60
61
{
61
- QueueDeclareOk q = m . QueueDeclare ( ) ;
62
- IBasicProperties bp = m . CreateBasicProperties ( ) ;
63
- byte [ ] body = System . Text . Encoding . UTF8 . GetBytes ( "async-hi" ) ;
64
- m . BasicPublish ( "" , q . QueueName , bp , body ) ;
65
- var consumer = new AsyncEventingBasicConsumer ( m ) ;
66
- var are = new AutoResetEvent ( false ) ;
67
- consumer . Received += async ( o , a ) =>
62
+ using ( IConnection c = cf . CreateConnection ( ) )
63
+ {
64
+ using ( IModel m = c . CreateModel ( ) )
68
65
{
69
- are . Set ( ) ;
70
- await Task . Yield ( ) ;
71
- } ;
72
- string tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
73
- // ensure we get a delivery
74
- bool waitRes = are . WaitOne ( 2000 ) ;
75
- Assert . IsTrue ( waitRes ) ;
76
- // unsubscribe and ensure no further deliveries
77
- m . BasicCancel ( tag ) ;
78
- m . BasicPublish ( "" , q . QueueName , bp , body ) ;
79
- bool waitResFalse = are . WaitOne ( 2000 ) ;
80
- Assert . IsFalse ( waitResFalse ) ;
66
+ IBasicProperties bp = m . CreateBasicProperties ( ) ;
67
+ QueueDeclareOk q = m . QueueDeclare ( ) ;
68
+ m . BasicPublish ( "" , q . QueueName , bp , body ) ;
69
+ var consumer = new AsyncEventingBasicConsumer ( m ) ;
70
+ consumer . Received += async ( o , a ) =>
71
+ {
72
+ are . Set ( ) ;
73
+ await Task . Yield ( ) ;
74
+ } ;
75
+ string tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
76
+ // ensure we get a delivery
77
+ bool waitRes = are . WaitOne ( 2000 ) ;
78
+ Assert . IsTrue ( waitRes ) ;
79
+ // unsubscribe and ensure no further deliveries
80
+ m . BasicCancel ( tag ) ;
81
+ m . BasicPublish ( "" , q . QueueName , bp , body ) ;
82
+ bool waitResFalse = are . WaitOne ( 2000 ) ;
83
+ Assert . IsFalse ( waitResFalse ) ;
84
+ }
85
+ }
81
86
}
82
87
}
83
88
84
89
[ Test ]
85
90
public void TestBasicRoundtripNoWait ( )
86
91
{
87
- var cf = new ConnectionFactory { DispatchConsumersAsync = true } ;
88
- using ( IConnection c = cf . CreateConnection ( ) )
92
+ using ( AutoResetEvent are = new AutoResetEvent ( false ) )
89
93
{
90
- using ( IModel m = c . CreateModel ( ) )
94
+ using ( IConnection c = cf . CreateConnection ( ) )
91
95
{
92
- QueueDeclareOk q = m . QueueDeclare ( ) ;
93
- IBasicProperties bp = m . CreateBasicProperties ( ) ;
94
- byte [ ] body = System . Text . Encoding . UTF8 . GetBytes ( "async-hi" ) ;
95
- m . BasicPublish ( "" , q . QueueName , bp , body ) ;
96
- var consumer = new AsyncEventingBasicConsumer ( m ) ;
97
- var are = new AutoResetEvent ( false ) ;
98
- consumer . Received += async ( o , a ) =>
99
- {
100
- are . Set ( ) ;
101
- await Task . Yield ( ) ;
102
- } ;
103
- string tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
104
- // ensure we get a delivery
105
- bool waitRes = are . WaitOne ( 5000 ) ;
106
- Assert . IsTrue ( waitRes ) ;
107
- // unsubscribe and ensure no further deliveries
108
- m . BasicCancelNoWait ( tag ) ;
109
- Thread . Sleep ( 1000 ) ;
110
- m . BasicPublish ( "" , q . QueueName , bp , body ) ;
111
- bool waitResFalse = are . WaitOne ( 5000 ) ;
112
- Assert . IsFalse ( waitResFalse ) ;
96
+ using ( IModel m = c . CreateModel ( ) )
97
+ {
98
+ IBasicProperties bp = m . CreateBasicProperties ( ) ;
99
+ QueueDeclareOk q = m . QueueDeclare ( ) ;
100
+ m . BasicPublish ( "" , q . QueueName , bp , body ) ;
101
+ var consumer = new AsyncEventingBasicConsumer ( m ) ;
102
+ consumer . Received += async ( o , a ) =>
103
+ {
104
+ are . Set ( ) ;
105
+ await Task . Yield ( ) ;
106
+ } ;
107
+ string tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
108
+ // ensure we get a delivery
109
+ bool waitRes = are . WaitOne ( 2000 ) ;
110
+ Assert . IsTrue ( waitRes ) ;
111
+ // unsubscribe and ensure no further deliveries
112
+ m . BasicCancelNoWait ( tag ) ;
113
+ Thread . Sleep ( 1000 ) ;
114
+ m . BasicPublish ( "" , q . QueueName , bp , body ) ;
115
+ bool waitResFalse = are . WaitOne ( 2000 ) ;
116
+ Assert . IsFalse ( waitResFalse ) ;
117
+ }
113
118
}
114
119
}
115
120
}
116
121
117
122
[ Test ]
118
123
public void NonAsyncConsumerShouldThrowInvalidOperationException ( )
119
124
{
120
- var cf = new ConnectionFactory { DispatchConsumersAsync = true } ;
121
- using ( IConnection c = cf . CreateConnection ( ) )
122
- using ( IModel m = c . CreateModel ( ) )
125
+ using ( IConnection c = cf . CreateConnection ( ) )
123
126
{
124
- QueueDeclareOk q = m . QueueDeclare ( ) ;
125
- IBasicProperties bp = m . CreateBasicProperties ( ) ;
126
- byte [ ] body = System . Text . Encoding . UTF8 . GetBytes ( "async-hi" ) ;
127
- m . BasicPublish ( "" , q . QueueName , bp , body ) ;
128
- var consumer = new EventingBasicConsumer ( m ) ;
129
- Assert . Throws < InvalidOperationException > ( ( ) => m . BasicConsume ( q . QueueName , false , consumer ) ) ;
127
+ using ( IModel m = c . CreateModel ( ) )
128
+ {
129
+ IBasicProperties bp = m . CreateBasicProperties ( ) ;
130
+ QueueDeclareOk q = m . QueueDeclare ( ) ;
131
+ m . BasicPublish ( "" , q . QueueName , bp , body ) ;
132
+ var consumer = new EventingBasicConsumer ( m ) ;
133
+ Assert . Throws < InvalidOperationException > ( ( ) => m . BasicConsume ( q . QueueName , false , consumer ) ) ;
134
+ }
130
135
}
131
136
}
132
137
}
0 commit comments