1
1
using RabbitMQ . Client ;
2
+ using RabbitMQ . Client . Exceptions ;
2
3
using System . Diagnostics ;
3
4
using System . Text ;
4
5
5
- const int MESSAGE_COUNT = 50_000 ;
6
+ const int MessageCount = 50_000 ;
7
+ const int MaxOutstandingConfirms = 128 ;
8
+
9
+ var channelOptions = new CreateChannelOptions
10
+ {
11
+ PublisherConfirmationsEnabled = true ,
12
+ PublisherConfirmationTrackingEnabled = true ,
13
+ OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter ( MaxOutstandingConfirms )
14
+ } ;
15
+
16
+ var props = new BasicProperties
17
+ {
18
+ Persistent = true
19
+ } ;
6
20
7
21
await PublishMessagesIndividuallyAsync ( ) ;
8
22
await PublishMessagesInBatchAsync ( ) ;
@@ -14,110 +28,140 @@ static Task<IConnection> CreateConnectionAsync()
14
28
return factory . CreateConnectionAsync ( ) ;
15
29
}
16
30
17
- static async Task PublishMessagesIndividuallyAsync ( )
31
+ async Task PublishMessagesIndividuallyAsync ( )
18
32
{
19
- Console . WriteLine ( $ "{ DateTime . Now } [INFO] publishing { MESSAGE_COUNT : N0} messages individually " +
20
- "and handling confirms all at once " ) ;
33
+ Console . WriteLine ( $ "{ DateTime . Now } [INFO] publishing { MessageCount : N0} messages individually " +
34
+ "and handling confirms individually (i.e., the slowest way) " ) ;
21
35
22
36
using IConnection connection = await CreateConnectionAsync ( ) ;
23
- using IChannel channel = await connection . CreateChannelAsync ( ) ;
37
+ using IChannel channel = await connection . CreateChannelAsync ( channelOptions ) ;
24
38
25
39
// declare a server-named queue
26
40
QueueDeclareOk queueDeclareResult = await channel . QueueDeclareAsync ( ) ;
27
41
string queueName = queueDeclareResult . QueueName ;
28
- await channel . ConfirmSelectAsync ( ) ;
29
42
30
43
var sw = new Stopwatch ( ) ;
31
44
sw . Start ( ) ;
32
45
33
- for ( int i = 0 ; i < MESSAGE_COUNT ; i ++ )
46
+ for ( int i = 0 ; i < MessageCount ; i ++ )
34
47
{
35
48
byte [ ] body = Encoding . UTF8 . GetBytes ( i . ToString ( ) ) ;
36
- await channel . BasicPublishAsync ( exchange : string . Empty , routingKey : queueName , body : body ) ;
49
+ try
50
+ {
51
+ await channel . BasicPublishAsync ( exchange : string . Empty , routingKey : queueName , body : body ,
52
+ mandatory : true , basicProperties : props ) ;
53
+ }
54
+ catch ( PublishException pubEx )
55
+ {
56
+ Console . Error . WriteLine ( "{0} [ERROR] publish exception: {1}" , DateTime . Now , pubEx ) ;
57
+ }
58
+ catch ( Exception ex )
59
+ {
60
+ Console . Error . WriteLine ( "{0} [ERROR] other exception: {1}" , DateTime . Now , ex ) ;
61
+ }
37
62
}
38
63
39
- await channel . WaitForConfirmsOrDieAsync ( ) ;
40
-
41
64
sw . Stop ( ) ;
42
65
43
- Console . WriteLine ( $ "{ DateTime . Now } [INFO] published { MESSAGE_COUNT : N0} messages individually " +
66
+ Console . WriteLine ( $ "{ DateTime . Now } [INFO] published { MessageCount : N0} messages individually " +
44
67
$ "in { sw . ElapsedMilliseconds : N0} ms") ;
45
68
}
46
69
47
- static async Task PublishMessagesInBatchAsync ( )
70
+ async Task PublishMessagesInBatchAsync ( )
48
71
{
49
- Console . WriteLine ( $ "{ DateTime . Now } [INFO] publishing { MESSAGE_COUNT : N0} messages and handling " +
72
+ Console . WriteLine ( $ "{ DateTime . Now } [INFO] publishing { MessageCount : N0} messages and handling " +
50
73
$ "confirms in batches") ;
51
74
52
75
using IConnection connection = await CreateConnectionAsync ( ) ;
53
- using IChannel channel = await connection . CreateChannelAsync ( ) ;
76
+ using IChannel channel = await connection . CreateChannelAsync ( channelOptions ) ;
54
77
55
78
// declare a server-named queue
56
79
QueueDeclareOk queueDeclareResult = await channel . QueueDeclareAsync ( ) ;
57
80
string queueName = queueDeclareResult . QueueName ;
58
- await channel . ConfirmSelectAsync ( ) ;
59
81
60
- int batchSize = 100 ;
82
+ /*
83
+ * Note: since throttling happens when 50% of the outstanding confirms are reached,
84
+ * each batch size should not be greater than this value
85
+ */
86
+ int batchSize = MaxOutstandingConfirms / 2 ;
61
87
int outstandingMessageCount = 0 ;
62
88
63
89
var sw = new Stopwatch ( ) ;
64
90
sw . Start ( ) ;
65
91
66
- var publishTasks = new List < Task > ( ) ;
67
- for ( int i = 0 ; i < MESSAGE_COUNT ; i ++ )
92
+ static async Task AwaitPublishTasks ( IEnumerable < ValueTask > publishTasks )
93
+ {
94
+ foreach ( ValueTask pt in publishTasks )
95
+ {
96
+ try
97
+ {
98
+ await pt ;
99
+ }
100
+ catch ( PublishException pubEx )
101
+ {
102
+ Console . Error . WriteLine ( "{0} [ERROR] publish exception: {1}" , DateTime . Now , pubEx ) ;
103
+ }
104
+ catch ( Exception ex )
105
+ {
106
+ Console . Error . WriteLine ( "{0} [ERROR] other exception: {1}" , DateTime . Now , ex ) ;
107
+ }
108
+ }
109
+ }
110
+
111
+ var publishTasks = new List < ValueTask > ( ) ;
112
+ for ( int i = 0 ; i < MessageCount ; i ++ )
68
113
{
69
114
byte [ ] body = Encoding . UTF8 . GetBytes ( i . ToString ( ) ) ;
70
- var pt = channel . BasicPublishAsync ( exchange : string . Empty ,
71
- routingKey : queueName , body : body ) ;
72
- publishTasks . Add ( pt . AsTask ( ) ) ;
115
+
116
+ var pt0 = channel . BasicPublishAsync ( exchange : string . Empty , routingKey : queueName , body : body ,
117
+ mandatory : true , basicProperties : props ) ;
118
+ publishTasks . Add ( pt0 ) ;
119
+
73
120
outstandingMessageCount ++ ;
74
121
75
122
if ( outstandingMessageCount == batchSize )
76
123
{
77
- using var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 5 ) ) ;
78
- await Task . WhenAll ( publishTasks ) . WaitAsync ( cts . Token ) ;
124
+ await AwaitPublishTasks ( publishTasks ) ;
79
125
publishTasks . Clear ( ) ;
80
-
81
- await channel . WaitForConfirmsOrDieAsync ( cts . Token ) ;
82
126
outstandingMessageCount = 0 ;
83
127
}
84
128
}
85
129
86
- if ( outstandingMessageCount > 0 )
130
+ if ( publishTasks . Count > 0 )
87
131
{
88
- using var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 5 ) ) ;
89
- await channel . WaitForConfirmsOrDieAsync ( cts . Token ) ;
132
+ await AwaitPublishTasks ( publishTasks ) ;
90
133
}
91
134
92
135
sw . Stop ( ) ;
93
- Console . WriteLine ( $ "{ DateTime . Now } [INFO] published { MESSAGE_COUNT : N0} messages in batch in " +
136
+ Console . WriteLine ( $ "{ DateTime . Now } [INFO] published { MessageCount : N0} messages in batch in " +
94
137
$ "{ sw . ElapsedMilliseconds : N0} ms") ;
95
138
}
96
139
97
140
async Task HandlePublishConfirmsAsynchronously ( )
98
141
{
99
- Console . WriteLine ( $ "{ DateTime . Now } [INFO] publishing { MESSAGE_COUNT : N0} messages and " +
142
+ Console . WriteLine ( $ "{ DateTime . Now } [INFO] publishing { MessageCount : N0} messages and " +
100
143
$ "handling confirms asynchronously") ;
101
144
145
+ // NOTE: setting trackConfirmations to false because this program
146
+ // is tracking them itself.
147
+ channelOptions . PublisherConfirmationTrackingEnabled = false ;
148
+ channelOptions . OutstandingPublisherConfirmationsRateLimiter = null ;
149
+
102
150
using IConnection connection = await CreateConnectionAsync ( ) ;
103
- using IChannel channel = await connection . CreateChannelAsync ( ) ;
151
+ using IChannel channel = await connection . CreateChannelAsync ( channelOptions ) ;
104
152
105
153
// declare a server-named queue
106
154
QueueDeclareOk queueDeclareResult = await channel . QueueDeclareAsync ( ) ;
107
155
string queueName = queueDeclareResult . QueueName ;
108
156
109
- // NOTE: setting trackConfirmations to false because this program
110
- // is tracking them itself.
111
- await channel . ConfirmSelectAsync ( trackConfirmations : false ) ;
112
-
113
157
bool publishingCompleted = false ;
114
158
var allMessagesConfirmedTcs =
115
159
new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
116
160
var outstandingConfirms = new LinkedList < ulong > ( ) ;
117
161
var semaphore = new SemaphoreSlim ( 1 , 1 ) ;
118
- void CleanOutstandingConfirms ( ulong deliveryTag , bool multiple )
162
+ async Task CleanOutstandingConfirms ( ulong deliveryTag , bool multiple )
119
163
{
120
- semaphore . Wait ( ) ;
164
+ await semaphore . WaitAsync ( ) ;
121
165
try
122
166
{
123
167
if ( multiple )
@@ -155,23 +199,25 @@ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
155
199
}
156
200
}
157
201
158
- channel . BasicAcks += ( sender , ea ) => CleanOutstandingConfirms ( ea . DeliveryTag , ea . Multiple ) ;
159
- channel . BasicNacks += ( sender , ea ) =>
202
+ channel . BasicAcksAsync += ( sender , ea ) =>
203
+ CleanOutstandingConfirms ( ea . DeliveryTag , ea . Multiple ) ;
204
+
205
+ channel . BasicNacksAsync += ( sender , ea ) =>
160
206
{
161
207
Console . WriteLine ( $ "{ DateTime . Now } [WARNING] message sequence number: { ea . DeliveryTag } " +
162
208
$ "has been nacked (multiple: { ea . Multiple } )") ;
163
- CleanOutstandingConfirms ( ea . DeliveryTag , ea . Multiple ) ;
209
+ return CleanOutstandingConfirms ( ea . DeliveryTag , ea . Multiple ) ;
164
210
} ;
165
211
166
212
var sw = new Stopwatch ( ) ;
167
213
sw . Start ( ) ;
168
214
169
215
var publishTasks = new List < ValueTask > ( ) ;
170
- for ( int i = 0 ; i < MESSAGE_COUNT ; i ++ )
216
+ for ( int i = 0 ; i < MessageCount ; i ++ )
171
217
{
172
218
string msg = i . ToString ( ) ;
173
219
byte [ ] body = Encoding . UTF8 . GetBytes ( msg ) ;
174
- ulong nextPublishSeqNo = channel . NextPublishSeqNo ;
220
+ ulong nextPublishSeqNo = await channel . GetNextPublishSequenceNumberAsync ( ) ;
175
221
if ( ( ulong ) ( i + 1 ) != nextPublishSeqNo )
176
222
{
177
223
Console . WriteLine ( $ "{ DateTime . Now } [WARNING] i { i + 1 } does not equal next sequence " +
@@ -215,6 +261,6 @@ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
215
261
}
216
262
217
263
sw . Stop ( ) ;
218
- Console . WriteLine ( $ "{ DateTime . Now } [INFO] published { MESSAGE_COUNT : N0} messages and handled " +
264
+ Console . WriteLine ( $ "{ DateTime . Now } [INFO] published { MessageCount : N0} messages and handled " +
219
265
$ "confirm asynchronously { sw . ElapsedMilliseconds : N0} ms") ;
220
266
}
0 commit comments