Skip to content

Commit 9880fdc

Browse files
committed
Added support for clients and services
1 parent 7daf8cc commit 9880fdc

File tree

22 files changed

+919
-16
lines changed

22 files changed

+919
-16
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[package]
2+
name = "examples_rclrs_minimal_client_service"
3+
version = "0.2.0"
4+
authors = ["Esteve Fernandez <[email protected]>"]
5+
edition = "2021"
6+
7+
[[bin]]
8+
name = "minimal_client"
9+
path = "src/minimal_client.rs"
10+
11+
[[bin]]
12+
name = "minimal_client_async"
13+
path = "src/minimal_client_async.rs"
14+
15+
[[bin]]
16+
name = "minimal_service"
17+
path = "src/minimal_service.rs"
18+
19+
[dependencies]
20+
anyhow = {version = "1", features = ["backtrace"]}
21+
22+
[dependencies.rclrs]
23+
version = "*"
24+
25+
[dependencies.rosidl_runtime_rs]
26+
version = "*"
27+
28+
[dependencies.std_msgs]
29+
version = "*"
30+
31+
[dependencies.example_interfaces]
32+
version = "*"
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?xml version="1.0"?>
2+
<?xml-model
3+
href="http://download.ros.org/schema/package_format3.xsd"
4+
schematypens="http://www.w3.org/2001/XMLSchema"?>
5+
<package format="3">
6+
<name>examples_rclrs_minimal_client_service</name>
7+
<version>0.2.0</version>
8+
<description>Package containing an example of the client-service mechanism in rclrs.</description>
9+
<maintainer email="[email protected]">Esteve Fernandez</maintainer>
10+
<license>Apache License 2.0</license>
11+
12+
<build_depend>example_interfaces</build_depend>
13+
<build_depend>rclrs</build_depend>
14+
<build_depend>rosidl_runtime_rs</build_depend>
15+
<build_depend>std_msgs</build_depend>
16+
17+
<exec_depend>example_interfaces</exec_depend>
18+
<exec_depend>rclrs</exec_depend>
19+
<exec_depend>rosidl_runtime_rs</exec_depend>
20+
<exec_depend>std_msgs</exec_depend>
21+
22+
<export>
23+
<build_type>ament_cargo</build_type>
24+
</export>
25+
</package>
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args()).unwrap();
6+
7+
let mut node = context.create_node("minimal_client")?;
8+
9+
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
11+
let mut request = example_interfaces::srv::AddTwoInts_Request::default();
12+
request.a = 41;
13+
request.b = 1;
14+
15+
println!("Starting client");
16+
17+
std::thread::sleep(std::time::Duration::from_millis(500));
18+
19+
client.async_send_request_with_callback(
20+
&request,
21+
move |response: &example_interfaces::srv::AddTwoInts_Response| {
22+
println!(
23+
"Result of {} + {} is: {}",
24+
request.a, request.b, response.sum
25+
);
26+
},
27+
)?;
28+
29+
std::thread::sleep(std::time::Duration::from_millis(500));
30+
31+
println!("Waiting for response");
32+
rclrs::spin(&node).map_err(|err| err.into())
33+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args()).unwrap();
6+
7+
let mut node = context.create_node("minimal_client")?;
8+
9+
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
11+
let mut request = example_interfaces::srv::AddTwoInts_Request::default();
12+
request.a = 41;
13+
request.b = 1;
14+
15+
println!("Starting client");
16+
17+
std::thread::sleep(std::time::Duration::from_millis(500));
18+
19+
let future = client.call_async(&request)?;
20+
21+
println!("Waiting for response");
22+
let response = rclrs::spin_until_future_complete(&node, future.clone())?;
23+
24+
println!(
25+
"Result of {} + {} is: {}",
26+
request.a, request.b, response.sum
27+
);
28+
Ok(())
29+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn handle_service(
5+
_request_header: &rclrs::rmw_request_id_t,
6+
request: &example_interfaces::srv::AddTwoInts_Request,
7+
response: &mut example_interfaces::srv::AddTwoInts_Response,
8+
) {
9+
println!("request: {} + {}", request.a, request.b);
10+
response.sum = request.a + request.b;
11+
}
12+
13+
fn main() -> Result<(), Error> {
14+
let context = rclrs::Context::new(env::args()).unwrap();
15+
16+
let mut node = context.create_node("minimal_service")?;
17+
18+
let _server = node
19+
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;
20+
21+
println!("Starting server");
22+
rclrs::spin(&node).map_err(|err| err.into())
23+
}

rclrs/src/future.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/// Based on https://www.viget.com/articles/understanding-futures-in-rust-part-1/
2+
use std::future::Future;
3+
use std::marker::PhantomData;
4+
use std::pin::Pin;
5+
use std::sync::Arc;
6+
use std::task::Context;
7+
use std::task::Poll;
8+
use std::task::RawWaker;
9+
use std::task::RawWakerVTable;
10+
use std::task::Waker;
11+
12+
use parking_lot::Mutex;
13+
14+
#[derive(Default)]
15+
pub struct RclFuture<T> {
16+
value: Option<T>,
17+
}
18+
19+
impl<T: Default + Clone> RclFuture<T> {
20+
pub fn new() -> RclFuture<T> {
21+
Self { value: None }
22+
}
23+
24+
pub fn set_value(&mut self, msg: T) {
25+
self.value = Some(msg);
26+
}
27+
}
28+
29+
impl<T: Clone> Future for RclFuture<T> {
30+
type Output = T;
31+
32+
fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> {
33+
if let Some(value) = &self.value {
34+
Poll::Ready(value.clone())
35+
} else {
36+
Poll::Pending
37+
}
38+
}
39+
}
40+
41+
#[derive(Clone)]
42+
pub struct RclWaker {}
43+
44+
fn rclwaker_wake(_s: &RclWaker) {}
45+
46+
fn rclwaker_wake_by_ref(_s: &RclWaker) {}
47+
48+
fn rclwaker_clone(s: &RclWaker) -> RawWaker {
49+
let arc = unsafe { Arc::from_raw(s) };
50+
std::mem::forget(arc.clone());
51+
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
52+
}
53+
54+
const VTABLE: RawWakerVTable = unsafe {
55+
RawWakerVTable::new(
56+
|s| rclwaker_clone(&*(s as *const RclWaker)),
57+
|s| rclwaker_wake(&*(s as *const RclWaker)),
58+
|s| rclwaker_wake_by_ref(&*(s as *const RclWaker)),
59+
|s| drop(Arc::from_raw(s as *const RclWaker)),
60+
)
61+
};
62+
63+
pub fn rclwaker_into_waker(s: *const RclWaker) -> Waker {
64+
let raw_waker = RawWaker::new(s as *const (), &VTABLE);
65+
unsafe { Waker::from_raw(raw_waker) }
66+
}

