Skip to content

Commit 27c535f

Browse files
committed
Added support for clients and services
1 parent cb87f98 commit 27c535f

File tree

21 files changed

+851
-17
lines changed

21 files changed

+851
-17
lines changed

rclrs/src/future.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/// Based on https://www.viget.com/articles/understanding-futures-in-rust-part-1/
2+
use core::marker::PhantomData;
3+
use parking_lot::Mutex;
4+
use std::future::Future;
5+
use std::pin::Pin;
6+
use std::sync::Arc;
7+
use std::task::Context;
8+
use std::task::Poll;
9+
10+
#[derive(Default)]
11+
pub struct RclFuture<T> {
12+
value: Option<T>,
13+
}
14+
15+
impl<T: Default + Clone> RclFuture<T> {
16+
pub fn new() -> RclFuture<T> {
17+
Self { value: None }
18+
}
19+
20+
pub fn set_value(&mut self, msg: T) {
21+
self.value = Some(msg);
22+
}
23+
}
24+
25+
impl<T: Clone> Future for RclFuture<T> {
26+
type Output = T;
27+
28+
fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> {
29+
if let Some(value) = &self.value {
30+
Poll::Ready(value.clone())
31+
} else {
32+
Poll::Pending
33+
}
34+
}
35+
}

rclrs/src/lib.rs

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,69 @@ pub use wait::*;
2222
use rcl_bindings::rcl_context_is_valid;
2323
use std::time::Duration;
2424

25+
use std::pin::Pin;
26+
27+
pub use rcl_bindings::rmw_request_id_t;
28+
2529
/// Polls the node for new messages and executes the corresponding callbacks.
2630
///
2731
/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter.
2832
///
2933
/// This may under some circumstances return
30-
/// [`SubscriptionTakeFailed`][1] when the wait set spuriously wakes up.
34+
/// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][2], [`ServiceTakeFailed`][3] when the wait
35+
/// set spuriously wakes up.
3136
/// This can usually be ignored.
3237
///
3338
/// [1]: crate::SubscriberErrorCode
39+
/// [2]: crate::ClientErrorCode
40+
/// [3]: crate::ServiceErrorCode
3441
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclReturnCode> {
3542
let live_subscriptions = node.live_subscriptions();
43+
let live_clients = node.live_clients();
44+
let live_services = node.live_services();
3645
let ctx = Context {
3746
handle: node.context.clone(),
3847
};
39-
let mut wait_set = WaitSet::new(live_subscriptions.len(), &ctx)?;
48+
let mut wait_set = WaitSet::new(
49+
live_subscriptions.len(),
50+
0,
51+
0,
52+
live_clients.len(),
53+
live_services.len(),
54+
0,
55+
&ctx,
56+
)?;
4057

4158
for live_subscription in &live_subscriptions {
4259
wait_set.add_subscription(live_subscription.clone())?;
4360
}
4461

62+
for live_client in &live_clients {
63+
wait_set.add_client(live_client.clone())?;
64+
}
65+
66+
for live_service in &live_services {
67+
wait_set.add_service(live_service.clone())?;
68+
}
69+
4570
let ready_entities = wait_set.wait(timeout)?;
71+
4672
for ready_subscription in ready_entities.subscriptions {
4773
ready_subscription.execute()?;
4874
}
4975

76+
for ready_client in ready_entities.clients {
77+
ready_client.execute()?;
78+
}
79+
80+
for ready_service in ready_entities.services {
81+
ready_service.execute()?;
82+
}
83+
5084
Ok(())
5185
}
5286

53-
/// Convenience function for calling [`spin_once`] in a loop.
87+
/// Convenience function for calling [`rclrs::spin_once`] in a loop.
5488
///
5589
/// This function additionally checks that the context is still valid.
5690
pub fn spin(node: &Node) -> Result<(), RclReturnCode> {
@@ -70,6 +104,57 @@ pub fn spin(node: &Node) -> Result<(), RclReturnCode> {
70104
};
71105
}
72106
}
73-
74107
Ok(())
75108
}
109+
110+
#[derive(Clone)]
111+
struct RclWaker {}
112+
113+
fn rclwaker_wake(_s: &RclWaker) {}
114+
115+
fn rclwaker_wake_by_ref(_s: &RclWaker) {}
116+
117+
fn rclwaker_clone(s: &RclWaker) -> RawWaker {
118+
let arc = unsafe { Arc::from_raw(s) };
119+
std::mem::forget(arc.clone());
120+
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
121+
}
122+
123+
const VTABLE: RawWakerVTable = unsafe {
124+
RawWakerVTable::new(
125+
|s| rclwaker_clone(&*(s as *const RclWaker)),
126+
|s| rclwaker_wake(&*(s as *const RclWaker)),
127+
|s| rclwaker_wake_by_ref(&*(s as *const RclWaker)),
128+
|s| drop(Arc::from_raw(s as *const RclWaker)),
129+
)
130+
};
131+
132+
fn rclwaker_into_waker(s: *const RclWaker) -> Waker {
133+
let raw_waker = RawWaker::new(s as *const (), &VTABLE);
134+
unsafe { Waker::from_raw(raw_waker) }
135+
}
136+
137+
pub fn spin_until_future_complete<T: Unpin + Clone>(
138+
node: &node::Node,
139+
mut future: Arc<Mutex<Box<RclFuture<T>>>>,
140+
) -> Result<<future::RclFuture<T> as Future>::Output, RclReturnCode> {
141+
let rclwaker = Arc::new(RclWaker {});
142+
let waker = rclwaker_into_waker(Arc::into_raw(rclwaker));
143+
let mut cx = std::task::Context::from_waker(&waker);
144+
145+
loop {
146+
let context_valid = unsafe { rcl_context_is_valid(&mut *node.context.lock() as *mut _) };
147+
if context_valid {
148+
if let Some(error) = spin_once(node, None).err() {
149+
match error {
150+
RclReturnCode::Timeout => continue,
151+
error => return Err(error),
152+
};
153+
};
154+
match Future::poll(Pin::new(&mut *future.lock()), &mut cx) {
155+
Poll::Ready(val) => break Ok(val),
156+
Poll::Pending => continue,
157+
};
158+
}
159+
}
160+
}

0 commit comments

Comments
 (0)