Skip to content

Commit d8d00e8

Browse files
committed
Fix the code that adds OTel and publish sequence number headers.
1 parent 857cbde commit d8d00e8

File tree

2 files changed

+46
-69
lines changed

2 files changed

+46
-69
lines changed

projects/RabbitMQ.Client/Impl/ChannelBase.cs

Lines changed: 45 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1885,93 +1885,70 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
18851885
return null;
18861886
}
18871887

1888-
BasicProperties? rv = null;
1889-
if (sendActivity is not null)
1890-
{
1891-
// This activity is marked as recorded, so let's propagate the trace and span ids.
1892-
if (sendActivity.IsAllDataRequested)
1893-
{
1894-
if (!string.IsNullOrEmpty(basicProperties.CorrelationId))
1895-
{
1896-
sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId);
1897-
}
1898-
}
1888+
var newHeaders = new Dictionary<string, object?>();
1889+
MaybeAddActivityToHeaders(newHeaders, basicProperties.CorrelationId, sendActivity);
1890+
MaybeAddPublishSequenceNumberToHeaders(newHeaders);
18991891

1900-
IDictionary<string, object?>? headers = basicProperties.Headers;
1901-
if (headers is null)
1902-
{
1903-
rv = AddActivityHeaders(basicProperties, sendActivity);
1904-
}
1905-
else
1906-
{
1907-
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
1908-
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
1909-
rv = null;
1910-
}
1892+
switch (basicProperties)
1893+
{
1894+
case BasicProperties writableProperties:
1895+
MergeHeaders(newHeaders, writableProperties);
1896+
return null;
1897+
case EmptyBasicProperty:
1898+
return new BasicProperties { Headers = newHeaders };
1899+
default:
1900+
return new BasicProperties(basicProperties) { Headers = newHeaders };
19111901
}
19121902

1913-
if (_publisherConfirmationTrackingEnabled)
1903+
static void MergeHeaders(IDictionary<string, object?> newHeaders, BasicProperties props)
19141904
{
1915-
byte[] publishSequenceNumberBytes;
1916-
if (BitConverter.IsLittleEndian)
1917-
{
1918-
publishSequenceNumberBytes = BitConverter.GetBytes(BinaryPrimitives.ReverseEndianness(publishSequenceNumber));
1919-
}
1920-
else
1905+
if (props.Headers is null)
19211906
{
1922-
publishSequenceNumberBytes = BitConverter.GetBytes(publishSequenceNumber);
1923-
}
1924-
1925-
IDictionary<string, object?>? headers = basicProperties.Headers;
1926-
if (headers is null)
1927-
{
1928-
rv = AddPublishSequenceNumberHeader(basicProperties, publishSequenceNumberBytes);
1907+
props.Headers = newHeaders;
19291908
}
19301909
else
19311910
{
1932-
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
1933-
rv = null;
1911+
foreach (KeyValuePair<string, object?> val in newHeaders)
1912+
{
1913+
props.Headers[val.Key] = val.Value;
1914+
}
19341915
}
19351916
}
19361917

1937-
return rv;
1938-
1939-
static BasicProperties? AddActivityHeaders(TProperties basicProperties, Activity sendActivity)
1918+
void MaybeAddActivityToHeaders(IDictionary<string, object?> headers,
1919+
string? correlationId, Activity? sendActivity)
19401920
{
1941-
var headers = new Dictionary<string, object?>();
1942-
1943-
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
1944-
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
1945-
1946-
switch (basicProperties)
1921+
if (sendActivity is not null)
19471922
{
1948-
case BasicProperties writableProperties:
1949-
writableProperties.Headers = headers;
1950-
return null;
1951-
case EmptyBasicProperty:
1952-
return new BasicProperties { Headers = headers };
1953-
default:
1954-
return new BasicProperties(basicProperties) { Headers = headers };
1923+
// This activity is marked as recorded, so let's propagate the trace and span ids.
1924+
if (sendActivity.IsAllDataRequested)
1925+
{
1926+
if (!string.IsNullOrEmpty(correlationId))
1927+
{
1928+
sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, correlationId);
1929+
}
1930+
}
1931+
1932+
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
1933+
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
19551934
}
19561935
}
19571936

1958-
static BasicProperties? AddPublishSequenceNumberHeader(TProperties basicProperties, byte[] publishSequenceNumberBytes)
1937+
void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers)
19591938
{
1960-
var headers = new Dictionary<string, object?>()
1939+
if (_publisherConfirmationTrackingEnabled)
19611940
{
1962-
[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes
1963-
};
1964-
1965-
switch (basicProperties)
1966-
{
1967-
case BasicProperties writableProperties:
1968-
writableProperties.Headers = headers;
1969-
return null;
1970-
case EmptyBasicProperty:
1971-
return new BasicProperties { Headers = headers };
1972-
default:
1973-
return new BasicProperties(basicProperties) { Headers = headers };
1941+
byte[] publishSequenceNumberBytes;
1942+
if (BitConverter.IsLittleEndian)
1943+
{
1944+
publishSequenceNumberBytes = BitConverter.GetBytes(BinaryPrimitives.ReverseEndianness(publishSequenceNumber));
1945+
}
1946+
else
1947+
{
1948+
publishSequenceNumberBytes = BitConverter.GetBytes(publishSequenceNumber);
1949+
}
19741950

1951+
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
19751952
}
19761953
}
19771954
}

projects/Test/SequentialIntegration/TestActivitySource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera
9999
};
100100

101101
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
102-
await _channel.BasicPublishAsync("", q.QueueName, true, sendBody);
102+
Assert.True(await _channel.BasicPublishAsync("", q.QueueName, true, sendBody));
103103

104104
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
105105
Assert.True(await consumerReceivedTcs.Task);

0 commit comments

Comments
 (0)