Skip to content

Commit 31c4623

Browse files
Test coverage for the exchange federation operations
1 parent dfefbd4 commit 31c4623

File tree

2 files changed

+350
-42
lines changed

2 files changed

+350
-42
lines changed

tests/exchange_federation_tests.rs

Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
// Copyright (C) 2023-2025 RabbitMQ Core Team ([email protected])
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
use predicates::prelude::*;
15+
use rabbitmq_http_client::commons::QueueType;
16+
use rabbitmq_http_client::requests::{ExchangeFederationParams, FederationUpstreamParams};
17+
18+
mod test_helpers;
19+
use crate::test_helpers::{amqp_endpoint_with_vhost, delete_vhost};
20+
use test_helpers::{run_fails, run_succeeds};
21+
22+
#[test]
23+
fn test_federation_upstream_declaration_for_exchange_federation_case0()
24+
-> Result<(), Box<dyn std::error::Error>> {
25+
let vh = "rust.federation.0";
26+
let name = "up.for_exchange_federation.0";
27+
28+
let amqp_endpoint = amqp_endpoint_with_vhost(vh);
29+
let queue_type = QueueType::Quorum;
30+
let xfp = ExchangeFederationParams::new(queue_type);
31+
let endpoint1 = amqp_endpoint.clone();
32+
let upstream =
33+
FederationUpstreamParams::new_exchange_federation_upstream(vh, name, &endpoint1, xfp);
34+
35+
run_succeeds(["declare", "vhost", "--name", vh]);
36+
37+
run_succeeds([
38+
"-V",
39+
vh,
40+
"federation",
41+
"declare_upstream_for_exchanges",
42+
"--name",
43+
&upstream.name,
44+
"--uri",
45+
&upstream.uri,
46+
]);
47+
48+
delete_vhost(vh).expect("failed to delete a virtual host");
49+
50+
Ok(())
51+
}
52+
53+
#[test]
54+
fn test_federation_upstream_declaration_for_exchange_federation_case1()
55+
-> Result<(), Box<dyn std::error::Error>> {
56+
let vh = "rust.federation.1";
57+
let name = "up.for_exchange_federation.1";
58+
59+
let amqp_endpoint = amqp_endpoint_with_vhost(vh);
60+
let x = "federation.x.1";
61+
let queue_type = QueueType::Quorum;
62+
let xfp = ExchangeFederationParams::new(queue_type);
63+
let endpoint1 = amqp_endpoint.clone();
64+
let upstream =
65+
FederationUpstreamParams::new_exchange_federation_upstream(vh, name, &endpoint1, xfp);
66+
67+
run_succeeds(["declare", "vhost", "--name", vh]);
68+
let xfp = upstream.exchange_federation.unwrap();
69+
70+
run_succeeds([
71+
"-V",
72+
vh,
73+
"federation",
74+
"declare_upstream_for_exchanges",
75+
"--name",
76+
&upstream.name,
77+
"--uri",
78+
&upstream.uri,
79+
"--exchange-name",
80+
&x,
81+
"--queue-type",
82+
&xfp.queue_type.to_string(),
83+
]);
84+
85+
delete_vhost(vh).expect("failed to delete a virtual host");
86+
87+
Ok(())
88+
}
89+
90+
#[test]
91+
fn test_federation_upstream_declaration_for_exchange_federation_case2()
92+
-> Result<(), Box<dyn std::error::Error>> {
93+
let vh = "rust.federation.2";
94+
let name = "up.for_exchange_federation.2";
95+
96+
let amqp_endpoint = amqp_endpoint_with_vhost(vh);
97+
let x = "federation.x.2";
98+
let queue_type = QueueType::Quorum;
99+
let xfp = ExchangeFederationParams::new(queue_type);
100+
let endpoint1 = amqp_endpoint.clone();
101+
let upstream =
102+
FederationUpstreamParams::new_exchange_federation_upstream(vh, name, &endpoint1, xfp);
103+
104+
run_succeeds(["declare", "vhost", "--name", vh]);
105+
let xfp = upstream.exchange_federation.unwrap();
106+
107+
run_succeeds([
108+
"-V",
109+
vh,
110+
"federation",
111+
"declare_upstream_for_exchanges",
112+
"--name",
113+
&upstream.name,
114+
"--uri",
115+
&upstream.uri,
116+
"--exchange-name",
117+
&x,
118+
"--queue-type",
119+
&xfp.queue_type.to_string(),
120+
"--max-hops",
121+
"2",
122+
"--ttl",
123+
"900000000",
124+
"--message-ttl",
125+
"450000000",
126+
]);
127+
128+
delete_vhost(vh).expect("failed to delete a virtual host");
129+
130+
Ok(())
131+
}
132+
133+
#[test]
134+
fn test_federation_upstream_declaration_for_exchange_federation_case3()
135+
-> Result<(), Box<dyn std::error::Error>> {
136+
let vh = "rust.federation.3";
137+
let name = "up.for_exchange_federation.3";
138+
139+
let amqp_endpoint = amqp_endpoint_with_vhost(vh);
140+
let x = "federation.x.3";
141+
let queue_type = QueueType::Quorum;
142+
let xfp = ExchangeFederationParams::new(queue_type);
143+
let endpoint1 = amqp_endpoint.clone();
144+
let upstream =
145+
FederationUpstreamParams::new_exchange_federation_upstream(vh, name, &endpoint1, xfp);
146+
147+
run_succeeds(["declare", "vhost", "--name", vh]);
148+
let xfp = upstream.exchange_federation.unwrap();
149+
150+
// missing --name
151+
run_fails([
152+
"-V",
153+
vh,
154+
"federation",
155+
"declare_upstream_for_exchanges",
156+
"--uri",
157+
&upstream.uri,
158+
"--exchange-name",
159+
&x,
160+
"--queue-type",
161+
&xfp.queue_type.to_string(),
162+
])
163+
.stderr(predicate::str::contains(
164+
"required arguments were not provided",
165+
));
166+
167+
delete_vhost(vh).expect("failed to delete a virtual host");
168+
169+
Ok(())
170+
}
171+
172+
#[test]
173+
fn test_federation_upstream_declaration_for_exchange_federation_case4()
174+
-> Result<(), Box<dyn std::error::Error>> {
175+
let vh = "rust.federation.4";
176+
let name = "up.for_exchange_federation.4";
177+
178+
let amqp_endpoint = amqp_endpoint_with_vhost(vh);
179+
let x = "federation.x.4";
180+
let queue_type = QueueType::Quorum;
181+
let xfp = ExchangeFederationParams::new(queue_type);
182+
let endpoint1 = amqp_endpoint.clone();
183+
let upstream =
184+
FederationUpstreamParams::new_exchange_federation_upstream(vh, name, &endpoint1, xfp);
185+
186+
run_succeeds(["declare", "vhost", "--name", vh]);
187+
let xfp = upstream.exchange_federation.unwrap();
188+
189+
// missing --uri
190+
run_fails([
191+
"-V",
192+
vh,
193+
"federation",
194+
"declare_upstream_for_exchanges",
195+
"--name",
196+
&upstream.name,
197+
"--exchange-name",
198+
&x,
199+
"--queue-type",
200+
&xfp.queue_type.to_string(),
201+
"--max-hops",
202+
"2",
203+
"--ttl",
204+
"900000000",
205+
"--message-ttl",
206+
"450000000",
207+
]);
208+
209+
delete_vhost(vh).expect("failed to delete a virtual host");
210+
211+
Ok(())
212+
}
213+
214+
#[test]
215+
fn test_federation_list_all_upstreams_with_exchange_federation()
216+
-> Result<(), Box<dyn std::error::Error>> {
217+
let vh = "rust.federation.5";
218+
let name = "up.for_exchange_federation.5";
219+
220+
let amqp_endpoint = amqp_endpoint_with_vhost(vh);
221+
let x = "federation.x.5";
222+
let queue_type = QueueType::Classic;
223+
let xfp = ExchangeFederationParams::new(queue_type.clone());
224+
let endpoint1 = amqp_endpoint.clone();
225+
let upstream =
226+
FederationUpstreamParams::new_exchange_federation_upstream(vh, name, &endpoint1, xfp);
227+
228+
run_succeeds(["declare", "vhost", "--name", vh]);
229+
let xfp = upstream.exchange_federation.unwrap();
230+
231+
run_succeeds([
232+
"-V",
233+
vh,
234+
"federation",
235+
"declare_upstream_for_exchanges",
236+
"--name",
237+
&upstream.name,
238+
"--uri",
239+
&upstream.uri,
240+
"--exchange-name",
241+
&x,
242+
"--queue-type",
243+
&xfp.queue_type.to_string(),
244+
"--max-hops",
245+
"2",
246+
"--ttl",
247+
"900000000",
248+
"--message-ttl",
249+
"450000000",
250+
]);
251+
252+
run_succeeds(["-V", vh, "federation", "list_all_upstreams"])
253+
.stdout(predicate::str::contains(name))
254+
.stdout(predicate::str::contains(endpoint1.clone()))
255+
.stdout(predicate::str::contains(x))
256+
.stdout(predicate::str::contains(queue_type.to_string()));
257+
258+
delete_vhost(vh).expect("failed to delete a virtual host");
259+
260+
Ok(())
261+
}
262+
263+
#[test]
264+
fn test_federation_delete_an_upstream_with_exchange_federation_settings()
265+
-> Result<(), Box<dyn std::error::Error>> {
266+
let vh = "rust.federation.6";
267+
let name = "up.for_exchange_federation.6";
268+
269+
let amqp_endpoint = amqp_endpoint_with_vhost(vh);
270+
let x = "federation.x.6";
271+
let queue_type = QueueType::Classic;
272+
let xfp = ExchangeFederationParams::new(queue_type.clone());
273+
let endpoint1 = amqp_endpoint.clone();
274+
let upstream =
275+
FederationUpstreamParams::new_exchange_federation_upstream(vh, name, &endpoint1, xfp);
276+
277+
run_succeeds(["declare", "vhost", "--name", vh]);
278+
let xfp = upstream.exchange_federation.unwrap();
279+
280+
run_succeeds([
281+
"-V",
282+
vh,
283+
"federation",
284+
"declare_upstream_for_exchanges",
285+
"--name",
286+
&upstream.name,
287+
"--uri",
288+
&upstream.uri,
289+
"--exchange-name",
290+
&x,
291+
"--queue-type",
292+
&xfp.queue_type.to_string(),
293+
"--max-hops",
294+
"2",
295+
"--ttl",
296+
"900000000",
297+
"--message-ttl",
298+
"450000000",
299+
]);
300+
301+
run_succeeds(["-V", vh, "federation", "list_all_upstreams"])
302+
.stdout(predicate::str::contains(name))
303+
.stdout(predicate::str::contains(endpoint1.clone()))
304+
.stdout(predicate::str::contains(x))
305+
.stdout(predicate::str::contains(queue_type.to_string()));
306+
307+
run_succeeds([
308+
"-V",
309+
vh,
310+
"federation",
311+
"delete_upstream",
312+
"--name",
313+
&upstream.name,
314+
]);
315+
316+
run_succeeds(["-V", vh, "federation", "list_all_upstreams"])
317+
.stdout(predicate::str::contains(name).not())
318+
.stdout(predicate::str::contains(endpoint1.clone()).not())
319+
.stdout(predicate::str::contains(x).not())
320+
.stdout(predicate::str::contains(queue_type.to_string()).not());
321+
322+
delete_vhost(vh).expect("failed to delete a virtual host");
323+
324+
Ok(())
325+
}

0 commit comments

Comments
 (0)