Skip to content

Commit 92f303e

Browse files
Merge pull request #55 from rabbitmq/federation-support
Federation upstream operations
2 parents 9281d63 + 9c89414 commit 92f303e

File tree

10 files changed

+1431
-96
lines changed

10 files changed

+1431
-96
lines changed

Cargo.lock

Lines changed: 9 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/cli.rs

Lines changed: 407 additions & 85 deletions
Large diffs are not rendered by default.

src/commands.rs

Lines changed: 253 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use rabbitmq_http_client::commons::{PolicyTarget, VirtualHostLimitTarget};
2323
use rabbitmq_http_client::requests::{
2424
Amqp10ShovelDestinationParams, Amqp10ShovelParams, Amqp10ShovelSourceParams,
2525
Amqp091ShovelDestinationParams, Amqp091ShovelParams, Amqp091ShovelSourceParams,
26-
EnforcedLimitParams,
26+
EnforcedLimitParams, ExchangeFederationParams, FEDERATION_UPSTREAM_COMPONENT,
27+
FederationResourceCleanupMode, FederationUpstreamParams, QueueFederationParams,
28+
RuntimeParameterDefinition,
2729
};
2830
use std::fs;
2931
use std::process;
@@ -362,6 +364,248 @@ pub fn delete_shovel(
362364
client.delete_shovel(vhost, &name, true)
363365
}
364366

