Skip to content

Commit 9914a64

Browse files
committed
Use tokio runtime for examples. Use futures::Future as future implementation for clients
1 parent a61ba94 commit 9914a64

File tree

7 files changed

+93
-34
lines changed

7 files changed

+93
-34
lines changed

examples/minimal_client_service/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ path = "src/minimal_service.rs"
1818

1919
[dependencies]
2020
anyhow = {version = "1", features = ["backtrace"]}
21+
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "time"] }
22+
# Needed for clients
23+
futures = "0.3"
2124

2225
[dependencies.rclrs]
2326
version = "*"
Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,38 @@
11
use anyhow::{Error, Result};
2+
use futures::join;
3+
use futures::Future;
4+
use rclrs::Node;
25
use std::env;
6+
use std::thread;
7+
use tokio;
38

4-
fn main() -> Result<(), Error> {
9+
#[tokio::main]
10+
async fn main() -> Result<(), Error> {
511
let context = rclrs::Context::new(env::args()).unwrap();
612

713
let mut node = context.create_node("minimal_client")?;
814

915
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
1016

11-
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
12-
1317
println!("Starting client");
1418

1519
std::thread::sleep(std::time::Duration::from_millis(500));
1620

17-
let future = client.call_async(&request)?;
21+
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
22+
23+
let future = client.call_async(&request);
1824

1925
println!("Waiting for response");
20-
let response = rclrs::spin_until_future_complete(&node, future)?;
2126

27+
let spin_thread = std::thread::spawn(move || {
28+
rclrs::spin(&node);
29+
});
30+
31+
let response = future.await;
2232
println!(
23-
"Result of {} + {} is: {}",
24-
request.a, request.b, response.sum
25-
);
33+
"Result of {} + {} is: {}",
34+
request.a, request.b, response.unwrap().sum
35+
);
36+
spin_thread.join();
2637
Ok(())
2738
}

rclrs/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ libc = "0.2.43"
1717
parking_lot = "0.11.2"
1818
# Needed for the Message trait, among others
1919
rosidl_runtime_rs = "*"
20+
# Needed for clients
21+
futures = "0.3"
2022

2123
[build-dependencies]
2224
# Needed for FFI

rclrs/src/lib.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,29 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
113113
Ok(())
114114
}
115115

