@@ -37,9 +37,6 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
37
37
private readonly ConnectionSettings _connectionSettings ;
38
38
private readonly IMetricsReporter ? _metricsReporter ;
39
39
40
- // TODO this is coupled with publishers and consumers
41
- internal readonly AmqpSessionManagement _nativePubSubSessions ;
42
-
43
40
private readonly Dictionary < string , object > _connectionProperties = new ( ) ;
44
41
private bool _areFilterExpressionsSupported = false ;
45
42
@@ -59,47 +56,12 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
59
56
/// </summary>
60
57
private readonly ConcurrentDictionary < Guid , IConsumer > _consumersDict = new ( ) ;
61
58
62
- // TODO this couples AmqpConnection with AmqpPublisher, yuck
63
- internal void AddPublisher ( Guid id , IPublisher consumer )
64
- {
65
- if ( false == _publishersDict . TryAdd ( id , consumer ) )
66
- {
67
- // TODO create "internal bug" exception type?
68
- throw new InvalidOperationException ( "could not add publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
69
- }
70
- }
71
-
72
- internal void RemovePublisher ( Guid id )
73
- {
74
- if ( false == _publishersDict . TryRemove ( id , out _ ) )
75
- {
76
- // TODO create "internal bug" exception type?
77
- throw new InvalidOperationException ( "could not remove publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
78
- }
79
- }
80
-
81
- // TODO this couples AmqpConnection with AmqpConsumer, yuck
82
- internal void AddConsumer ( Guid id , IConsumer consumer )
83
- {
84
- if ( false == _consumersDict . TryAdd ( id , consumer ) )
85
- {
86
- // TODO create "internal bug" exception type?
87
- throw new InvalidOperationException ( "could not add consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
88
- }
89
- }
90
-
91
- internal void RemoveConsumer ( Guid id )
92
- {
93
- if ( false == _consumersDict . TryRemove ( id , out _ ) )
94
- {
95
- // TODO create "internal bug" exception type?
96
- throw new InvalidOperationException ( "could not remove consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
97
- }
98
- }
99
-
100
59
private readonly TaskCompletionSource < bool > _connectionClosedTcs =
101
60
Utils . CreateTaskCompletionSource < bool > ( ) ;
102
61
62
+ // TODO this is coupled with publishers and consumers
63
+ internal readonly AmqpSessionManagement _nativePubSubSessions ;
64
+
103
65
public IRpcServerBuilder RpcServerBuilder ( )
104
66
{
105
67
return new AmqpRpcServerBuilder ( this ) ;
@@ -164,6 +126,13 @@ public IConsumerBuilder ConsumerBuilder()
164
126
return new AmqpConsumerBuilder ( this , _metricsReporter ) ;
165
127
}
166
128
129
+ public IPublisherBuilder PublisherBuilder ( )
130
+ {
131
+ ThrowIfClosed ( ) ;
132
+ var publisherBuilder = new AmqpPublisherBuilder ( this , _metricsReporter ) ;
133
+ return publisherBuilder ;
134
+ }
135
+
167
136
// TODO cancellation token
168
137
public override async Task OpenAsync ( )
169
138
{
@@ -174,13 +143,6 @@ await base.OpenAsync()
174
143
. ConfigureAwait ( false ) ;
175
144
}
176
145
177
- public IPublisherBuilder PublisherBuilder ( )
178
- {
179
- ThrowIfClosed ( ) ;
180
- var publisherBuilder = new AmqpPublisherBuilder ( this , _metricsReporter ) ;
181
- return publisherBuilder ;
182
- }
183
-
184
146
public override async Task CloseAsync ( )
185
147
{
186
148
await _semaphoreClose . WaitAsync ( )
@@ -229,10 +191,6 @@ public override string ToString()
229
191
return info ;
230
192
}
231
193
232
- internal Connection ? NativeConnection => _nativeConnection ;
233
-
234
- internal bool AreFilterExpressionsSupported => _areFilterExpressionsSupported ;
235
-
236
194
protected override void Dispose ( bool disposing )
237
195
{
238
196
if ( disposing )
@@ -250,6 +208,48 @@ protected override void Dispose(bool disposing)
250
208
base . Dispose ( disposing ) ;
251
209
}
252
210
211
+ internal Connection ? NativeConnection => _nativeConnection ;
212
+
213
+ internal bool AreFilterExpressionsSupported => _areFilterExpressionsSupported ;
214
+
215
+ // TODO this couples AmqpConnection with AmqpPublisher, yuck
216
+ internal void AddPublisher ( Guid id , IPublisher consumer )
217
+ {
218
+ if ( false == _publishersDict . TryAdd ( id , consumer ) )
219
+ {
220
+ // TODO create "internal bug" exception type?
221
+ throw new InvalidOperationException ( "could not add publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
222
+ }
223
+ }
224
+
225
+ internal void RemovePublisher ( Guid id )
226
+ {
227
+ if ( false == _publishersDict . TryRemove ( id , out _ ) )
228
+ {
229
+ // TODO create "internal bug" exception type?
230
+ throw new InvalidOperationException ( "could not remove publisher, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
231
+ }
232
+ }
233
+
234
+ // TODO this couples AmqpConnection with AmqpConsumer, yuck
235
+ internal void AddConsumer ( Guid id , IConsumer consumer )
236
+ {
237
+ if ( false == _consumersDict . TryAdd ( id , consumer ) )
238
+ {
239
+ // TODO create "internal bug" exception type?
240
+ throw new InvalidOperationException ( "could not add consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
241
+ }
242
+ }
243
+
244
+ internal void RemoveConsumer ( Guid id )
245
+ {
246
+ if ( false == _consumersDict . TryRemove ( id , out _ ) )
247
+ {
248
+ // TODO create "internal bug" exception type?
249
+ throw new InvalidOperationException ( "could not remove consumer, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues" ) ;
250
+ }
251
+ }
252
+
253
253
/// <summary>
254
254
/// Closes all the publishers. It is called when the connection is closed.
255
255
/// </summary>
@@ -619,71 +619,4 @@ private void HandleProperties(Fields properties)
619
619
_areFilterExpressionsSupported = Utils . SupportsFilterExpressions ( brokerVersion ) ;
620
620
}
621
621
}
622
-
623
- internal class Visitor : IVisitor
624
- {
625
- private readonly AmqpManagement _management ;
626
-
627
- internal Visitor ( AmqpManagement management )
628
- {
629
- _management = management ;
630
- }
631
-
632
- public async Task VisitQueuesAsync ( IEnumerable < QueueSpec > queueSpec )
633
- {
634
- // TODO this could be done in parallel
635
- foreach ( QueueSpec spec in queueSpec )
636
- {
637
- Trace . WriteLine ( TraceLevel . Information , $ "Recovering queue { spec . QueueName } ") ;
638
- try
639
- {
640
- await _management . Queue ( spec ) . DeclareAsync ( )
641
- . ConfigureAwait ( false ) ;
642
- }
643
- catch ( Exception e )
644
- {
645
- Trace . WriteLine ( TraceLevel . Error ,
646
- $ "Error recovering queue { spec . QueueName } . Error: { e } . Management Status: { _management } ") ;
647
- }
648
- }
649
- }
650
-
651
- public async Task VisitExchangesAsync ( IEnumerable < ExchangeSpec > exchangeSpec )
652
- {
653
- // TODO this could be done in parallel
654
- foreach ( ExchangeSpec spec in exchangeSpec )
655
- {
656
- Trace . WriteLine ( TraceLevel . Information , $ "Recovering exchange { spec . ExchangeName } ") ;
657
- try
658
- {
659
- await _management . Exchange ( spec ) . DeclareAsync ( )
660
- . ConfigureAwait ( false ) ;
661
- }
662
- catch ( Exception e )
663
- {
664
- Trace . WriteLine ( TraceLevel . Error ,
665
- $ "Error recovering exchange { spec . ExchangeName } . Error: { e } . Management Status: { _management } ") ;
666
- }
667
- }
668
- }
669
-
670
- public async Task VisitBindingsAsync ( IEnumerable < BindingSpec > bindingSpec )
671
- {
672
- // TODO this could be done in parallel
673
- foreach ( BindingSpec spec in bindingSpec )
674
- {
675
- Trace . WriteLine ( TraceLevel . Information , $ "Recovering binding { spec . BindingPath } ") ;
676
- try
677
- {
678
- await _management . Binding ( spec ) . BindAsync ( )
679
- . ConfigureAwait ( false ) ;
680
- }
681
- catch ( Exception e )
682
- {
683
- Trace . WriteLine ( TraceLevel . Error ,
684
- $ "Error recovering binding { spec . BindingPath } . Error: { e } . Management Status: { _management } ") ;
685
- }
686
- }
687
- }
688
- }
689
622
}
0 commit comments