367+
//
368+
// Federation
369+
//
370+
371+
pub fn list_federation_upstreams(
372+
client: APIClient,
373+
) -> ClientResult<Vec<responses::FederationUpstream>> {
374+
client.list_federation_upstreams()
375+
}
376+
377+
pub fn declare_federation_upstream(
378+
client: APIClient,
379+
vhost: &str,
380+
command_args: &ArgMatches,
381+
) -> ClientResult<()> {
382+
// common settings
383+
let name = command_args.get_one::<String>("name").cloned().unwrap();
384+
let uri = command_args.get_one::<String>("uri").cloned().unwrap();
385+
let reconnect_delay = command_args
386+
.get_one::<u16>("reconnect_delay")
387+
.cloned()
388+
.unwrap();
389+
let trust_user_id = command_args
390+
.get_one::<bool>("trust_user_id")
391+
.cloned()
392+
.unwrap();
393+
let prefetch_count = command_args
394+
.get_one::<u16>("prefetch_count")
395+
.cloned()
396+
.unwrap();
397+
let ack_mode = command_args
398+
.get_one::<MessageTransferAcknowledgementMode>("ack_mode")
399+
.cloned()
400+
.unwrap();
401+
402+
// optional queue federation settings
403+
let queue_name = command_args.get_one::<String>("queue_name").cloned();
404+
let consumer_tag = command_args.get_one::<String>("consumer_tag").cloned();
405+
let qn: String;
406+
let ct: String;
407+
let qfp = match (queue_name, consumer_tag) {
408+
(Some(queue_name), Some(consumer_tag)) => {
409+
qn = queue_name.clone();
410+
ct = consumer_tag.clone();
411+
let qfp = QueueFederationParams::new_with_consumer_tag(&qn, &ct);
412+
Some(qfp)
413+
}
414+
(Some(queue_name), None) => {
415+
qn = queue_name.clone();
416+
let qfp = QueueFederationParams::new(&qn);
417+
Some(qfp)
418+
}
419+
(None, Some(_)) => None,
420+
(None, None) => None,
421+
};
422+
423+
// optional exchange federation settings
424+
let exchange_name = command_args
425+
.get_one::<String>("exchange_name")
426+
.map(|s| s.as_str());
427+
let queue_type = command_args
428+
.get_one::<String>("queue_type")
429+
.map(|s| Into::<QueueType>::into(s.as_str()))
430+
.unwrap_or_default();
431+
let max_hops = command_args.get_one::<u8>("max_hops").copied();
432+
let resource_cleanup_mode = command_args
433+
.get_one::<FederationResourceCleanupMode>("resource_cleanup_mode")
434+
.cloned()
435+
.unwrap_or_default();
436+
let bind_using_nowait = command_args
437+
.get_one::<bool>("bind_nowait")
438+
.cloned()
439+
.unwrap_or_default();
440+
let ttl = command_args.get_one::<u32>("ttl").cloned();
441+
let message_ttl = command_args.get_one::<u32>("message_ttl").cloned();
442+
let efp = Some(ExchangeFederationParams {
443+
exchange: exchange_name,
444+
max_hops,
445+
queue_type,
446+
ttl,
447+
message_ttl,
448+
resource_cleanup_mode,
449+
});
450+
451+
// putting it all together
452+
let upstream = FederationUpstreamParams {
453+
name: &name,
454+
vhost,
455+
uri: &uri,
456+
reconnect_delay,
457+
trust_user_id,
458+
prefetch_count,
459+
ack_mode,
460+
bind_using_nowait,
461+
queue_federation: qfp,
462+
exchange_federation: efp,
463+
};
464+
let param = RuntimeParameterDefinition::from(upstream);
465+
client.upsert_runtime_parameter(&param)
466+
}
467+
468+
pub fn declare_federation_upstream_for_exchange_federation(
469+
client: APIClient,
470+
vhost: &str,
471+
command_args: &ArgMatches,
472+
) -> ClientResult<()> {
473+
let name = command_args.get_one::<String>("name").cloned().unwrap();
474+
let uri = command_args.get_one::<String>("uri").cloned().unwrap();
475+
let reconnect_delay = command_args
476+
.get_one::<u16>("reconnect_delay")
477+
.cloned()
478+
.unwrap();
479+
let trust_user_id = command_args
480+
.get_one::<bool>("trust_user_id")
481+
.cloned()
482+
.unwrap();
483+
let prefetch_count = command_args
484+
.get_one::<u16>("prefetch_count")
485+
.cloned()
486+
.unwrap();
487+
let ack_mode = command_args
488+
.get_one::<MessageTransferAcknowledgementMode>("ack_mode")
489+
.cloned()
490+
.unwrap();
491+
492+
let exchange_name = command_args
493+
.get_one::<String>("exchange_name")
494+
.map(|s| s.as_str());
495+
let queue_type = command_args
496+
.get_one::<String>("queue_type")
497+
.map(|s| Into::<QueueType>::into(s.as_str()))
498+
.unwrap_or_default();
499+
let max_hops = command_args.get_one::<u8>("max_hops").copied();
500+
let resource_cleanup_mode = command_args
501+
.get_one::<FederationResourceCleanupMode>("resource_cleanup_mode")
502+
.cloned()
503+
.unwrap_or_default();
504+
let bind_using_nowait = command_args
505+
.get_one::<bool>("bind_nowait")
506+
.cloned()
507+
.unwrap_or_default();
508+
let ttl = command_args.get_one::<u32>("ttl").cloned();
509+
let message_ttl = command_args.get_one::<u32>("message_ttl").cloned();
510+
let efp = Some(ExchangeFederationParams {
511+
exchange: exchange_name,
512+
max_hops,
513+
queue_type,
514+
ttl,
515+
message_ttl,
516+
resource_cleanup_mode,
517+
});
518+
519+
// putting it all together
520+
let upstream = FederationUpstreamParams {
521+
name: &name,
522+
vhost,
523+
uri: &uri,
524+
reconnect_delay,
525+
trust_user_id,
526+
prefetch_count,
527+
ack_mode,
528+
bind_using_nowait,
529+
queue_federation: None,
530+
exchange_federation: efp,
531+
};
532+
let param = RuntimeParameterDefinition::from(upstream);
533+
client.upsert_runtime_parameter(&param)
534+
}
535+
536+
pub fn declare_federation_upstream_for_queue_federation(
537+
client: APIClient,
538+
vhost: &str,
539+
command_args: &ArgMatches,
540+
) -> ClientResult<()> {
541+
let name = command_args.get_one::<String>("name").cloned().unwrap();
542+
let uri = command_args.get_one::<String>("uri").cloned().unwrap();
543+
let reconnect_delay = command_args
544+
.get_one::<u16>("reconnect_delay")
545+
.cloned()
546+
.unwrap();
547+
let trust_user_id = command_args
548+
.get_one::<bool>("trust_user_id")
549+
.cloned()
550+
.unwrap();
551+
let prefetch_count = command_args
552+
.get_one::<u16>("prefetch_count")
553+
.cloned()
554+
.unwrap();
555+
let ack_mode = command_args
556+
.get_one::<MessageTransferAcknowledgementMode>("ack_mode")
557+
.cloned()
558+
.unwrap();
559+
560+
let queue_name = command_args.get_one::<String>("queue_name").cloned();
561+
let consumer_tag = command_args.get_one::<String>("consumer_tag").cloned();
562+
let qn: String;
563+
let ct: String;
564+
let qfp = match (queue_name, consumer_tag) {
565+
(Some(queue_name), Some(consumer_tag)) => {
566+
qn = queue_name.clone();
567+
ct = consumer_tag.clone();
568+
let qfp = QueueFederationParams::new_with_consumer_tag(&qn, &ct);
569+
Some(qfp)
570+
}
571+
(Some(queue_name), None) => {
572+
qn = queue_name.clone();
573+
let qfp = QueueFederationParams::new(&qn);
574+
Some(qfp)
575+
}
576+
(None, Some(_)) => None,
577+
(None, None) => None,
578+
};
579+
580+
let upstream = FederationUpstreamParams {
581+
name: &name,
582+
vhost,
583+
uri: &uri,
584+
reconnect_delay,
585+
trust_user_id,
586+
prefetch_count,
587+
ack_mode,
588+
bind_using_nowait: false,
589+
queue_federation: qfp,
590+
exchange_federation: None,
591+
};
592+
let param = RuntimeParameterDefinition::from(upstream);
593+
client.upsert_runtime_parameter(&param)
594+
}
595+
596+
pub fn delete_federation_upstream(
597+
client: APIClient,
598+
vhost: &str,
599+
command_args: &ArgMatches,
600+
) -> ClientResult<()> {
601+
let name = command_args.get_one::<String>("name").cloned().unwrap();
602+
client.clear_runtime_parameter(FEDERATION_UPSTREAM_COMPONENT, vhost, &name)
603+
}
604+
605+
//
606+
// Feature flags
607+
//
608+
365609
pub fn enable_feature_flag(client: APIClient, command_args: &ArgMatches) -> ClientResult<()> {
366610
let name = command_args.get_one::<String>("name").cloned().unwrap();
367611
client.enable_feature_flag(&name)
@@ -371,6 +615,10 @@ pub fn enable_all_stable_feature_flags(client: APIClient) -> ClientResult<()> {
371615
client.enable_all_stable_feature_flags()
372616
}
373617

618+
//
619+
// Deprecated features
620+
//
621+
374622
pub fn list_deprecated_features(
375623
client: APIClient,
376624
) -> ClientResult<responses::DeprecatedFeatureList> {
@@ -383,6 +631,10 @@ pub fn list_deprecated_features_in_use(
383631
client.list_deprecated_features_in_use()
384632
}
385633

634+
//
635+
// Declaration of core resources
636+
//
637+
386638
pub fn declare_vhost(client: APIClient, command_args: &ArgMatches) -> ClientResult<()> {
387639
// the flag is required
388640
let name = command_args.get_one::<String>("name").unwrap();

src/errors.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use rabbitmq_http_client::error::Error as ApiClientError;
15+
use rabbitmq_http_client::error::{ConversionError, Error as ApiClientError};
1616
use rabbitmq_http_client::{blocking_api::HttpClientError, responses::HealthCheckFailureDetails};
1717
use reqwest::{
1818
StatusCode,
@@ -79,6 +79,8 @@ pub enum CommandRunError {
7979
UnsupportedArgumentValue { property: String },
8080
#[error("This request produces an invalid HTTP header value")]
8181
InvalidHeaderValue { error: InvalidHeaderValue },
82+
#[error("Response is incompatible with the target data type")]
83+
IncompatibleBody { error: ConversionError },
8284
#[error("encountered an error when performing an HTTP request")]
8385
RequestError { error: reqwest::Error },
8486
#[error("an unspecified error")]
@@ -118,6 +120,9 @@ impl From<HttpClientError> for CommandRunError {
118120
ApiClientError::MissingProperty { argument } => {
119121
Self::MissingArgumentValue { property: argument }
120122
},
123+
ApiClientError::IncompatibleBody { error, .. } => {
124+
Self::IncompatibleBody { error }
125+
},
121126
}
122127
}
123128
}

0 commit comments

Comments
 (0)