Skip to content

Commit 5fa0ea1

Browse files
committed
* Move BasicPublishAsync to its own file.
1 parent d7aa279 commit 5fa0ea1

File tree

2 files changed

+152
-110
lines changed

2 files changed

+152
-110
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Diagnostics;
34+
using System.Threading;
35+
using System.Threading.Tasks;
36+
using RabbitMQ.Client.Framing;
37+
38+
namespace RabbitMQ.Client.Impl
39+
{
40+
internal partial class Channel : IChannel, IRecoverable
41+
{
42+
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
43+
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
44+
CancellationToken cancellationToken = default)
45+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
46+
{
47+
PublisherConfirmationInfo? publisherConfirmationInfo = null;
48+
try
49+
{
50+
publisherConfirmationInfo =
51+
await MaybeStartPublisherConfirmationTracking(cancellationToken)
52+
.ConfigureAwait(false);
53+
54+
await EnforceFlowControlAsync(cancellationToken)
55+
.ConfigureAwait(false);
56+
57+
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
58+
59+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
60+
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
61+
: default;
62+
63+
ulong publishSequenceNumber = 0;
64+
if (publisherConfirmationInfo is not null)
65+
{
66+
publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber;
67+
}
68+
69+
BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
70+
if (props is null)
71+
{
72+
await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
73+
.ConfigureAwait(false);
74+
}
75+
else
76+
{
77+
await ModelSendAsync(in cmd, in props, body, cancellationToken)
78+
.ConfigureAwait(false);
79+
}
80+
}
81+
catch (Exception ex)
82+
{
83+
bool exceptionWasHandled =
84+
MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex);
85+
if (!exceptionWasHandled)
86+
{
87+
throw;
88+
}
89+
}
90+
finally
91+
{
92+
await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken)
93+
.ConfigureAwait(false);
94+
}
95+
}
96+
97+
public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
98+
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
99+
CancellationToken cancellationToken = default)
100+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
101+
{
102+
PublisherConfirmationInfo? publisherConfirmationInfo = null;
103+
try
104+
{
105+
publisherConfirmationInfo =
106+
await MaybeStartPublisherConfirmationTracking(cancellationToken)
107+
.ConfigureAwait(false);
108+
109+
await EnforceFlowControlAsync(cancellationToken)
110+
.ConfigureAwait(false);
111+
112+
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
113+
114+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
115+
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
116+
: default;
117+
118+
ulong publishSequenceNumber = 0;
119+
if (publisherConfirmationInfo is not null)
120+
{
121+
publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber;
122+
}
123+
124+
BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
125+
if (props is null)
126+
{
127+
await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
128+
.ConfigureAwait(false);
129+
}
130+
else
131+
{
132+
await ModelSendAsync(in cmd, in props, body, cancellationToken)
133+
.ConfigureAwait(false);
134+
}
135+
}
136+
catch (Exception ex)
137+
{
138+
bool exceptionWasHandled =
139+
MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex);
140+
if (!exceptionWasHandled)
141+
{
142+
throw;
143+
}
144+
}
145+
finally
146+
{
147+
await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken)
148+
.ConfigureAwait(false);
149+
}
150+
}
151+
}
152+
}

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 0 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -944,116 +944,6 @@ await ModelSendAsync(in method, k.CancellationToken)
944944
}
945945
}
946946

947-
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
948-
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
949-
CancellationToken cancellationToken = default)
950-
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
951-
{
952-
PublisherConfirmationInfo? publisherConfirmationInfo = null;
953-
try
954-
{
955-
publisherConfirmationInfo =
956-
await MaybeStartPublisherConfirmationTracking(cancellationToken)
957-
.ConfigureAwait(false);
958-
959-
await EnforceFlowControlAsync(cancellationToken)
960-
.ConfigureAwait(false);
961-
962-
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
963-
964-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
965-
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
966-
: default;
967-
968-
ulong publishSequenceNumber = 0;
969-
if (publisherConfirmationInfo is not null)
970-
{
971-
publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber;
972-
}
973-
974-
BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
975-
if (props is null)
976-
{
977-
await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
978-
.ConfigureAwait(false);
979-
}
980-
else
981-
{
982-
await ModelSendAsync(in cmd, in props, body, cancellationToken)
983-
.ConfigureAwait(false);
984-
}
985-
}
986-
catch (Exception ex)
987-
{
988-
bool exceptionWasHandled =
989-
MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex);
990-
if (!exceptionWasHandled)
991-
{
992-
throw;
993-
}
994-
}
995-
finally
996-
{
997-
await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken)
998-
.ConfigureAwait(false);
999-
}
1000-
}
1001-
1002-
public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
1003-
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
1004-
CancellationToken cancellationToken = default)
1005-
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
1006-
{
1007-
PublisherConfirmationInfo? publisherConfirmationInfo = null;
1008-
try
1009-
{
1010-
publisherConfirmationInfo =
1011-
await MaybeStartPublisherConfirmationTracking(cancellationToken)
1012-
.ConfigureAwait(false);
1013-
1014-
await EnforceFlowControlAsync(cancellationToken)
1015-
.ConfigureAwait(false);
1016-
1017-
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
1018-
1019-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
1020-
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
1021-
: default;
1022-
1023-
ulong publishSequenceNumber = 0;
1024-
if (publisherConfirmationInfo is not null)
1025-
{
1026-
publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber;
1027-
}
1028-
1029-
BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
1030-
if (props is null)
1031-
{
1032-
await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
1033-
.ConfigureAwait(false);
1034-
}
1035-
else
1036-
{
1037-
await ModelSendAsync(in cmd, in props, body, cancellationToken)
1038-
.ConfigureAwait(false);
1039-
}
1040-
}
1041-
catch (Exception ex)
1042-
{
1043-
bool exceptionWasHandled =
1044-
MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex);
1045-
if (!exceptionWasHandled)
1046-
{
1047-
throw;
1048-
}
1049-
}
1050-
finally
1051-
{
1052-
await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken)
1053-
.ConfigureAwait(false);
1054-
}
1055-
}
1056-
1057947
public async Task UpdateSecretAsync(string newSecret, string reason,
1058948
CancellationToken cancellationToken)
1059949
{

0 commit comments

Comments
 (0)