Skip to content

Federation upstream operations #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 23, 2025
Merged
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

492 changes: 407 additions & 85 deletions src/cli.rs

Large diffs are not rendered by default.

254 changes: 253 additions & 1 deletion src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use rabbitmq_http_client::commons::{PolicyTarget, VirtualHostLimitTarget};
use rabbitmq_http_client::requests::{
Amqp10ShovelDestinationParams, Amqp10ShovelParams, Amqp10ShovelSourceParams,
Amqp091ShovelDestinationParams, Amqp091ShovelParams, Amqp091ShovelSourceParams,
EnforcedLimitParams,
EnforcedLimitParams, ExchangeFederationParams, FEDERATION_UPSTREAM_COMPONENT,
FederationResourceCleanupMode, FederationUpstreamParams, QueueFederationParams,
RuntimeParameterDefinition,
};
use std::fs;
use std::process;
Expand Down Expand Up @@ -362,6 +364,248 @@ pub fn delete_shovel(
client.delete_shovel(vhost, &name, true)
}

//
// Federation
//

pub fn list_federation_upstreams(
client: APIClient,
) -> ClientResult<Vec<responses::FederationUpstream>> {
client.list_federation_upstreams()
}

pub fn declare_federation_upstream(
client: APIClient,
vhost: &str,
command_args: &ArgMatches,
) -> ClientResult<()> {
// common settings
let name = command_args.get_one::<String>("name").cloned().unwrap();
let uri = command_args.get_one::<String>("uri").cloned().unwrap();
let reconnect_delay = command_args
.get_one::<u16>("reconnect_delay")
.cloned()
.unwrap();
let trust_user_id = command_args
.get_one::<bool>("trust_user_id")
.cloned()
.unwrap();
let prefetch_count = command_args
.get_one::<u16>("prefetch_count")
.cloned()
.unwrap();
let ack_mode = command_args
.get_one::<MessageTransferAcknowledgementMode>("ack_mode")
.cloned()
.unwrap();

// optional queue federation settings
let queue_name = command_args.get_one::<String>("queue_name").cloned();
let consumer_tag = command_args.get_one::<String>("consumer_tag").cloned();
let qn: String;
let ct: String;
let qfp = match (queue_name, consumer_tag) {
(Some(queue_name), Some(consumer_tag)) => {
qn = queue_name.clone();
ct = consumer_tag.clone();
let qfp = QueueFederationParams::new_with_consumer_tag(&qn, &ct);
Some(qfp)
}
(Some(queue_name), None) => {
qn = queue_name.clone();
let qfp = QueueFederationParams::new(&qn);
Some(qfp)
}
(None, Some(_)) => None,
(None, None) => None,
};

// optional exchange federation settings
let exchange_name = command_args
.get_one::<String>("exchange_name")
.map(|s| s.as_str());
let queue_type = command_args
.get_one::<String>("queue_type")
.map(|s| Into::<QueueType>::into(s.as_str()))
.unwrap_or_default();
let max_hops = command_args.get_one::<u8>("max_hops").copied();
let resource_cleanup_mode = command_args
.get_one::<FederationResourceCleanupMode>("resource_cleanup_mode")
.cloned()
.unwrap_or_default();
let bind_using_nowait = command_args
.get_one::<bool>("bind_nowait")
.cloned()
.unwrap_or_default();
let ttl = command_args.get_one::<u32>("ttl").cloned();
let message_ttl = command_args.get_one::<u32>("message_ttl").cloned();
let efp = Some(ExchangeFederationParams {
exchange: exchange_name,
max_hops,
queue_type,
ttl,
message_ttl,
resource_cleanup_mode,
});

// putting it all together
let upstream = FederationUpstreamParams {
name: &name,
vhost,
uri: &uri,
reconnect_delay,
trust_user_id,
prefetch_count,
ack_mode,
bind_using_nowait,
queue_federation: qfp,
exchange_federation: efp,
};
let param = RuntimeParameterDefinition::from(upstream);
client.upsert_runtime_parameter(&param)
}

pub fn declare_federation_upstream_for_exchange_federation(
client: APIClient,
vhost: &str,
command_args: &ArgMatches,
) -> ClientResult<()> {
let name = command_args.get_one::<String>("name").cloned().unwrap();
let uri = command_args.get_one::<String>("uri").cloned().unwrap();
let reconnect_delay = command_args
.get_one::<u16>("reconnect_delay")
.cloned()
.unwrap();
let trust_user_id = command_args
.get_one::<bool>("trust_user_id")
.cloned()
.unwrap();
let prefetch_count = command_args
.get_one::<u16>("prefetch_count")
.cloned()
.unwrap();
let ack_mode = command_args
.get_one::<MessageTransferAcknowledgementMode>("ack_mode")
.cloned()
.unwrap();

let exchange_name = command_args
.get_one::<String>("exchange_name")
.map(|s| s.as_str());
let queue_type = command_args
.get_one::<String>("queue_type")
.map(|s| Into::<QueueType>::into(s.as_str()))
.unwrap_or_default();
let max_hops = command_args.get_one::<u8>("max_hops").copied();
let resource_cleanup_mode = command_args
.get_one::<FederationResourceCleanupMode>("resource_cleanup_mode")
.cloned()
.unwrap_or_default();
let bind_using_nowait = command_args
.get_one::<bool>("bind_nowait")
.cloned()
.unwrap_or_default();
let ttl = command_args.get_one::<u32>("ttl").cloned();
let message_ttl = command_args.get_one::<u32>("message_ttl").cloned();
let efp = Some(ExchangeFederationParams {
exchange: exchange_name,
max_hops,
queue_type,
ttl,
message_ttl,
resource_cleanup_mode,
});

// putting it all together
let upstream = FederationUpstreamParams {
name: &name,
vhost,
uri: &uri,
reconnect_delay,
trust_user_id,
prefetch_count,
ack_mode,
bind_using_nowait,
queue_federation: None,
exchange_federation: efp,
};
let param = RuntimeParameterDefinition::from(upstream);
client.upsert_runtime_parameter(&param)
}

