Skip to content

Commit 55fc519

Browse files
committed
Added rclcpp-like and rclpy-like executor
1 parent 7450810 commit 55fc519

File tree

7 files changed

+197
-38
lines changed

7 files changed

+197
-38
lines changed

examples/minimal_pub_sub/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ path = "src/minimal_subscriber.rs"
1313
name = "minimal_publisher"
1414
path = "src/minimal_publisher.rs"
1515

16+
[[bin]]
17+
name = "minimal_two_nodes"
18+
path = "src/minimal_two_nodes.rs"
19+
1620
[[bin]]
1721
name = "zero_copy_subscriber"
1822
path = "src/zero_copy_subscriber.rs"

examples/minimal_pub_sub/src/minimal_subscriber.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,9 @@ fn main() -> Result<(), Error> {
1919
},
2020
)?;
2121

22-
rclrs::spin(node).map_err(|err| err.into())
22+
let executor = rclrs::SingleThreadedExecutor::new();
23+
24+
executor.add_node(&node)?;
25+
26+
executor.spin().map_err(|err| err.into())
2327
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use std::env;
2+
use std::sync::{Arc, RwLock};
3+
4+
use anyhow::{Error, Result};
5+
6+
struct MinimalSubscriberInner {
7+
num_messages: u32,
8+
node: Arc<rclrs::Node>,
9+
subscription: Option<Arc<rclrs::Subscription<std_msgs::msg::String>>>,
10+
}
11+
12+
struct MinimalSubscriber {
13+
inner: RwLock<MinimalSubscriberInner>,
14+
}
15+
16+
impl MinimalSubscriber {
17+
pub fn new(name: &str, topic: &str) -> Result<Arc<Self>, rclrs::RclrsError> {
18+
let context = rclrs::Context::new(env::args())?;
19+
let node = rclrs::create_node(&context, name)?;
20+
let minimal_subscriber = Arc::new(MinimalSubscriber {
21+
inner: RwLock::new(MinimalSubscriberInner {
22+
num_messages: 0,
23+
node: Arc::clone(&node),
24+
subscription: None,
25+
}),
26+
});
27+
28+
let minimal_subscriber_aux = Arc::clone(&minimal_subscriber);
29+
let subscription = node.create_subscription::<std_msgs::msg::String, _>(
30+
topic,
31+
rclrs::QOS_PROFILE_DEFAULT,
32+
move |msg: std_msgs::msg::String| {
33+
minimal_subscriber_aux.callback(msg);
34+
},
35+
)?;
36+
minimal_subscriber.inner.write().unwrap().subscription = Some(subscription);
37+
Ok(minimal_subscriber)
38+
}
39+
40+
fn callback(&self, msg: std_msgs::msg::String) {
41+
self.inner.write().unwrap().num_messages += 1;
42+
println!("[{}] I heard: '{}'", self.node().name(), msg.data);
43+
println!(
44+
"[{}] (Got {} messages so far)",
45+
self.node().name(),
46+
self.inner.read().unwrap().num_messages
47+
);
48+
}
49+
50+
pub fn node(&self) -> Arc<rclrs::Node> {
51+
Arc::clone(&self.inner.read().unwrap().node)
52+
}
53+
}
54+
55+
fn main() -> Result<(), Error> {
56+
let publisher_context = rclrs::Context::new(env::args())?;
57+
let publisher_node = rclrs::create_node(&publisher_context, "minimal_publisher")?;
58+
59+
let subscriber_node_one = MinimalSubscriber::new("minimal_subscriber_one", "topic")?;
60+
let subscriber_node_two = MinimalSubscriber::new("minimal_subscriber_two", "topic")?;
61+
62+
let publisher = publisher_node
63+
.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
64+
65+
std::thread::spawn(move || -> Result<(), rclrs::RclrsError> {
66+
let mut message = std_msgs::msg::String::default();
67+
let mut publish_count: u32 = 1;
68+
loop {
69+
message.data = format!("Hello, world! {}", publish_count);
70+
println!("Publishing: [{}]", message.data);
71+
publisher.publish(&message)?;
72+
publish_count += 1;
73+
std::thread::sleep(std::time::Duration::from_millis(500));
74+
}
75+
});
76+
77+
let executor = rclrs::SingleThreadedExecutor::new();
78+
79+
executor.add_node(&publisher_node)?;
80+
executor.add_node(&subscriber_node_one.node())?;
81+
executor.add_node(&subscriber_node_two.node())?;
82+
83+
executor.spin().map_err(|err| err.into())
84+
}

