Skip to content

Commit a61ba94

Browse files
committed
Added support for clients and services
1 parent b7cedd9 commit a61ba94

File tree

22 files changed

+913
-17
lines changed

22 files changed

+913
-17
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[package]
2+
name = "examples_rclrs_minimal_client_service"
3+
version = "0.2.0"
4+
authors = ["Esteve Fernandez <[email protected]>"]
5+
edition = "2021"
6+
7+
[[bin]]
8+
name = "minimal_client"
9+
path = "src/minimal_client.rs"
10+
11+
[[bin]]
12+
name = "minimal_client_async"
13+
path = "src/minimal_client_async.rs"
14+
15+
[[bin]]
16+
name = "minimal_service"
17+
path = "src/minimal_service.rs"
18+
19+
[dependencies]
20+
anyhow = {version = "1", features = ["backtrace"]}
21+
22+
[dependencies.rclrs]
23+
version = "*"
24+
25+
[dependencies.rosidl_runtime_rs]
26+
version = "*"
27+
28+
[dependencies.std_msgs]
29+
version = "*"
30+
31+
[dependencies.example_interfaces]
32+
version = "*"
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?xml version="1.0"?>
2+
<?xml-model
3+
href="http://download.ros.org/schema/package_format3.xsd"
4+
schematypens="http://www.w3.org/2001/XMLSchema"?>
5+
<package format="3">
6+
<name>examples_rclrs_minimal_client_service</name>
7+
<version>0.2.0</version>
8+
<description>Package containing an example of the client-service mechanism in rclrs.</description>
9+
<maintainer email="[email protected]">Esteve Fernandez</maintainer>
10+
<license>Apache License 2.0</license>
11+
12+
<build_depend>example_interfaces</build_depend>
13+
<build_depend>rclrs</build_depend>
14+
<build_depend>rosidl_runtime_rs</build_depend>
15+
<build_depend>std_msgs</build_depend>
16+
17+
<exec_depend>example_interfaces</exec_depend>
18+
<exec_depend>rclrs</exec_depend>
19+
<exec_depend>rosidl_runtime_rs</exec_depend>
20+
<exec_depend>std_msgs</exec_depend>
21+
22+
<export>
23+
<build_type>ament_cargo</build_type>
24+
</export>
25+
</package>
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args()).unwrap();
6+
7+
let mut node = context.create_node("minimal_client")?;
8+
9+
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
11+
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
12+
13+
println!("Starting client");
14+
15+
std::thread::sleep(std::time::Duration::from_millis(500));
16+
17+
client.async_send_request_with_callback(
18+
&request,
19+
move |response: &example_interfaces::srv::AddTwoInts_Response| {
20+
println!(
21+
"Result of {} + {} is: {}",
22+
request.a, request.b, response.sum
23+
);
24+
},
25+
)?;
26+
27+
std::thread::sleep(std::time::Duration::from_millis(500));
28+
29+
println!("Waiting for response");
30+
rclrs::spin(&node).map_err(|err| err.into())
31+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args()).unwrap();
6+
7+
let mut node = context.create_node("minimal_client")?;
8+
9+
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
11+
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
12+
13+
println!("Starting client");
14+
15+
std::thread::sleep(std::time::Duration::from_millis(500));
16+
17+
let future = client.call_async(&request)?;
18+
19+
println!("Waiting for response");
20+
let response = rclrs::spin_until_future_complete(&node, future)?;
21+
22+
println!(
23+
"Result of {} + {} is: {}",
24+
request.a, request.b, response.sum
25+
);
26+
Ok(())
27+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn handle_service(
5+
_request_header: &rclrs::rmw_request_id_t,
6+
request: &example_interfaces::srv::AddTwoInts_Request,
7+
response: &mut example_interfaces::srv::AddTwoInts_Response,
8+
) {
9+
println!("request: {} + {}", request.a, request.b);
10+
response.sum = request.a + request.b;
11+
}
12+
13+
fn main() -> Result<(), Error> {
14+
let context = rclrs::Context::new(env::args()).unwrap();
15+
16+
let mut node = context.create_node("minimal_service")?;
17+
18+
let _server = node
19+
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;
20+
21+
println!("Starting server");
22+
rclrs::spin(&node).map_err(|err| err.into())
23+
}

