Skip to content

Commit e50f6ed

Browse files
committed
* Make AmqpManagementParameters internal
1 parent df00074 commit e50f6ed

File tree

6 files changed

+71
-26
lines changed

6 files changed

+71
-26
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,9 @@ await _semaphoreClose.WaitAsync()
100100
.ConfigureAwait(false);
101101
try
102102
{
103-
await CloseAllPublishers()
103+
await CloseAllPublishersAsync()
104104
.ConfigureAwait(false);
105-
await CloseAllConsumers()
105+
await CloseAllConsumersAsync()
106106
.ConfigureAwait(false);
107107

108108
_recordingTopologyListener.Clear();
@@ -141,9 +141,12 @@ public override string ToString()
141141
return info;
142142
}
143143

144-
internal Connection? NativeConnection()
144+
internal Connection? NativeConnection
145145
{
146-
return _nativeConnection;
146+
get
147+
{
148+
return _nativeConnection;
149+
}
147150
}
148151

149152
protected override void Dispose(bool disposing)
@@ -165,7 +168,8 @@ protected override void Dispose(bool disposing)
165168
/// <summary>
166169
/// Closes all the publishers. It is called when the connection is closed.
167170
/// </summary>
168-
private async Task CloseAllPublishers()
171+
// TODO cancellation token, parallel?
172+
private async Task CloseAllPublishersAsync()
169173
{
170174
var cloned = new List<IPublisher>(Publishers.Values);
171175
foreach (IPublisher publisher in cloned)
@@ -175,7 +179,8 @@ await publisher.CloseAsync()
175179
}
176180
}
177181

