Skip to content

Commit 4847c4d

Browse files
committed
* Move more code to Channel.BasicPublish.cs
1 parent 5fa0ea1 commit 4847c4d

File tree

3 files changed

+76
-76
lines changed

3 files changed

+76
-76
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Collections.Generic;
3334
using System.Diagnostics;
3435
using System.Threading;
3536
using System.Threading.Tasks;
3637
using RabbitMQ.Client.Framing;
38+
using RabbitMQ.Client.Util;
3739

3840
namespace RabbitMQ.Client.Impl
3941
{
@@ -148,5 +150,75 @@ await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellat
148150
.ConfigureAwait(false);
149151
}
150152
}
153+
154+
private BasicProperties? PopulateBasicPropertiesHeaders<TProperties>(TProperties basicProperties,
155+
Activity? sendActivity, ulong publishSequenceNumber)
156+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
157+
{
158+
/*
159+
* Note: there is nothing to do in this method if *both* of these
160+
* conditions are true:
161+
*
162+
* sendActivity is null - there is no activity to add as a header
163+
* publisher confirmations are NOT enabled
164+
*/
165+
if (sendActivity is null && !_publisherConfirmationsEnabled)
166+
{
167+
return null;
168+
}
169+
170+
bool newHeaders = false;
171+
IDictionary<string, object?>? headers = basicProperties.Headers;
172+
if (headers is null)
173+
{
174+
headers = new Dictionary<string, object?>();
175+
newHeaders = true;
176+
}
177+
MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity);
178+
MaybeAddPublishSequenceNumberToHeaders(headers);
179+
180+
switch (basicProperties)
181+
{
182+
case BasicProperties writableProperties:
183+
if (newHeaders)
184+
{
185+
writableProperties.Headers = headers;
186+
}
187+
return null;
188+
case EmptyBasicProperty:
189+
return new BasicProperties { Headers = headers };
190+
default:
191+
return new BasicProperties(basicProperties) { Headers = headers };
192+
}
193+
194+
void MaybeAddActivityToHeaders(IDictionary<string, object?> headers,
195+
string? correlationId, Activity? sendActivity)
196+
{
197+
if (sendActivity is not null)
198+
{
199+
// This activity is marked as recorded, so let's propagate the trace and span ids.
200+
if (sendActivity.IsAllDataRequested)
201+
{
202+
if (!string.IsNullOrEmpty(correlationId))
203+
{
204+
sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, correlationId);
205+
}
206+
}
207+
208+
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
209+
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
210+
}
211+
}
212+
213+
void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers)
214+
{
215+
if (_publisherConfirmationsEnabled)
216+
{
217+
byte[] publishSequenceNumberBytes = new byte[8];
218+
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber);
219+
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
220+
}
221+
}
222+
}
151223
}
152224
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Buffers.Binary;
3433
using System.Collections.Concurrent;
3534
using System.Collections.Generic;
3635
using System.Diagnostics;
@@ -40,13 +39,12 @@
4039
using RabbitMQ.Client.Events;
4140
using RabbitMQ.Client.Exceptions;
4241
using RabbitMQ.Client.Framing;
42+
using RabbitMQ.Client.Util;
4343

4444
namespace RabbitMQ.Client.Impl
4545
{
4646
internal partial class Channel : IChannel, IRecoverable
4747
{
48-
// private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true);
49-
5048
private bool _publisherConfirmationsEnabled = false;
5149
private bool _publisherConfirmationTrackingEnabled = false;
5250
private ulong _nextPublishSeqNo = 0;
@@ -230,9 +228,10 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
230228
ulong publishSequenceNumber = 0;
231229
IReadOnlyBasicProperties props = basicReturnEvent.BasicProperties;
232230
object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader];
233-
if (maybeSeqNum != null)
231+
if (maybeSeqNum != null &&
232+
maybeSeqNum is byte[] seqNumBytes)
234233
{
235-
publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
234+
publishSequenceNumber = NetworkOrderDeserializer.ReadUInt64(seqNumBytes);
236235
}
237236

238237
HandleNack(publishSequenceNumber, multiple: false, isReturn: true);

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
using RabbitMQ.Client.Events;
4343
using RabbitMQ.Client.Exceptions;
4444
using RabbitMQ.Client.Framing;
45-
using RabbitMQ.Client.Util;
4645

4746
namespace RabbitMQ.Client.Impl
4847
{
@@ -1500,76 +1499,6 @@ await ModelSendAsync(in method, k.CancellationToken)
15001499
}
15011500
}
15021501

1503-
private BasicProperties? PopulateBasicPropertiesHeaders<TProperties>(TProperties basicProperties,
1504-
Activity? sendActivity, ulong publishSequenceNumber)
1505-
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
1506-
{
1507-
/*
1508-
* Note: there is nothing to do in this method if *both* of these
1509-
* conditions are true:
1510-
*
1511-
* sendActivity is null - there is no activity to add as a header
1512-
* publisher confirmations are NOT enabled
1513-
*/
1514-
if (sendActivity is null && !_publisherConfirmationsEnabled)
1515-
{
1516-
return null;
1517-
}
1518-
1519-
bool newHeaders = false;
1520-
IDictionary<string, object?>? headers = basicProperties.Headers;
1521-
if (headers is null)
1522-
{
1523-
headers = new Dictionary<string, object?>();
1524-
newHeaders = true;
1525-
}
1526-
MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity);
1527-
MaybeAddPublishSequenceNumberToHeaders(headers);
1528-
1529-
switch (basicProperties)
1530-
{
1531-
case BasicProperties writableProperties:
1532-
if (newHeaders)
1533-
{
1534-
writableProperties.Headers = headers;
1535-
}
1536-
return null;
1537-
case EmptyBasicProperty:
1538-
return new BasicProperties { Headers = headers };
1539-
default:
1540-
return new BasicProperties(basicProperties) { Headers = headers };
1541-
}
1542-
1543-
void MaybeAddActivityToHeaders(IDictionary<string, object?> headers,
1544-
string? correlationId, Activity? sendActivity)
1545-
{
1546-
if (sendActivity is not null)
1547-
{
1548-
// This activity is marked as recorded, so let's propagate the trace and span ids.
1549-
if (sendActivity.IsAllDataRequested)
1550-
{
1551-
if (!string.IsNullOrEmpty(correlationId))
1552-
{
1553-
sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, correlationId);
1554-
}
1555-
}
1556-
1557-
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
1558-
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
1559-
}
1560-
}
1561-
1562-
void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers)
1563-
{
1564-
if (_publisherConfirmationsEnabled)
1565-
{
1566-
byte[] publishSequenceNumberBytes = new byte[8];
1567-
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber);
1568-
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
1569-
}
1570-
}
1571-
}
1572-
15731502
/// <summary>
15741503
/// Returning <c>true</c> from this method means that the command was server-originated,
15751504
/// and handled already.

0 commit comments

Comments
 (0)