rclrs/src/future.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/// Based on https://www.viget.com/articles/understanding-futures-in-rust-part-1/
2+
use std::future::Future;
3+
use std::pin::Pin;
4+
use std::sync::Arc;
5+
use std::task::Context;
6+
use std::task::Poll;
7+
use std::task::RawWaker;
8+
use std::task::RawWakerVTable;
9+
use std::task::Waker;
10+
11+
#[derive(Default)]
12+
pub struct RclFuture<T> {
13+
value: Option<T>,
14+
}
15+
16+
impl<T: Default + Clone> RclFuture<T> {
17+
pub fn new() -> RclFuture<T> {
18+
Self { value: None }
19+
}
20+
21+
pub fn set_value(&mut self, msg: T) {
22+
self.value = Some(msg);
23+
}
24+
}
25+
26+
impl<T: Clone> Future for RclFuture<T> {
27+
type Output = T;
28+
29+
fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> {
30+
if let Some(value) = &self.value {
31+
Poll::Ready(value.clone())
32+
} else {
33+
Poll::Pending
34+
}
35+
}
36+
}
37+
38+
#[derive(Clone)]
39+
pub struct RclWaker {}
40+
41+
fn rclwaker_wake(_s: &RclWaker) {}
42+
43+
fn rclwaker_wake_by_ref(_s: &RclWaker) {}
44+
45+
fn rclwaker_clone(s: &RclWaker) -> RawWaker {
46+
let arc = unsafe { Arc::from_raw(s) };
47+
std::mem::forget(arc.clone());
48+
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
49+
}
50+
51+
const VTABLE: RawWakerVTable = unsafe {
52+
RawWakerVTable::new(
53+
|s| rclwaker_clone(&*(s as *const RclWaker)),
54+
|s| rclwaker_wake(&*(s as *const RclWaker)),
55+
|s| rclwaker_wake_by_ref(&*(s as *const RclWaker)),
56+
|s| drop(Arc::from_raw(s as *const RclWaker)),
57+
)
58+
};
59+
60+
pub fn rclwaker_into_waker(s: *const RclWaker) -> Waker {
61+
let raw_waker = RawWaker::new(s as *const (), &VTABLE);
62+
unsafe { Waker::from_raw(raw_waker) }
63+
}

rclrs/src/lib.rs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,15 @@
55
//!
66
//! [1]: https://github.com/ros2-rust/ros2_rust/blob/main/README.md
77
8+
use std::future::Future;
9+
use std::pin::Pin;
10+
use std::sync::Arc;
11+
use std::task::Poll;
12+
use std::time::Duration;
13+
814
mod context;
915
mod error;
16+
mod future;
1017
mod node;
1118
mod qos;
1219
mod wait;
@@ -20,37 +27,68 @@ pub use qos::*;
2027
pub use wait::*;
2128

2229
use rcl_bindings::rcl_context_is_valid;
23-
use std::time::Duration;
30+
31+
pub use rcl_bindings::rmw_request_id_t;
32+
33+
use parking_lot::Mutex;
2434

2535
/// Polls the node for new messages and executes the corresponding callbacks.
2636
///
2737
/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter.
2838
///
2939
/// This may under some circumstances return
30-
/// [`SubscriptionTakeFailed`][1] when the wait set spuriously wakes up.
40+
/// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][1], [`ServiceTakeFailed`][1] when the wait
41+
/// set spuriously wakes up.
3142
/// This can usually be ignored.
3243
///
3344
/// [1]: crate::RclReturnCode
3445
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
3546
let live_subscriptions = node.live_subscriptions();
47+
let live_clients = node.live_clients();
48+
let live_services = node.live_services();
3649
let ctx = Context {
3750
rcl_context_mtx: node.rcl_context_mtx.clone(),
3851
};
39-
let mut wait_set = WaitSet::new(live_subscriptions.len(), &ctx)?;
52+
let mut wait_set = WaitSet::new(
53+
live_subscriptions.len(),
54+
0,
55+
0,
56+
live_clients.len(),
57+
live_services.len(),
58+
0,
59+
&ctx,
60+
)?;
4061

4162
for live_subscription in &live_subscriptions {
4263
wait_set.add_subscription(live_subscription.clone())?;
4364
}
4465

66+
for live_client in &live_clients {
67+
wait_set.add_client(live_client.clone())?;
68+
}
69+
70+
for live_service in &live_services {
71+
wait_set.add_service(live_service.clone())?;
72+
}
73+
4574
let ready_entities = wait_set.wait(timeout)?;
75+
4676
for ready_subscription in ready_entities.subscriptions {
4777
ready_subscription.execute()?;
4878
}
4979

80+
for ready_client in ready_entities.clients {
81+
ready_client.execute()?;
82+
}
83+
84+
for ready_service in ready_entities.services {
85+
ready_service.execute()?;
86+
}
87+
5088
Ok(())
5189
}
5290

53-
/// Convenience function for calling [`spin_once`] in a loop.
91+
/// Convenience function for calling [`rclrs::spin_once`] in a loop.
5492
///
5593
/// This function additionally checks that the context is still valid.
5694
pub fn spin(node: &Node) -> Result<(), RclrsError> {
@@ -72,6 +110,33 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
72110
error => return error,
73111
}
74112
}
75-
76113
Ok(())
77114
}
115+
116+
pub fn spin_until_future_complete<T: Unpin + Clone>(
117+
node: &node::Node,
118+
future: Arc<Mutex<Box<crate::future::RclFuture<T>>>>,
119+
) -> Result<<future::RclFuture<T> as Future>::Output, RclrsError> {
120+
let rclwaker = Arc::new(crate::future::RclWaker {});
121+
let waker = crate::future::rclwaker_into_waker(Arc::into_raw(rclwaker));
122+
let mut cx = std::task::Context::from_waker(&waker);
123+
124+
loop {
125+
let context_valid = unsafe { rcl_context_is_valid(&mut *node.context.lock()) };
126+
if context_valid {
127+
if let Some(error) = spin_once(node, None).err() {
128+
match error {
129+
RclrsError {
130+
code: RclReturnCode::Timeout,
131+
..
132+
} => continue,
133+
error => return Err(error),
134+
};
135+
};
136+
match Future::poll(Pin::new(&mut *future.lock()), &mut cx) {
137+
Poll::Ready(val) => break Ok(val),
138+
Poll::Pending => continue,
139+
};
140+
}
141+
}
142+
}

0 commit comments

Comments
 (0)