178-
private async Task CloseAllConsumers()
182+
// TODO cancellation token, parallel?
183+
private async Task CloseAllConsumersAsync()
179184
{
180185
var cloned = new List<IConsumer>(Consumers.Values);
181186
foreach (IConsumer consumer in cloned)

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ namespace RabbitMQ.AMQP.Client.Impl;
1313
/// RabbitMQ uses AMQP end point: "/management" to manage the resources like queues, exchanges, and bindings.
1414
/// The management endpoint works like an HTTP RPC endpoint where the client sends a request to the server
1515
/// </summary>
16-
public class AmqpManagement(AmqpManagementParameters parameters) : AbstractLifeCycle, IManagement
16+
public class AmqpManagement : AbstractLifeCycle, IManagement
1717
{
18+
private readonly AmqpManagementParameters _amqpManagementParameters;
19+
1820
// The requests are stored in a dictionary with the correlationId as the key
1921
// The correlationId is used to match the request with the response
2022
private readonly ConcurrentDictionary<string, TaskCompletionSource<Message>> _requests = new();
@@ -37,6 +39,11 @@ public class AmqpManagement(AmqpManagementParameters parameters) : AbstractLifeC
3739
internal const string Delete = "DELETE";
3840
private const string ReplyTo = "$me";
3941

42+
internal AmqpManagement(AmqpManagementParameters amqpManagementParameters)
43+
{
44+
_amqpManagementParameters = amqpManagementParameters;
45+
}
46+
4047
public IQueueSpecification Queue()
4148
{
4249
ThrowIfClosed();
@@ -94,7 +101,7 @@ public IBindingSpecification Binding()
94101

95102
public ITopologyListener TopologyListener()
96103
{
97-
return parameters.TopologyListener();
104+
return _amqpManagementParameters.TopologyListener();
98105
}
99106

100107
public override async Task OpenAsync()
@@ -106,7 +113,7 @@ public override async Task OpenAsync()
106113

107114
if (_managementSession == null || _managementSession.IsClosed)
108115
{
109-
_managementSession = new Session(parameters.Connection().NativeConnection());
116+
_managementSession = new Session(_amqpManagementParameters.NativeConnection);
110117
}
111118

112119
await EnsureSenderLinkAsync()
@@ -146,7 +153,7 @@ private async Task ProcessResponses()
146153
try
147154
{
148155
while (_managementSession?.IsClosed == false &&
149-
parameters.Connection().NativeConnection()!.IsClosed == false)
156+
_amqpManagementParameters.IsNativeConnectionClosed == false)
150157
{
151158
if (_receiverLink == null)
152159
{
@@ -463,7 +470,7 @@ await _connectionCloseTaskCompletionSource.Task.WaitAsync(closeSpan)
463470
public override string ToString()
464471
{
465472
string info = $"AmqpManagement{{" +
466-
$"AmqpConnection='{parameters.Connection()}', " +
473+
$"AmqpConnection='{_amqpManagementParameters.Connection}', " +
467474
$"Status='{State.ToString()}'" +
468475
$"ReceiverLink closed: {_receiverLink?.IsClosed} " +
469476
$"}}";

RabbitMQ.AMQP.Client/Impl/AmqpManagementParameters.cs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,59 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
15
namespace RabbitMQ.AMQP.Client.Impl;
26

3-
public class AmqpManagementParameters(AmqpConnection connection)
7+
internal class AmqpManagementParameters
48
{
9+
private readonly AmqpConnection _amqpConnection;
10+
511
private RecordingTopologyListener _topologyListener = null!;
612

7-
public AmqpManagementParameters TopologyListener(RecordingTopologyListener topologyListener)
13+
internal AmqpManagementParameters(AmqpConnection amqpConnection)
14+
{
15+
_amqpConnection = amqpConnection;
16+
}
17+
18+
internal AmqpManagementParameters TopologyListener(RecordingTopologyListener topologyListener)
819
{
920
_topologyListener = topologyListener;
1021
return this;
1122
}
1223

24+
internal AmqpConnection Connection
25+
{
26+
get
27+
{
28+
return _amqpConnection;
29+
}
30+
}
31+
32+
internal Amqp.Connection? NativeConnection
33+
{
34+
get
35+
{
36+
return _amqpConnection.NativeConnection;
37+
}
38+
}
1339

14-
public AmqpConnection Connection()
40+
internal bool IsNativeConnectionClosed
1541
{
16-
return connection;
42+
get
43+
{
44+
if (_amqpConnection.NativeConnection is null)
45+
{
46+
// TODO create "internal bug" exception type?
47+
throw new InvalidOperationException("NativeConnection is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
48+
}
49+
else
50+
{
51+
return _amqpConnection.NativeConnection.IsClosed;
52+
}
53+
}
1754
}
1855

19-
public RecordingTopologyListener TopologyListener()
56+
internal RecordingTopologyListener TopologyListener()
2057
{
2158
return _topologyListener;
2259
}

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
using Amqp;
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using Amqp;
26
using Amqp.Framing;
37
using Trace = Amqp.Trace;
48
using TraceLevel = Amqp.TraceLevel;

RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public Session GetOrCreateSession()
1414
return Sessions.First();
1515
}
1616

17-
var session = new Session(amqpConnection.NativeConnection());
17+
var session = new Session(amqpConnection.NativeConnection);
1818
Sessions.Add(session);
1919
return session;
2020
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -238,22 +238,15 @@ RabbitMQ.AMQP.Client.Impl.AmqpExchangeSpecification.Name(string! name) -> Rabbit
238238
RabbitMQ.AMQP.Client.Impl.AmqpExchangeSpecification.Type(RabbitMQ.AMQP.Client.ExchangeType type) -> RabbitMQ.AMQP.Client.IExchangeSpecification!
239239
RabbitMQ.AMQP.Client.Impl.AmqpExchangeSpecification.Type(string! type) -> RabbitMQ.AMQP.Client.IExchangeSpecification!
240240
RabbitMQ.AMQP.Client.Impl.AmqpManagement
241-
RabbitMQ.AMQP.Client.Impl.AmqpManagement.AmqpManagement(RabbitMQ.AMQP.Client.Impl.AmqpManagementParameters! parameters) -> void
242241
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Binding() -> RabbitMQ.AMQP.Client.IBindingSpecification!
243242
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Exchange() -> RabbitMQ.AMQP.Client.IExchangeSpecification!
244243
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Exchange(string! name) -> RabbitMQ.AMQP.Client.IExchangeSpecification!
245244
RabbitMQ.AMQP.Client.Impl.AmqpManagement.GetQueueInfoAsync(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IQueueInfo!>!
246245
RabbitMQ.AMQP.Client.Impl.AmqpManagement.GetQueueInfoAsync(string! queueName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IQueueInfo!>!
247-
RabbitMQ.AMQP.Client.Impl.AmqpManagement.HandleResponseMessage(Amqp.Message! msg) -> void
248246
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
249247
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue(RabbitMQ.AMQP.Client.Impl.QueueSpec! spec) -> RabbitMQ.AMQP.Client.IQueueSpecification!
250248
RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue(string! name) -> RabbitMQ.AMQP.Client.IQueueSpecification!
251249
RabbitMQ.AMQP.Client.Impl.AmqpManagement.TopologyListener() -> RabbitMQ.AMQP.Client.ITopologyListener!
252-
RabbitMQ.AMQP.Client.Impl.AmqpManagementParameters
253-
RabbitMQ.AMQP.Client.Impl.AmqpManagementParameters.AmqpManagementParameters(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void
254-
RabbitMQ.AMQP.Client.Impl.AmqpManagementParameters.Connection() -> RabbitMQ.AMQP.Client.Impl.AmqpConnection!
255-
RabbitMQ.AMQP.Client.Impl.AmqpManagementParameters.TopologyListener() -> RabbitMQ.AMQP.Client.Impl.RecordingTopologyListener!
256-
RabbitMQ.AMQP.Client.Impl.AmqpManagementParameters.TopologyListener(RabbitMQ.AMQP.Client.Impl.RecordingTopologyListener! topologyListener) -> RabbitMQ.AMQP.Client.Impl.AmqpManagementParameters!
257250
RabbitMQ.AMQP.Client.Impl.AmqpMessage
258251
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage() -> void
259252
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -> void
@@ -580,4 +573,3 @@ static readonly RabbitMQ.AMQP.Client.SaslMechanism.External -> RabbitMQ.AMQP.Cli
580573
static readonly RabbitMQ.AMQP.Client.SaslMechanism.Plain -> RabbitMQ.AMQP.Client.SaslMechanism!
581574
virtual RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.Dispose(bool disposing) -> void
582575
virtual RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.OpenAsync() -> System.Threading.Tasks.Task!
583-
virtual RabbitMQ.AMQP.Client.Impl.AmqpManagement.InternalSendAsync(Amqp.Message! message, System.TimeSpan timeout) -> System.Threading.Tasks.Task!

0 commit comments

Comments
 (0)