116+
/// Convenience function for calling [`rclrs::spin_once`] in a loop.
117+
///
118+
/// This function additionally checks that the context is still valid.
119+
pub fn spin_some(node: &Node) -> Result<(), RclrsError> {
120+
// The context_is_valid functions exists only to abstract away ROS distro differences
121+
#[cfg(ros_distro = "foxy")]
122+
// SAFETY: No preconditions for this function.
123+
let context_is_valid = || unsafe { rcl_context_is_valid(&mut *node.context.lock()) };
124+
#[cfg(not(ros_distro = "foxy"))]
125+
// SAFETY: No preconditions for this function.
126+
let context_is_valid = || unsafe { rcl_context_is_valid(&*node.context.lock()) };
127+
128+
if context_is_valid() {
129+
if let Some(error) = spin_once(node, Some(std::time::Duration::from_millis(500))).err() {
130+
match error.code {
131+
RclReturnCode::Timeout => (),
132+
_ => return Err(error),
133+
}
134+
}
135+
}
136+
Ok(())
137+
}
138+
116139
pub fn spin_until_future_complete<T: Unpin + Clone>(
117140
node: &node::Node,
118141
future: Arc<Mutex<Box<crate::future::RclFuture<T>>>>,

rclrs/src/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ impl Node {
233233
) -> Result<Arc<crate::node::service::Service<T>>, RclrsError>
234234
where
235235
T: rosidl_runtime_rs::Service + 'static,
236-
F: FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + Sized + 'static,
236+
F: FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + 'static + Send,
237237
{
238238
let service = Arc::new(crate::node::service::Service::<T>::new(
239239
self, topic, callback,

rclrs/src/node/client.rs

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#![warn(missing_docs)]
2+
use crate::node::client::oneshot::Canceled;
3+
use futures::channel::{mpsc, oneshot};
24
use std::borrow::Borrow;
35
use std::boxed::Box;
46
use std::collections::HashMap;
@@ -15,6 +17,10 @@ use crate::{rcl_bindings::*, RclrsError};
1517
use parking_lot::{Mutex, MutexGuard};
1618
use rosidl_runtime_rs::Message;
1719

20+
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
21+
// they are running in. Therefore, this type can be safely sent to another thread.
22+
unsafe impl Send for rcl_client_t {}
23+
1824
pub struct ClientHandle {
1925
handle: Mutex<rcl_client_t>,
2026
node_handle: Arc<Mutex<rcl_node_t>>,
@@ -36,9 +42,18 @@ impl Drop for ClientHandle {
3642
}
3743
}
3844

45+
impl From<Canceled> for RclrsError {
46+
fn from(_: Canceled) -> Self {
47+
RclrsError {
48+
code: RclReturnCode::Error,
49+
msg: None,
50+
}
51+
}
52+
}
53+
3954
/// Trait to be implemented by concrete Client structs
4055
/// See [`Client<T>`] for an example
41-
pub trait ClientBase {
56+
pub trait ClientBase: Send + Sync {
4257
fn handle(&self) -> &ClientHandle;
4358
fn execute(&self) -> Result<(), RclrsError>;
4459
}
@@ -49,8 +64,8 @@ where
4964
T: rosidl_runtime_rs::Service,
5065
{
5166
pub(crate) handle: Arc<ClientHandle>,
52-
requests: Mutex<HashMap<i64, Mutex<Box<dyn FnMut(&T::Response) + 'static>>>>,
53-
futures: Mutex<HashMap<i64, Arc<Mutex<Box<RclFuture<T::Response>>>>>>,
67+
requests: Mutex<HashMap<i64, Mutex<Box<dyn FnMut(&T::Response) + 'static + Send>>>>,
68+
futures: Arc<Mutex<HashMap<i64, oneshot::Sender<T::Response>>>>,
5469
sequence_number: AtomicI64,
5570
}
5671

@@ -89,7 +104,9 @@ where
89104
Ok(Self {
90105
handle,
91106
requests: Mutex::new(HashMap::new()),
92-
futures: Mutex::new(HashMap::new()),
107+
futures: Arc::new(Mutex::new(
108+
HashMap::<i64, oneshot::Sender<T::Response>>::new(),
109+
)),
93110
sequence_number: AtomicI64::new(0),
94111
})
95112
}
@@ -112,14 +129,13 @@ where
112129
callback: F,
113130
) -> Result<(), RclrsError>
114131
where
115-
F: FnMut(&T::Response) + Sized + 'static,
132+
F: FnMut(&T::Response) + 'static + Send,
116133
{
117134
let rmw_message = T::Request::into_rmw_message(message.into_cow());
118-
let handle = &mut *self.handle.lock();
119135
let mut sequence_number = self.sequence_number.load(Ordering::SeqCst);
120136
let ret = unsafe {
121137
rcl_send_request(
122-
handle as *mut _,
138+
&*self.handle.lock() as *const _,
123139
rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
124140
&mut sequence_number,
125141
)
@@ -130,7 +146,7 @@ where
130146
ret.ok()
131147
}
132148

133-
/// Send a requests with a callback as a parameter.
149+
/// Send a request with a callback as a parameter.
134150
///
135151
/// The [`MessageCow`] trait is implemented by any
136152
/// [`Message`] as well as any reference to a `Message`.
@@ -142,31 +158,25 @@ where
142158
///
143159
/// Hence, when a message will not be needed anymore after publishing, pass it by value.
144160
/// When a message will be needed again after publishing, pass it by reference, instead of cloning and passing by value.
145-
pub fn call_async<'a, R: MessageCow<'a, T::Request>>(
161+
pub async fn call_async<'a, R: MessageCow<'a, T::Request>>(
146162
&self,
147163
request: R,
148-
) -> Result<Arc<Mutex<Box<RclFuture<T::Response>>>>, RclrsError>
164+
) -> Result<T::Response, RclrsError>
149165
where
150166
T: rosidl_runtime_rs::Service + 'static,
151167
{
152168
let rmw_message = T::Request::into_rmw_message(request.into_cow());
153-
let handle = &mut *self.handle.lock();
154169
let mut sequence_number = self.sequence_number.load(Ordering::SeqCst);
155170
let ret = unsafe {
156171
rcl_send_request(
157-
handle as *mut _,
172+
&*self.handle.lock() as *const _,
158173
rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
159174
&mut sequence_number,
160175
)
161176
};
162-
let response = Arc::new(Mutex::new(Box::new(RclFuture::<T::Response>::new())));
163-
{
164-
let futures = &mut *self.futures.lock();
165-
futures.insert(sequence_number, response.clone());
166-
}
167-
self.sequence_number.swap(sequence_number, Ordering::SeqCst);
168-
ret.ok()?;
169-
Ok(response)
177+
let (tx, rx) = oneshot::channel::<T::Response>();
178+
self.futures.lock().insert(sequence_number, tx);
179+
Ok(rx.await?)
170180
}
171181

172182
/// Ask RMW for the data
@@ -232,8 +242,13 @@ where
232242
let callback = requests.remove(&req_id.sequence_number).unwrap();
233243
(*callback.lock())(&res);
234244
} else if futures.contains_key(&req_id.sequence_number) {
235-
let future = futures.remove(&req_id.sequence_number).unwrap();
236-
(&mut *future.lock()).set_value(res);
245+
futures
246+
.remove(&req_id.sequence_number)
247+
.unwrap_or_else(|| panic!("fail to find key in Client::process_requests"))
248+
.send(res)
249+
.unwrap_or_else(|_| {
250+
panic!("fail to send response via channel in Client::process_requests")
251+
});
237252
}
238253
Ok(())
239254
}

rclrs/src/node/service.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ use crate::node::publisher::MessageCow;
1313

1414
use parking_lot::{Mutex, MutexGuard};
1515

16+
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
17+
// they are running in. Therefore, this type can be safely sent to another thread.
18+
unsafe impl Send for rcl_service_t {}
19+
1620
pub struct ServiceHandle {
1721
handle: Mutex<rcl_service_t>,
1822
node_handle: Arc<Mutex<rcl_node_t>>,
@@ -36,7 +40,7 @@ impl Drop for ServiceHandle {
3640

3741
/// Trait to be implemented by concrete Service structs
3842
/// See [`Service<T>`] for an example
39-
pub trait ServiceBase {
43+
pub trait ServiceBase: Send + Sync {
4044
fn handle(&self) -> &ServiceHandle;
4145
fn execute(&self) -> Result<(), RclrsError>;
4246
}
@@ -48,7 +52,8 @@ where
4852
{
4953
pub handle: Arc<ServiceHandle>,
5054
// The callback's lifetime should last as long as we need it to
51-
pub callback: Mutex<Box<dyn FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + 'static>>,
55+
pub callback:
56+
Mutex<Box<dyn FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + 'static + Send>>,
5257
}
5358

5459
impl<T> Service<T>
@@ -58,7 +63,7 @@ where
5863
pub fn new<F>(node: &Node, topic: &str, callback: F) -> Result<Self, RclrsError>
5964
where
6065
T: rosidl_runtime_rs::Service,
61-
F: FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + Sized + 'static,
66+
F: FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + 'static + Send,
6267
{
6368
let mut service_handle = unsafe { rcl_get_zero_initialized_service() };
6469
let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()

0 commit comments

Comments
 (0)