rclrs/src/executor.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use crate::rcl_bindings::rcl_context_is_valid;
2+
use crate::{Node, RclReturnCode, RclrsError, WaitSet};
3+
use std::sync::{Arc, Mutex, Weak};
4+
use std::time::Duration;
5+
6+
/// Single-threaded executor implementation.
7+
pub struct SingleThreadedExecutor {
8+
nodes_mtx: Mutex<Vec<Weak<Node>>>,
9+
}
10+
11+
impl Default for SingleThreadedExecutor {
12+
fn default() -> Self {
13+
Self::new()
14+
}
15+
}
16+
17+
impl SingleThreadedExecutor {
18+
/// Creates a new executor.
19+
pub fn new() -> Self {
20+
SingleThreadedExecutor {
21+
nodes_mtx: Mutex::new(Vec::new()),
22+
}
23+
}
24+
25+
/// Add a node to the executor.
26+
pub fn add_node(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
27+
{ self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node));
28+
Ok(())
29+
}
30+
31+
/// Remove a node from the executor.
32+
pub fn remove_node(&self, node: Arc<Node>) -> Result<(), RclrsError> {
33+
{ self.nodes_mtx.lock().unwrap() }
34+
.retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false));
35+
Ok(())
36+
}
37+
38+
/// Polls the nodes for new messages and executes the corresponding callbacks.
39+
pub fn spin_once(&self, timeout: Option<Duration>) -> Result<(), RclrsError> {
40+
for node in { self.nodes_mtx.lock().unwrap() }
41+
.iter()
42+
.filter_map(Weak::upgrade)
43+
.filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) })
44+
{
45+
let wait_set = WaitSet::new_for_node(&node)?;
46+
let ready_entities = wait_set.wait(timeout)?;
47+
48+
for ready_subscription in ready_entities.subscriptions {
49+
ready_subscription.execute()?;
50+
}
51+
52+
for ready_client in ready_entities.clients {
53+
ready_client.execute()?;
54+
}
55+
56+
for ready_service in ready_entities.services {
57+
ready_service.execute()?;
58+
}
59+
}
60+
61+
Ok(())
62+
}
63+
64+
/// Convenience function for calling [`spin_once`] in a loop.
65+
pub fn spin(&self) -> Result<(), RclrsError> {
66+
while !{ self.nodes_mtx.lock().unwrap() }.is_empty() {
67+
match self.spin_once(None) {
68+
Ok(_)
69+
| Err(RclrsError::RclError {
70+
code: RclReturnCode::Timeout,
71+
..
72+
}) => (),
73+
error => return error,
74+
}
75+
}
76+
77+
Ok(())
78+
}
79+
}

rclrs/src/lib.rs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod arguments;
99
mod client;
1010
mod context;
1111
mod error;
12+
mod executor;
1213
mod node;
1314
mod parameter;
1415
mod publisher;
@@ -30,11 +31,11 @@ pub use arguments::*;
3031
pub use client::*;
3132
pub use context::*;
3233
pub use error::*;
34+
pub use executor::*;
3335
pub use node::*;
3436
pub use parameter::*;
3537
pub use publisher::*;
3638
pub use qos::*;
37-
use rcl_bindings::rcl_context_is_valid;
3839
pub use rcl_bindings::rmw_request_id_t;
3940
pub use service::*;
4041
pub use subscription::*;
@@ -51,46 +52,19 @@ pub use wait::*;
5152
///
5253
/// [1]: crate::RclReturnCode
5354
pub fn spin_once(node: Arc<Node>, timeout: Option<Duration>) -> Result<(), RclrsError> {
54-
let wait_set = WaitSet::new_for_node(&node)?;
55-
let ready_entities = wait_set.wait(timeout)?;
56-
57-
for ready_subscription in ready_entities.subscriptions {
58-
ready_subscription.execute()?;
59-
}
60-
61-
for ready_client in ready_entities.clients {
62-
ready_client.execute()?;
63-
}
64-
65-
for ready_service in ready_entities.services {
66-
ready_service.execute()?;
67-
}
68-
69-
Ok(())
55+
let executor = SingleThreadedExecutor::new();
56+
executor.add_node(&node)?;
57+
executor.spin_once(timeout)
7058
}
7159

7260
/// Convenience function for calling [`spin_once`] in a loop.
7361
///
7462
/// This function additionally checks that the context is still valid.
7563
pub fn spin(node: Arc<Node>) -> Result<(), RclrsError> {
76-
// The context_is_valid functions exists only to abstract away ROS distro differences
7764
// SAFETY: No preconditions for this function.
78-
let context_is_valid = {
79-
let node = Arc::clone(&node);
80-
move || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) }
81-
};
82-
83-
while context_is_valid() {
84-
match spin_once(Arc::clone(&node), None) {
85-
Ok(_)
86-
| Err(RclrsError::RclError {
87-
code: RclReturnCode::Timeout,
88-
..
89-
}) => (),
90-
error => return error,
91-
}
92-
}
93-
Ok(())
65+
let executor = SingleThreadedExecutor::new();
66+
executor.add_node(&node)?;
67+
executor.spin()
9468
}
9569

9670
/// Creates a new node in the empty namespace.

rclrs/src/node.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,16 @@ impl Node {
238238
&self,
239239
topic: &str,
240240
qos: QoSProfile,
241-
) -> Result<Publisher<T>, RclrsError>
241+
) -> Result<Arc<Publisher<T>>, RclrsError>
242242
where
243243
T: Message,
244244
{
245-
Publisher::<T>::new(Arc::clone(&self.rcl_node_mtx), topic, qos)
245+
let publisher = Arc::new(Publisher::<T>::new(
246+
Arc::clone(&self.rcl_node_mtx),
247+
topic,
248+
qos,
249+
)?);
250+
Ok(publisher)
246251
}
247252

248253
/// Creates a [`Service`][1].

rclrs/src/wait.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,16 @@ impl WaitSet {
337337
// We cannot currently guarantee that the wait sets may not share content, but it is
338338
// mentioned in the doc comment for `add_subscription`.
339339
// Also, the rcl_wait_set is obviously valid.
340-
unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok()?;
340+
match unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok() {
341+
Ok(_) => (),
342+
Err(error) => match error {
343+
RclrsError::RclError { code, msg } => match code {
344+
RclReturnCode::WaitSetEmpty => (),
345+
_ => return Err(RclrsError::RclError { code, msg }),
346+
},
347+
_ => return Err(error),
348+
},
349+
}
341350
let mut ready_entities = ReadyEntities {
342351
subscriptions: Vec::new(),
343352
clients: Vec::new(),

0 commit comments

Comments
 (0)