rclrs/src/lib.rs

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,15 @@
55
//!
66
//! [1]: https://github.com/ros2-rust/ros2_rust/blob/master/README.md
77
8+
use std::future::Future;
9+
use std::pin::Pin;
10+
use std::sync::Arc;
11+
use std::task::Poll;
12+
use std::time::Duration;
13+
814
mod context;
915
mod error;
16+
mod future;
1017
mod node;
1118
mod qos;
1219
mod wait;
@@ -20,37 +27,70 @@ pub use qos::*;
2027
pub use wait::*;
2128

2229
use rcl_bindings::rcl_context_is_valid;
23-
use std::time::Duration;
30+
31+
pub use rcl_bindings::rmw_request_id_t;
32+
33+
use parking_lot::Mutex;
2434

2535
/// Polls the node for new messages and executes the corresponding callbacks.
2636
///
2737
/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter.
2838
///
2939
/// This may under some circumstances return
30-
/// [`SubscriptionTakeFailed`][1] when the wait set spuriously wakes up.
40+
/// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][2], [`ServiceTakeFailed`][3] when the wait
41+
/// set spuriously wakes up.
3142
/// This can usually be ignored.
3243
///
3344
/// [1]: crate::SubscriberErrorCode
45+
/// [2]: crate::ClientErrorCode
46+
/// [3]: crate::ServiceErrorCode
3447
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
3548
let live_subscriptions = node.live_subscriptions();
49+
let live_clients = node.live_clients();
50+
let live_services = node.live_services();
3651
let ctx = Context {
3752
handle: node.context.clone(),
3853
};
39-
let mut wait_set = WaitSet::new(live_subscriptions.len(), &ctx)?;
54+
let mut wait_set = WaitSet::new(
55+
live_subscriptions.len(),
56+
0,
57+
0,
58+
live_clients.len(),
59+
live_services.len(),
60+
0,
61+
&ctx,
62+
)?;
4063

4164
for live_subscription in &live_subscriptions {
4265
wait_set.add_subscription(live_subscription.clone())?;
4366
}
4467

68+
for live_client in &live_clients {
69+
wait_set.add_client(live_client.clone())?;
70+
}
71+
72+
for live_service in &live_services {
73+
wait_set.add_service(live_service.clone())?;
74+
}
75+
4576
let ready_entities = wait_set.wait(timeout)?;
77+
4678
for ready_subscription in ready_entities.subscriptions {
4779
ready_subscription.execute()?;
4880
}
4981

82+
for ready_client in ready_entities.clients {
83+
ready_client.execute()?;
84+
}
85+
86+
for ready_service in ready_entities.services {
87+
ready_service.execute()?;
88+
}
89+
5090
Ok(())
5191
}
5292

53-
/// Convenience function for calling [`spin_once`] in a loop.
93+
/// Convenience function for calling [`rclrs::spin_once`] in a loop.
5494
///
5595
/// This function additionally checks that the context is still valid.
5696
pub fn spin(node: &Node) -> Result<(), RclrsError> {
@@ -70,6 +110,33 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
70110
};
71111
}
72112
}
73-
74113
Ok(())
75114
}
115+
116+
pub fn spin_until_future_complete<T: Unpin + Clone>(
117+
node: &node::Node,
118+
mut future: Arc<Mutex<Box<crate::future::RclFuture<T>>>>,
119+
) -> Result<<future::RclFuture<T> as Future>::Output, RclrsError> {
120+
let rclwaker = Arc::new(crate::future::RclWaker {});
121+
let waker = crate::future::rclwaker_into_waker(Arc::into_raw(rclwaker));
122+
let mut cx = std::task::Context::from_waker(&waker);
123+
124+
loop {
125+
let context_valid = unsafe { rcl_context_is_valid(&mut *node.context.lock()) };
126+
if context_valid {
127+
if let Some(error) = spin_once(node, None).err() {
128+
match error {
129+
RclrsError {
130+
code: RclReturnCode::Timeout,
131+
..
132+
} => continue,
133+
error => return Err(error),
134+
};
135+
};
136+
match Future::poll(Pin::new(&mut *future.lock()), &mut cx) {
137+
Poll::Ready(val) => break Ok(val),
138+
Poll::Pending => continue,
139+
};
140+
}
141+
}
142+
}

rclrs/src/node/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ impl NodeBuilder {
184184
Ok(Node {
185185
handle,
186186
context: self.context.clone(),
187+
clients: std::vec![],
188+
services: std::vec![],
187189
subscriptions: std::vec![],
188190
})
189191
}

0 commit comments

Comments
 (0)