Skip to content

Commit d3e48b7

Browse files
committed
Serialize sequence number as string.
Fixes issue raised in #1805
1 parent 85fd42f commit d3e48b7

File tree

3 files changed

+79
-18
lines changed

3 files changed

+79
-18
lines changed

projects/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Buffers.Binary;
3433
using System.Collections.Generic;
3534
using System.Diagnostics;
3635
using System.Text;
3736
using System.Threading;
3837
using System.Threading.Tasks;
3938
using RabbitMQ.Client;
39+
using RabbitMQ.Client.Exceptions;
4040

4141
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
4242

@@ -124,11 +124,43 @@ async Task PublishMessagesInBatchAsync()
124124
var sw = new Stopwatch();
125125
sw.Start();
126126

127+
channel.BasicReturnAsync += (sender, ea) =>
128+
{
129+
string sequenceNumber = string.Empty;
130+
131+
IReadOnlyBasicProperties props = ea.BasicProperties;
132+
if (props.Headers is not null)
133+
{
134+
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
135+
if (maybeSeqNum is not null)
136+
{
137+
switch (maybeSeqNum)
138+
{
139+
case byte[] seqNumBytes:
140+
sequenceNumber = Encoding.ASCII.GetString(seqNumBytes);
141+
break;
142+
case string seqNumStr:
143+
sequenceNumber = (string)seqNumStr;
144+
break;
145+
}
146+
}
147+
}
148+
149+
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
150+
151+
return Task.CompletedTask;
152+
};
153+
127154
var publishTasks = new List<ValueTask>();
128155
for (int i = 0; i < MESSAGE_COUNT; i++)
129156
{
157+
string rk = queueName;
158+
if (i % 1000 == 0)
159+
{
160+
rk = Guid.NewGuid().ToString();
161+
}
130162
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
131-
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props));
163+
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props));
132164
outstandingMessageCount++;
133165

134166
if (outstandingMessageCount == batchSize)
@@ -139,9 +171,13 @@ async Task PublishMessagesInBatchAsync()
139171
{
140172
await pt;
141173
}
174+
catch (PublishException pex)
175+
{
176+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, pex.IsReturn: '{pex.IsReturn}', seq no: '{pex.PublishSequenceNumber}'");
177+
}
142178
catch (Exception ex)
143179
{
144-
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
180+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw exception, ex: '{ex}'");
145181
}
146182
}
147183
publishTasks.Clear();
@@ -157,9 +193,13 @@ async Task PublishMessagesInBatchAsync()
157193
{
158194
await pt;
159195
}
196+
catch (PublishException pex)
197+
{
198+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, pex.IsReturn: '{pex.IsReturn}', seq no: '{pex.PublishSequenceNumber}'");
199+
}
160200
catch (Exception ex)
161201
{
162-
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
202+
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw exception, ex: '{ex}'");
163203
}
164204
}
165205
publishTasks.Clear();
@@ -238,20 +278,29 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
238278

239279
channel.BasicReturnAsync += (sender, ea) =>
240280
{
241-
ulong sequenceNumber = 0;
281+
string sequenceNumber = string.Empty;
242282

243283
IReadOnlyBasicProperties props = ea.BasicProperties;
244284
if (props.Headers is not null)
245285
{
246286
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
247287
if (maybeSeqNum is not null)
248288
{
249-
sequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
289+
switch (maybeSeqNum)
290+
{
291+
case byte[] seqNumBytes:
292+
sequenceNumber = Encoding.ASCII.GetString(seqNumBytes);
293+
break;
294+
case string seqNumStr:
295+
sequenceNumber = (string)seqNumStr;
296+
break;
297+
}
250298
}
251299
}
252300

253-
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
254-
return CleanOutstandingConfirms(sequenceNumber, false);
301+
Console.WriteLine($"{DateTime.Now} [INFO] message sequence number '{sequenceNumber}' has been basic.return-ed");
302+
ulong seq = ulong.Parse(sequenceNumber);
303+
return CleanOutstandingConfirms(seq, false);
255304
};
256305

257306
channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
@@ -290,13 +339,21 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
290339
// This will cause a basic.return, for fun
291340
rk = Guid.NewGuid().ToString();
292341
}
342+
343+
var msgProps = new BasicProperties
344+
{
345+
Persistent = true,
346+
Headers = new Dictionary<string, object?>()
347+
};
348+
349+
msgProps.Headers.Add(Constants.PublishSequenceNumberHeader, nextPublishSeqNo.ToString());
350+
293351
(ulong, ValueTask) data =
294-
(nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: props));
352+
(nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true, basicProperties: msgProps));
295353
publishTasks.Add(data);
296354
}
297355

298356
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
299-
// await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
300357
foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks)
301358
{
302359
try

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
using System.Threading;
3737
using System.Threading.Tasks;
3838
using RabbitMQ.Client.Framing;
39-
using RabbitMQ.Client.Util;
4039

4140
namespace RabbitMQ.Client.Impl
4241
{
@@ -217,9 +216,7 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
217216
{
218217
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled)
219218
{
220-
byte[] publishSequenceNumberBytes = new byte[8];
221-
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber);
222-
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
219+
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumber.ToString();
223220
}
224221
}
225222
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@
3434
using System.Collections.Generic;
3535
using System.Diagnostics;
3636
using System.Runtime.CompilerServices;
37+
using System.Text;
3738
using System.Threading;
3839
using System.Threading.RateLimiting;
3940
using System.Threading.Tasks;
4041
using RabbitMQ.Client.Events;
4142
using RabbitMQ.Client.Exceptions;
4243
using RabbitMQ.Client.Framing;
43-
using RabbitMQ.Client.Util;
4444

4545
namespace RabbitMQ.Client.Impl
4646
{
@@ -233,10 +233,17 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
233233
ulong publishSequenceNumber = 0;
234234
IReadOnlyBasicProperties props = basicReturnEvent.BasicProperties;
235235
object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader];
236-
if (maybeSeqNum != null &&
237-
maybeSeqNum is byte[] seqNumBytes)
236+
if (maybeSeqNum != null)
238237
{
239-
publishSequenceNumber = NetworkOrderDeserializer.ReadUInt64(seqNumBytes);
238+
switch (maybeSeqNum)
239+
{
240+
case string seqNumString:
241+
publishSequenceNumber = ulong.Parse(seqNumString);
242+
break;
243+
case byte[] seqNumBytes:
244+
publishSequenceNumber = ulong.Parse(Encoding.ASCII.GetString(seqNumBytes));
245+
break;
246+
}
240247
}
241248

242249
HandleNack(publishSequenceNumber, multiple: false, isReturn: true);

0 commit comments

Comments
 (0)