Skip to content

Commit 4a6ae00

Browse files
committed
Track sequence number for basic.return
1 parent 84492c3 commit 4a6ae00

File tree

2 files changed

+36
-5
lines changed

2 files changed

+36
-5
lines changed

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,14 @@
3737
using System.Threading.Tasks;
3838
using RabbitMQ.Client;
3939

40-
const int MESSAGE_COUNT = 50_000;
40+
// const int MESSAGE_COUNT = 50_000;
41+
const int MESSAGE_COUNT = 5;
4142
bool debug = false;
4243

4344
#pragma warning disable CS8321 // Local function is declared but never used
4445

45-
await PublishMessagesIndividuallyAsync();
46-
await PublishMessagesInBatchAsync();
46+
// await PublishMessagesIndividuallyAsync();
47+
// await PublishMessagesInBatchAsync();
4748
await HandlePublishConfirmsAsynchronously();
4849

4950
static Task<IConnection> CreateConnectionAsync()
@@ -190,6 +191,24 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
190191
}
191192
}
192193

194+
channel.BasicReturnAsync += (sender, ea) =>
195+
{
196+
long sequenceNumber = 0;
197+
198+
IReadOnlyBasicProperties props = ea.BasicProperties;
199+
if (props.Headers is not null)
200+
{
201+
object? maybeSeqNum = props.Headers["x-sequence-number"];
202+
if (maybeSeqNum is not null)
203+
{
204+
sequenceNumber = (long)maybeSeqNum;
205+
}
206+
}
207+
208+
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number {sequenceNumber} has been basic.return-ed");
209+
return CleanOutstandingConfirms((ulong)sequenceNumber, false);
210+
};
211+
193212
channel.BasicAcksAsync += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
194213
channel.BasicNacksAsync += (sender, ea) =>
195214
{
@@ -220,7 +239,19 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
220239
semaphore.Release();
221240
}
222241

223-
ValueTask pt = channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);
242+
var props = new BasicProperties
243+
{
244+
Headers = new Dictionary<string, object?>()
245+
{
246+
["x-sequence-number"] = (long)nextPublishSeqNo
247+
}
248+
};
249+
250+
// string rk = queueName;
251+
string rk = Guid.NewGuid().ToString();
252+
ValueTask pt = channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body,
253+
mandatory: true, basicProperties: props);
254+
224255
publishTasks.Add(pt.AsTask());
225256
}
226257

projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
</PropertyGroup>
1414

1515
<ItemGroup>
16-
<ProjectReference Include="../../../RabbitMQ.Client\RabbitMQ.Client.csproj" />
16+
<ProjectReference Include="../../../RabbitMQ.Client/RabbitMQ.Client.csproj" />
1717
</ItemGroup>
1818

1919
</Project>

0 commit comments

Comments
 (0)