1
+ // Copyright (C) 2023-2025 RabbitMQ Core Team ([email protected] )
2
+ //
3
+ // Licensed under the Apache License, Version 2.0 (the "License");
4
+ // you may not use this file except in compliance with the License.
5
+ // You may obtain a copy of the License at
6
+ //
7
+ // http://www.apache.org/licenses/LICENSE-2.0
8
+ //
9
+ // Unless required by applicable law or agreed to in writing, software
10
+ // distributed under the License is distributed on an "AS IS" BASIS,
11
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ // See the License for the specific language governing permissions and
13
+ // limitations under the License.
14
+ use predicates:: prelude:: * ;
15
+ use rabbitmq_http_client:: requests:: { FederationUpstreamParams , QueueFederationParams } ;
16
+
17
+ mod test_helpers;
18
+ use crate :: test_helpers:: { amqp_endpoint_with_vhost, delete_vhost} ;
19
+ use test_helpers:: { run_fails, run_succeeds} ;
20
+
21
+ #[ test]
22
+ fn test_federation_upstream_declaration_for_queue_federation_case0 ( )
23
+ -> Result < ( ) , Box < dyn std:: error:: Error > > {
24
+ let vh = "rust.federation.0" ;
25
+ let name = "up.for_queue_federation" ;
26
+
27
+ let amqp_endpoint = amqp_endpoint_with_vhost ( vh) ;
28
+ let q = "federation.cq.1" ;
29
+ let ctag = "federation.custom-consumer-tag" ;
30
+ let qfp = QueueFederationParams :: new_with_consumer_tag ( q, ctag) ;
31
+ let endpoint1 = amqp_endpoint. clone ( ) ;
32
+ let upstream =
33
+ FederationUpstreamParams :: new_queue_federation_upstream ( vh, name, & endpoint1, qfp) ;
34
+
35
+ run_succeeds ( [ "declare" , "vhost" , "--name" , vh] ) ;
36
+ let qfp = upstream. queue_federation . unwrap ( ) ;
37
+
38
+ run_succeeds ( [
39
+ "-V" ,
40
+ vh,
41
+ "federation" ,
42
+ "declare_upstream_for_queues" ,
43
+ "--name" ,
44
+ & upstream. name ,
45
+ "--uri" ,
46
+ & upstream. uri ,
47
+ "--queue-name" ,
48
+ & q,
49
+ "--consumer-tag" ,
50
+ & qfp. consumer_tag . unwrap ( ) ,
51
+ ] ) ;
52
+
53
+ delete_vhost ( vh) . expect ( "failed to delete a virtual host" ) ;
54
+
55
+ Ok ( ( ) )
56
+ }
57
+
58
+ #[ test]
59
+ fn test_federation_upstream_declaration_for_queue_federation_case1 ( )
60
+ -> Result < ( ) , Box < dyn std:: error:: Error > > {
61
+ let vh = "rust.federation.1" ;
62
+ let name = "up.for_queue_federation" ;
63
+
64
+ let amqp_endpoint = amqp_endpoint_with_vhost ( vh) ;
65
+ let q = "federation.cq.1" ;
66
+ let ctag = "federation.custom-consumer-tag" ;
67
+ let qfp = QueueFederationParams :: new_with_consumer_tag ( q, ctag) ;
68
+ let endpoint1 = amqp_endpoint. clone ( ) ;
69
+ let upstream =
70
+ FederationUpstreamParams :: new_queue_federation_upstream ( vh, name, & endpoint1, qfp) ;
71
+
72
+ run_succeeds ( [ "declare" , "vhost" , "--name" , vh] ) ;
73
+ let qfp = upstream. queue_federation . unwrap ( ) ;
74
+
75
+ run_succeeds ( [
76
+ "-V" ,
77
+ vh,
78
+ "federation" ,
79
+ "declare_upstream_for_queues" ,
80
+ "--name" ,
81
+ & upstream. name ,
82
+ "--uri" ,
83
+ & upstream. uri ,
84
+ "--ack-mode" ,
85
+ "on-confirm" ,
86
+ "--queue-name" ,
87
+ & q,
88
+ "--consumer-tag" ,
89
+ & qfp. consumer_tag . unwrap ( ) ,
90
+ ] ) ;
91
+
92
+ delete_vhost ( vh) . expect ( "failed to delete a virtual host" ) ;
93
+
94
+ Ok ( ( ) )
95
+ }
96
+
97
+ #[ test]
98
+ fn test_federation_upstream_declaration_for_queue_federation_case2 ( )
99
+ -> Result < ( ) , Box < dyn std:: error:: Error > > {
100
+ let vh = "rust.federation.2" ;
101
+ let name = "up.for_queue_federation" ;
102
+
103
+ let amqp_endpoint = amqp_endpoint_with_vhost ( vh) ;
104
+ let q = "federation.cq.2" ;
105
+ let ctag = "federation.custom-consumer-tag.2" ;
106
+ let qfp = QueueFederationParams :: new_with_consumer_tag ( q, ctag) ;
107
+ let endpoint1 = amqp_endpoint. clone ( ) ;
108
+ let upstream =
109
+ FederationUpstreamParams :: new_queue_federation_upstream ( vh, name, & endpoint1, qfp) ;
110
+
111
+ run_succeeds ( [ "declare" , "vhost" , "--name" , vh] ) ;
112
+
113
+ run_succeeds ( [
114
+ "-V" ,
115
+ vh,
116
+ "federation" ,
117
+ "declare_upstream_for_queues" ,
118
+ "--name" ,
119
+ & upstream. name ,
120
+ "--uri" ,
121
+ & upstream. uri ,
122
+ "--ack-mode" ,
123
+ "on-publish"
124
+ ] ) ;
125
+
126
+ delete_vhost ( vh) . expect ( "failed to delete a virtual host" ) ;
127
+
128
+ Ok ( ( ) )
129
+ }
130
+
131
+ #[ test]
132
+ fn test_federation_upstream_declaration_for_queue_federation_case3 ( )
133
+ -> Result < ( ) , Box < dyn std:: error:: Error > > {
134
+ let vh = "rust.federation.3" ;
135
+ let name = "up.for_queue_federation" ;
136
+
137
+ let amqp_endpoint = amqp_endpoint_with_vhost ( vh) ;
138
+ let q = "federation.cq.3" ;
139
+ let ctag = "federation.custom-consumer-tag.3" ;
140
+ let qfp = QueueFederationParams :: new_with_consumer_tag ( q, ctag) ;
141
+ let endpoint1 = amqp_endpoint. clone ( ) ;
142
+ let upstream =
143
+ FederationUpstreamParams :: new_queue_federation_upstream ( vh, name, & endpoint1, qfp) ;
144
+
145
+ run_succeeds ( [ "declare" , "vhost" , "--name" , vh] ) ;
146
+
147
+ // missing --uri
148
+ run_fails ( [
149
+ "-V" ,
150
+ vh,
151
+ "federation" ,
152
+ "declare_upstream_for_queues" ,
153
+ "--name" ,
154
+ & upstream. name ,
155
+ ] ) . stderr ( predicate:: str:: contains (
156
+ "required arguments were not provided" ,
157
+ ) ) ;
158
+
159
+ delete_vhost ( vh) . expect ( "failed to delete a virtual host" ) ;
160
+
161
+ Ok ( ( ) )
162
+ }
163
+
164
+ #[ test]
165
+ fn test_federation_upstream_declaration_for_queue_federation_case4 ( )
166
+ -> Result < ( ) , Box < dyn std:: error:: Error > > {
167
+ let vh = "rust.federation.4" ;
168
+ let name = "up.for_queue_federation" ;
169
+
170
+ let amqp_endpoint = amqp_endpoint_with_vhost ( vh) ;
171
+ let q = "federation.cq.3" ;
172
+ let ctag = "federation.custom-consumer-tag.3" ;
173
+ let qfp = QueueFederationParams :: new_with_consumer_tag ( q, ctag) ;
174
+ let endpoint1 = amqp_endpoint. clone ( ) ;
175
+ let upstream =
176
+ FederationUpstreamParams :: new_queue_federation_upstream ( vh, name, & endpoint1, qfp) ;
177
+
178
+ run_succeeds ( [ "declare" , "vhost" , "--name" , vh] ) ;
179
+
180
+ // missing --name
181
+ run_fails ( [
182
+ "-V" ,
183
+ vh,
184
+ "federation" ,
185
+ "declare_upstream_for_queues" ,
186
+ "--uri" ,
187
+ & upstream. uri ,
188
+ "--ack-mode" ,
189
+ "on-publish"
190
+ ] ) . stderr ( predicate:: str:: contains (
191
+ "required arguments were not provided" ,
192
+ ) ) ;
193
+
194
+ delete_vhost ( vh) . expect ( "failed to delete a virtual host" ) ;
195
+
196
+ Ok ( ( ) )
197
+ }
198
+
199
+ #[ test]
200
+ fn test_federation_list_all_upstreams ( )
201
+ -> Result < ( ) , Box < dyn std:: error:: Error > > {
202
+ let vh = "rust.federation.5" ;
203
+ let name = "up.for_queue_federation/5" ;
204
+
205
+ let amqp_endpoint = amqp_endpoint_with_vhost ( vh) ;
206
+ let q = "federation.cq.5" ;
207
+ let ctag = "federation.custom-consumer-tag" ;
208
+ let qfp = QueueFederationParams :: new_with_consumer_tag ( q, ctag) ;
209
+ let endpoint1 = amqp_endpoint. clone ( ) ;
210
+ let upstream =
211
+ FederationUpstreamParams :: new_queue_federation_upstream ( vh, name, & endpoint1, qfp) ;
212
+
213
+ run_succeeds ( [ "declare" , "vhost" , "--name" , vh] ) ;
214
+ let qfp = upstream. queue_federation . unwrap ( ) ;
215
+
216
+ run_succeeds ( [
217
+ "-V" ,
218
+ vh,
219
+ "federation" ,
220
+ "declare_upstream_for_queues" ,
221
+ "--name" ,
222
+ & upstream. name ,
223
+ "--uri" ,
224
+ & upstream. uri ,
225
+ "--queue-name" ,
226
+ & q,
227
+ "--consumer-tag" ,
228
+ & qfp. consumer_tag . unwrap ( ) ,
229
+ ] ) ;
230
+
231
+ run_succeeds ( [
232
+ "-V" ,
233
+ vh,
234
+ "federation" ,
235
+ "list_all_upstreams" ,
236
+ ] ) . stdout ( predicate:: str:: contains (
237
+ name,
238
+ ) ) . stdout ( predicate:: str:: contains (
239
+ endpoint1. clone ( ) ,
240
+ ) ) . stdout ( predicate:: str:: contains (
241
+ q,
242
+ ) ) . stdout ( predicate:: str:: contains (
243
+ ctag,
244
+ ) ) ;
245
+
246
+ delete_vhost ( vh) . expect ( "failed to delete a virtual host" ) ;
247
+
248
+ Ok ( ( ) )
249
+ }
250
+
251
+ #[ test]
252
+ fn test_federation_delete_an_upstream_with_queue_federation_settings ( )
253
+ -> Result < ( ) , Box < dyn std:: error:: Error > > {
254
+ let vh = "rust.federation.6" ;
255
+ let name = "up.for_queue_federation.6" ;
256
+
257
+ let amqp_endpoint = amqp_endpoint_with_vhost ( vh) ;
258
+ let q = "federation.cq.6" ;
259
+ let ctag = "federation.custom-consumer-tag.6" ;
260
+ let qfp = QueueFederationParams :: new_with_consumer_tag ( q, ctag) ;
261
+ let endpoint1 = amqp_endpoint. clone ( ) ;
262
+ let upstream =
263
+ FederationUpstreamParams :: new_queue_federation_upstream ( vh, name, & endpoint1, qfp) ;
264
+
265
+ run_succeeds ( [ "declare" , "vhost" , "--name" , vh] ) ;
266
+ let qfp = upstream. queue_federation . unwrap ( ) ;
267
+
268
+ run_succeeds ( [
269
+ "-V" ,
270
+ vh,
271
+ "federation" ,
272
+ "declare_upstream_for_queues" ,
273
+ "--name" ,
274
+ & upstream. name ,
275
+ "--uri" ,
276
+ & upstream. uri ,
277
+ "--queue-name" ,
278
+ & q,
279
+ "--consumer-tag" ,
280
+ & qfp. consumer_tag . unwrap ( ) ,
281
+ ] ) ;
282
+
283
+ run_succeeds ( [
284
+ "federation" ,
285
+ "list_all_upstreams" ,
286
+ ] ) . stdout ( predicate:: str:: contains (
287
+ name,
288
+ ) ) . stdout ( predicate:: str:: contains (
289
+ endpoint1. clone ( ) ,
290
+ ) ) ;
291
+
292
+ run_succeeds ( [
293
+ "-V" ,
294
+ vh,
295
+ "federation" ,
296
+ "delete_upstream" ,
297
+ "--name" ,
298
+ & upstream. name ,
299
+ ] ) ;
300
+
301
+ run_succeeds ( [
302
+ "federation" ,
303
+ "list_all_upstreams" ,
304
+ ] ) . stdout ( predicate:: str:: contains (
305
+ name,
306
+ ) . not ( ) ) . stdout ( predicate:: str:: contains (
307
+ endpoint1. clone ( ) ,
308
+ ) . not ( ) ) ;
309
+
310
+ delete_vhost ( vh) . expect ( "failed to delete a virtual host" ) ;
311
+
312
+ Ok ( ( ) )
313
+ }
0 commit comments