pub fn declare_federation_upstream_for_queue_federation(
client: APIClient,
vhost: &str,
command_args: &ArgMatches,
) -> ClientResult<()> {
let name = command_args.get_one::<String>("name").cloned().unwrap();
let uri = command_args.get_one::<String>("uri").cloned().unwrap();
let reconnect_delay = command_args
.get_one::<u16>("reconnect_delay")
.cloned()
.unwrap();
let trust_user_id = command_args
.get_one::<bool>("trust_user_id")
.cloned()
.unwrap();
let prefetch_count = command_args
.get_one::<u16>("prefetch_count")
.cloned()
.unwrap();
let ack_mode = command_args
.get_one::<MessageTransferAcknowledgementMode>("ack_mode")
.cloned()
.unwrap();

let queue_name = command_args.get_one::<String>("queue_name").cloned();
let consumer_tag = command_args.get_one::<String>("consumer_tag").cloned();
let qn: String;
let ct: String;
let qfp = match (queue_name, consumer_tag) {
(Some(queue_name), Some(consumer_tag)) => {
qn = queue_name.clone();
ct = consumer_tag.clone();
let qfp = QueueFederationParams::new_with_consumer_tag(&qn, &ct);
Some(qfp)
}
(Some(queue_name), None) => {
qn = queue_name.clone();
let qfp = QueueFederationParams::new(&qn);
Some(qfp)
}
(None, Some(_)) => None,
(None, None) => None,
};

let upstream = FederationUpstreamParams {
name: &name,
vhost,
uri: &uri,
reconnect_delay,
trust_user_id,
prefetch_count,
ack_mode,
bind_using_nowait: false,
queue_federation: qfp,
exchange_federation: None,
};
let param = RuntimeParameterDefinition::from(upstream);
client.upsert_runtime_parameter(&param)
}

pub fn delete_federation_upstream(
client: APIClient,
vhost: &str,
command_args: &ArgMatches,
) -> ClientResult<()> {
let name = command_args.get_one::<String>("name").cloned().unwrap();
client.clear_runtime_parameter(FEDERATION_UPSTREAM_COMPONENT, vhost, &name)
}

//
// Feature flags
//

pub fn enable_feature_flag(client: APIClient, command_args: &ArgMatches) -> ClientResult<()> {
let name = command_args.get_one::<String>("name").cloned().unwrap();
client.enable_feature_flag(&name)
Expand All @@ -371,6 +615,10 @@ pub fn enable_all_stable_feature_flags(client: APIClient) -> ClientResult<()> {
client.enable_all_stable_feature_flags()
}

//
// Deprecated features
//

pub fn list_deprecated_features(
client: APIClient,
) -> ClientResult<responses::DeprecatedFeatureList> {
Expand All @@ -383,6 +631,10 @@ pub fn list_deprecated_features_in_use(
client.list_deprecated_features_in_use()
}

//
// Declaration of core resources
//

pub fn declare_vhost(client: APIClient, command_args: &ArgMatches) -> ClientResult<()> {
// the flag is required
let name = command_args.get_one::<String>("name").unwrap();
Expand Down
7 changes: 6 additions & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use rabbitmq_http_client::error::Error as ApiClientError;
use rabbitmq_http_client::error::{ConversionError, Error as ApiClientError};
use rabbitmq_http_client::{blocking_api::HttpClientError, responses::HealthCheckFailureDetails};
use reqwest::{
StatusCode,
Expand Down Expand Up @@ -79,6 +79,8 @@ pub enum CommandRunError {
UnsupportedArgumentValue { property: String },
#[error("This request produces an invalid HTTP header value")]
InvalidHeaderValue { error: InvalidHeaderValue },
#[error("Response is incompatible with the target data type")]
IncompatibleBody { error: ConversionError },
#[error("encountered an error when performing an HTTP request")]
RequestError { error: reqwest::Error },
#[error("an unspecified error")]
Expand Down Expand Up @@ -118,6 +120,9 @@ impl From<HttpClientError> for CommandRunError {
ApiClientError::MissingProperty { argument } => {
Self::MissingArgumentValue { property: argument }
},
ApiClientError::IncompatibleBody { error, .. } => {
Self::IncompatibleBody { error }
},
}
}
}
Loading