Skip to content

Commit 27dc3c4

Browse files
committed
Added support for clients and services
1 parent 7daf8cc commit 27dc3c4

File tree

25 files changed

+952
-17
lines changed

25 files changed

+952
-17
lines changed

examples/message_demo/package.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
<build_depend>rclrs</build_depend>
1313
<build_depend>rosidl_runtime_rs</build_depend>
1414
<build_depend>rclrs_example_msgs</build_depend>
15+
<build_depend>example_interfaces</build_depend>
1516

1617
<exec_depend>rclrs</exec_depend>
1718
<exec_depend>rosidl_runtime_rs</exec_depend>
1819
<exec_depend>rclrs_example_msgs</exec_depend>
20+
<exec_depend>example_interfaces</exec_depend>
1921

2022
<export>
2123
<build_type>ament_cargo</build_type>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
[package]
2+
name = "examples_rclrs_minimal_client_service"
3+
version = "0.2.0"
4+
authors = ["Esteve Fernandez <[email protected]>"]
5+
edition = "2021"
6+
7+
[[bin]]
8+
name = "minimal_client"
9+
path = "src/minimal_client.rs"
10+
11+
[[bin]]
12+
name = "minimal_client_async"
13+
path = "src/minimal_client_async.rs"
14+
15+
[[bin]]
16+
name = "minimal_service"
17+
path = "src/minimal_service.rs"
18+
19+
[dependencies]
20+
anyhow = {version = "1", features = ["backtrace"]}
21+
22+
[dependencies.rclrs]
23+
version = "*"
24+
25+
[dependencies.rosidl_runtime_rs]
26+
version = "*"
27+
28+
[dependencies.std_msgs]
29+
version = "*"
30+
31+
[dependencies.rclrs_example_msgs]
32+
version = "*"
33+
34+
[dependencies.example_interfaces]
35+
version = "*"
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?xml version="1.0"?>
2+
<?xml-model
3+
href="http://download.ros.org/schema/package_format3.xsd"
4+
schematypens="http://www.w3.org/2001/XMLSchema"?>
5+
<package format="3">
6+
<name>examples_rclrs_minimal_pub_sub</name>
7+
<version>0.2.0</version>
8+
<description>Package containing an example of the publish-subscribe mechanism in rclrs.</description>
9+
<maintainer email="[email protected]">Esteve Fernandez</maintainer>
10+
<license>Apache License 2.0</license>
11+
12+
<build_depend>rclrs</build_depend>
13+
<build_depend>rosidl_runtime_rs</build_depend>
14+
<build_depend>std_msgs</build_depend>
15+
16+
<exec_depend>rclrs</exec_depend>
17+
<exec_depend>rosidl_runtime_rs</exec_depend>
18+
<exec_depend>std_msgs</exec_depend>
19+
20+
<export>
21+
<build_type>ament_cargo</build_type>
22+
</export>
23+
</package>
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args()).unwrap();
6+
7+
let mut node = context.create_node("minimal_client")?;
8+
9+
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
11+
let mut request = example_interfaces::srv::AddTwoInts_Request::default();
12+
request.a = 41;
13+
request.b = 1;
14+
15+
println!("Starting client");
16+
17+
std::thread::sleep(std::time::Duration::from_millis(500));
18+
19+
client.async_send_request_with_callback(
20+
&request,
21+
move |response: &example_interfaces::srv::AddTwoInts_Response| {
22+
println!(
23+
"Result of {} + {} is: {}",
24+
request.a, request.b, response.sum
25+
);
26+
},
27+
)?;
28+
29+
std::thread::sleep(std::time::Duration::from_millis(500));
30+
31+
println!("Waiting for response");
32+
rclrs::spin(&node).map_err(|err| err.into())
33+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args()).unwrap();
6+
7+
let mut node = context.create_node("minimal_client")?;
8+
9+
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
11+
let mut request = example_interfaces::srv::AddTwoInts_Request::default();
12+
request.a = 41;
13+
request.b = 1;
14+
15+
println!("Starting client");
16+
17+
std::thread::sleep(std::time::Duration::from_millis(500));
18+
19+
let future = client.call_async(&request)?;
20+
21+
println!("Waiting for response");
22+
let response = rclrs::spin_until_future_complete(&node, future.clone())?;
23+
24+
println!(
25+
"Result of {} + {} is: {}",
26+
request.a, request.b, response.sum
27+
);
28+
Ok(())
29+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn main() -> Result<(), Error> {
5+
let context = rclrs::Context::new(env::args())?;
6+
7+
let node = context.create_node("minimal_publisher")?;
8+
9+
let publisher =
10+
node.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
11+
12+
let mut message = std_msgs::msg::String::default();
13+
14+
let mut publish_count: u32 = 1;
15+
16+
while context.ok() {
17+
message.data = format!("Hello, world! {}", publish_count);
18+
println!("Publishing: [{}]", message.data);
19+
publisher.publish(&message)?;
20+
publish_count += 1;
21+
std::thread::sleep(std::time::Duration::from_millis(500));
22+
}
23+
Ok(())
24+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use anyhow::{Error, Result};
2+
use std::env;
3+
4+
fn handle_service(
5+
_request_header: &rclrs::rmw_request_id_t,
6+
request: &example_interfaces::srv::AddTwoInts_Request,
7+
response: &mut example_interfaces::srv::AddTwoInts_Response,
8+
) {
9+
println!("request: {} + {}", request.a, request.b);
10+
response.sum = request.a + request.b;
11+
}
12+
13+
fn main() -> Result<(), Error> {
14+
let context = rclrs::Context::new(env::args()).unwrap();
15+
16+
let mut node = context.create_node("minimal_service")?;
17+
18+
let _server = node
19+
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;
20+
21+
println!("Starting server");
22+
rclrs::spin(&node).map_err(|err| err.into())
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use std::env;
2+
3+
use anyhow::{Error, Result};
4+
5+
fn main() -> Result<(), Error> {
6+
let context = rclrs::Context::new(env::args())?;
7+
8+
let mut node = context.create_node("minimal_subscriber")?;
9+
10+
let mut num_messages: usize = 0;
11+
12+
let _subscription = node.create_subscription::<std_msgs::msg::String, _>(
13+
"topic",
14+
rclrs::QOS_PROFILE_DEFAULT,
15+
move |msg: std_msgs::msg::String| {
16+
num_messages += 1;
17+
println!("I heard: '{}'", msg.data);
18+
println!("(Got {} messages so far)", num_messages);
19+
},
20+
)?;
21+
22+
rclrs::spin(&node).map_err(|err| err.into())
23+
}

examples/minimal_pub_sub/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ path = "src/minimal_subscriber.rs"
1313
name = "minimal_publisher"
1414
path = "src/minimal_publisher.rs"
1515

16+
[[bin]]
17+
name = "message_demo"
18+
path = "src/message_demo.rs"
19+
1620
[dependencies]
1721
anyhow = {version = "1", features = ["backtrace"]}
1822

@@ -24,3 +28,9 @@ version = "*"
2428

2529
[dependencies.std_msgs]
2630
version = "*"
31+
32+
[dependencies.rclrs_example_msgs]
33+
version = "*"
34+
35+
[dependencies.example_interfaces]
36+
version = "*"

rclrs/src/future.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/// Based on https://www.viget.com/articles/understanding-futures-in-rust-part-1/
2+
use core::marker::PhantomData;
3+
use parking_lot::Mutex;
4+
use std::future::Future;
5+
use std::pin::Pin;
6+
use std::sync::Arc;
7+
use std::task::Context;
8+
use std::task::Poll;
9+
10+
#[derive(Default)]
11+
pub struct RclFuture<T> {
12+
value: Option<T>,
13+
}
14+
15+
impl<T: Default + Clone> RclFuture<T> {
16+
pub fn new() -> RclFuture<T> {
17+
Self { value: None }
18+
}
19+
20+
pub fn set_value(&mut self, msg: T) {
21+
self.value = Some(msg);
22+
}
23+
}
24+
25+
impl<T: Clone> Future for RclFuture<T> {
26+
type Output = T;
27+
28+
fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> {
29+
if let Some(value) = &self.value {
30+
Poll::Ready(value.clone())
31+
} else {
32+
Poll::Pending
33+
}
34+
}
35+
}

rclrs/src/lib.rs

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,69 @@ pub use wait::*;
2222
use rcl_bindings::rcl_context_is_valid;
2323
use std::time::Duration;
2424

25+
use std::pin::Pin;
26+
27+
pub use rcl_bindings::rmw_request_id_t;
28+
2529
/// Polls the node for new messages and executes the corresponding callbacks.
2630
///
2731
/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter.
2832
///
2933
/// This may under some circumstances return
30-
/// [`SubscriptionTakeFailed`][1] when the wait set spuriously wakes up.
34+
/// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][2], [`ServiceTakeFailed`][3] when the wait
35+
/// set spuriously wakes up.
3136
/// This can usually be ignored.
3237
///
3338
/// [1]: crate::SubscriberErrorCode
39+
/// [2]: crate::ClientErrorCode
40+
/// [3]: crate::ServiceErrorCode
3441
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
3542
let live_subscriptions = node.live_subscriptions();
43+
let live_clients = node.live_clients();
44+
let live_services = node.live_services();
3645
let ctx = Context {
3746
handle: node.context.clone(),
3847
};
39-
let mut wait_set = WaitSet::new(live_subscriptions.len(), &ctx)?;
48+
let mut wait_set = WaitSet::new(
49+
live_subscriptions.len(),
50+
0,
51+
0,
52+
live_clients.len(),
53+
live_services.len(),
54+
0,
55+
&ctx,
56+
)?;
4057

4158
for live_subscription in &live_subscriptions {
4259
wait_set.add_subscription(live_subscription.clone())?;
4360
}
4461

62+
for live_client in &live_clients {
63+
wait_set.add_client(live_client.clone())?;
64+
}
65+
66+
for live_service in &live_services {
67+
wait_set.add_service(live_service.clone())?;
68+
}
69+
4570
let ready_entities = wait_set.wait(timeout)?;
71+
4672
for ready_subscription in ready_entities.subscriptions {
4773
ready_subscription.execute()?;
4874
}
4975

76+
for ready_client in ready_entities.clients {
77+
ready_client.execute()?;
78+
}
79+
80+
for ready_service in ready_entities.services {
81+
ready_service.execute()?;
82+
}
83+
5084
Ok(())
5185
}
5286

53-
/// Convenience function for calling [`spin_once`] in a loop.
87+
/// Convenience function for calling [`rclrs::spin_once`] in a loop.
5488
///
5589
/// This function additionally checks that the context is still valid.
5690
pub fn spin(node: &Node) -> Result<(), RclrsError> {
@@ -70,6 +104,57 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
70104
};
71105
}
72106
}
73-
74107
Ok(())
75108
}
109+
110+
#[derive(Clone)]
111+
struct RclWaker {}
112+
113+
fn rclwaker_wake(_s: &RclWaker) {}
114+
115+
fn rclwaker_wake_by_ref(_s: &RclWaker) {}
116+
117+
fn rclwaker_clone(s: &RclWaker) -> RawWaker {
118+
let arc = unsafe { Arc::from_raw(s) };
119+
std::mem::forget(arc.clone());
120+
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
121+
}
122+
123+
const VTABLE: RawWakerVTable = unsafe {
124+
RawWakerVTable::new(
125+
|s| rclwaker_clone(&*(s as *const RclWaker)),
126+
|s| rclwaker_wake(&*(s as *const RclWaker)),
127+
|s| rclwaker_wake_by_ref(&*(s as *const RclWaker)),
128+
|s| drop(Arc::from_raw(s as *const RclWaker)),
129+
)
130+
};
131+
132+
fn rclwaker_into_waker(s: *const RclWaker) -> Waker {
133+
let raw_waker = RawWaker::new(s as *const (), &VTABLE);
134+
unsafe { Waker::from_raw(raw_waker) }
135+
}
136+
137+
pub fn spin_until_future_complete<T: Unpin + Clone>(
138+
node: &node::Node,
139+
mut future: Arc<Mutex<Box<RclFuture<T>>>>,
140+
) -> Result<<future::RclFuture<T> as Future>::Output, RclReturnCode> {
141+
let rclwaker = Arc::new(RclWaker {});
142+
let waker = rclwaker_into_waker(Arc::into_raw(rclwaker));
143+
let mut cx = std::task::Context::from_waker(&waker);
144+
145+
loop {
146+
let context_valid = unsafe { rcl_context_is_valid(&mut *node.context.lock() as *mut _) };
147+
if context_valid {
148+
if let Some(error) = spin_once(node, None).err() {
149+
match error {
150+
RclReturnCode::Timeout => continue,
151+
error => return Err(error),
152+
};
153+
};
154+
match Future::poll(Pin::new(&mut *future.lock()), &mut cx) {
155+
Poll::Ready(val) => break Ok(val),
156+
Poll::Pending => continue,
157+
};
158+
}
159+
}
160+
}

0 commit comments

Comments
 (0)