Skip to content

Commit 7e42d44

Browse files
committed
Refactor to remove SubscriptionBase and SubscriptionHandle
1 parent e0dad8b commit 7e42d44

File tree

4 files changed

+61
-74
lines changed

4 files changed

+61
-74
lines changed

rclrs/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@ pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsErro
4040
};
4141
let mut wait_set = WaitSet::new(live_subscriptions.len(), &ctx)?;
4242

43-
for live_subscription in &live_subscriptions {
44-
wait_set.add_subscription(live_subscription.clone())?;
43+
for live_subscription in live_subscriptions {
44+
// SAFETY: The implementation of this trait function guarantees that the subscription
45+
// is not part of any other wait set. (TODO: issue #207)
46+
unsafe { live_subscription.add_to_wait_set(&mut wait_set)? };
4547
}
4648

4749
let ready_entities = wait_set.wait(timeout)?;

rclrs/src/node.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub use self::publisher::*;
66
pub use self::subscription::*;
77

88
use crate::rcl_bindings::*;
9-
use crate::{Context, ParameterOverrideMap, QoSProfile, RclrsError, ToResult};
9+
use crate::{Context, ParameterOverrideMap, QoSProfile, RclrsError, ToResult, Waitable};
1010

1111
use std::cmp::PartialEq;
1212
use std::ffi::CStr;
@@ -68,7 +68,7 @@ unsafe impl Send for rcl_node_t {}
6868
pub struct Node {
6969
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
7070
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
71-
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
71+
pub(crate) subscriptions: Vec<Weak<dyn Waitable>>,
7272
_parameter_map: ParameterOverrideMap,
7373
}
7474

@@ -205,12 +205,12 @@ impl Node {
205205
{
206206
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
207207
self.subscriptions
208-
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
208+
.push(Arc::downgrade(&subscription) as Weak<dyn Waitable>);
209209
Ok(subscription)
210210
}
211211

212212
/// Returns the subscriptions that have not been dropped yet.
213-
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
213+
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn Waitable>> {
214214
self.subscriptions
215215
.iter()
216216
.filter_map(Weak::upgrade)

rclrs/src/node/subscription.rs

Lines changed: 22 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::error::{RclReturnCode, ToResult};
2-
use crate::qos::QoSProfile;
3-
use crate::Node;
42
use crate::{rcl_bindings::*, RclrsError};
3+
use crate::{Node, QoSProfile, WaitSet, Waitable};
54

65
use std::boxed::Box;
76
use std::ffi::CStr;
@@ -11,25 +10,13 @@ use std::sync::Arc;
1110

1211
use rosidl_runtime_rs::{Message, RmwMessage};
1312

14-
use parking_lot::{Mutex, MutexGuard};
13+
use parking_lot::Mutex;
1514

1615
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
1716
// they are running in. Therefore, this type can be safely sent to another thread.
1817
unsafe impl Send for rcl_subscription_t {}
1918

20-
/// Internal struct used by subscriptions.
21-
pub struct SubscriptionHandle {
22-
rcl_subscription_mtx: Mutex<rcl_subscription_t>,
23-
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
24-
}
25-
26-
impl SubscriptionHandle {
27-
pub(crate) fn lock(&self) -> MutexGuard<rcl_subscription_t> {
28-
self.rcl_subscription_mtx.lock()
29-
}
30-
}
31-
32-
impl Drop for SubscriptionHandle {
19+
impl<T: Message> Drop for Subscription<T> {
3320
fn drop(&mut self) {
3421
let rcl_subscription = self.rcl_subscription_mtx.get_mut();
3522
let rcl_node = &mut *self.rcl_node_mtx.lock();
@@ -40,14 +27,6 @@ impl Drop for SubscriptionHandle {
4027
}
4128
}
4229

43-
/// Trait to be implemented by concrete [`Subscription`]s.
44-
pub trait SubscriptionBase: Send + Sync {
45-
/// Internal function to get a reference to the `rcl` handle.
46-
fn handle(&self) -> &SubscriptionHandle;
47-
/// Tries to take a new message and run the callback with it.
48-
fn execute(&self) -> Result<(), RclrsError>;
49-
}
50-
5130
/// Struct for receiving messages of type `T`.
5231
///
5332
/// There can be multiple subscriptions for the same topic, in different nodes or the same node.
@@ -63,7 +42,8 @@ pub struct Subscription<T>
6342
where
6443
T: Message,
6544
{
66-
pub(crate) handle: Arc<SubscriptionHandle>,
45+
rcl_subscription_mtx: Mutex<rcl_subscription_t>,
46+
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
6747
/// The callback function that runs when a message was received.
6848
pub callback: Mutex<Box<dyn FnMut(T) + 'static + Send>>,
6949
message: PhantomData<T>,
@@ -113,13 +93,9 @@ where
11393
.ok()?;
11494
}
11595

116-
let handle = Arc::new(SubscriptionHandle {
96+
Ok(Self {
11797
rcl_subscription_mtx: Mutex::new(rcl_subscription),
11898
rcl_node_mtx: node.rcl_node_mtx.clone(),
119-
});
120-
121-
Ok(Self {
122-
handle,
12399
callback: Mutex::new(Box::new(callback)),
124100
message: PhantomData,
125101
})
@@ -133,7 +109,8 @@ where
133109
// SAFETY: No preconditions for the function used
134110
// The unsafe variables get converted to safe types before being returned
135111
unsafe {
136-
let raw_topic_pointer = rcl_subscription_get_topic_name(&*self.handle.lock());
112+
let raw_topic_pointer =
113+
rcl_subscription_get_topic_name(&*self.rcl_subscription_mtx.lock());
137114
CStr::from_ptr(raw_topic_pointer)
138115
.to_string_lossy()
139116
.into_owned()
@@ -164,7 +141,7 @@ where
164141
// ```
165142
pub fn take(&self) -> Result<T, RclrsError> {
166143
let mut rmw_message = <T as Message>::RmwMsg::default();
167-
let rcl_subscription = &mut *self.handle.lock();
144+
let rcl_subscription = &mut *self.rcl_subscription_mtx.lock();
168145
let ret = unsafe {
169146
// SAFETY: The first two pointers are valid/initialized, and do not need to be valid
170147
// beyond the function call.
@@ -181,12 +158,22 @@ where
181158
}
182159
}
183160

184-
impl<T> SubscriptionBase for Subscription<T>
161+
impl<T> Waitable for Subscription<T>
185162
where
186163
T: Message,
187164
{
188-
fn handle(&self) -> &SubscriptionHandle {
189-
&self.handle
165+
unsafe fn add_to_wait_set(self: Arc<Self>, wait_set: &mut WaitSet) -> Result<(), RclrsError> {
166+
// SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
167+
// for as long as the wait set exists, because it's stored in self.subscriptions.
168+
// Passing in a null pointer for the third argument is explicitly allowed.
169+
rcl_wait_set_add_subscription(
170+
&mut wait_set.rcl_wait_set,
171+
&*self.rcl_subscription_mtx.lock(),
172+
std::ptr::null_mut(),
173+
)
174+
.ok()?;
175+
wait_set.subscriptions.push(self);
176+
Ok(())
190177
}
191178

192179
fn execute(&self) -> Result<(), RclrsError> {

rclrs/src/wait.rs

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,55 @@
1717

1818
use crate::error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult};
1919
use crate::rcl_bindings::*;
20-
use crate::{Context, SubscriptionBase};
20+
use crate::Context;
2121

2222
use std::sync::Arc;
2323
use std::time::Duration;
2424
use std::vec::Vec;
2525

2626
use parking_lot::Mutex;
2727

28+
/// Trait to be implemented by entities that can be waited on, like a [`Subscription`][1].
29+
///
30+
/// [1]: crate::Subscription
31+
pub trait Waitable: Send + Sync {
32+
/// Adds itself to the given wait set.
33+
///
34+
/// This will return an error if the number of waitables of this kind in the wait set is larger
35+
/// than the capacity set in [`WaitSet::new`].
36+
///
37+
/// # Safety
38+
///
39+
/// The same waitable must not be added to multiple wait sets, because that would make it
40+
/// unsafe to simultaneously wait on those wait sets. Quoting from the rcl docs:
41+
/// "This function is thread-safe for unique wait sets with unique contents.
42+
/// This function cannot operate on the same wait set in multiple threads, and
43+
/// the wait sets may not share content.
44+
/// For example, calling `rcl_wait()` in two threads on two different wait sets
45+
/// that both contain a single, shared guard condition is undefined behavior."
46+
///
47+
/// This function is unsafe because the implementation must ensure that the item is not yet in a
48+
/// different wait set.
49+
unsafe fn add_to_wait_set(self: Arc<Self>, wait_set: &mut WaitSet) -> Result<(), RclrsError>;
50+
/// Tries to take a new message and run the callback with it.
51+
fn execute(&self) -> Result<(), RclrsError>;
52+
}
53+
2854
/// A struct for waiting on subscriptions and other waitable entities to become ready.
2955
pub struct WaitSet {
30-
rcl_wait_set: rcl_wait_set_t,
56+
pub(crate) rcl_wait_set: rcl_wait_set_t,
3157
// Used to ensure the context is alive while the wait set is alive.
3258
_rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
3359
// The subscriptions that are currently registered in the wait set.
3460
// This correspondence is an invariant that must be maintained by all functions,
3561
// even in the error case.
36-
subscriptions: Vec<Arc<dyn SubscriptionBase>>,
62+
pub(crate) subscriptions: Vec<Arc<dyn Waitable>>,
3763
}
3864

3965
/// A list of entities that are ready, returned by [`WaitSet::wait`].
4066
pub struct ReadyEntities {
4167
/// A list of subscriptions that have potentially received messages.
42-
pub subscriptions: Vec<Arc<dyn SubscriptionBase>>,
68+
pub subscriptions: Vec<Arc<dyn Waitable>>,
4369
}
4470

4571
impl Drop for rcl_wait_set_t {
@@ -56,7 +82,7 @@ impl WaitSet {
5682
/// Creates a new wait set.
5783
///
5884
/// The given number of subscriptions is a capacity, corresponding to how often
59-
/// [`WaitSet::add_subscription`] may be called.
85+
/// [`Waitable::add_to_wait_set`] may be called.
6086
pub fn new(number_of_subscriptions: usize, context: &Context) -> Result<Self, RclrsError> {
6187
let rcl_wait_set = unsafe {
6288
// SAFETY: Getting a zero-initialized value is always safe
@@ -98,34 +124,6 @@ impl WaitSet {
98124
debug_assert_eq!(ret, 0);
99125
}
100126

101-
/// Adds a subscription to the wait set.
102-
///
103-
/// It is possible, but not useful, to add the same subscription twice.
104-
///
105-
/// This will return an error if the number of subscriptions in the wait set is larger than the
106-
/// capacity set in [`WaitSet::new`].
107-
///
108-
/// The same subscription must not be added to multiple wait sets, because that would make it
109-
/// unsafe to simultaneously wait on those wait sets.
110-
pub fn add_subscription(
111-
&mut self,
112-
subscription: Arc<dyn SubscriptionBase>,
113-
) -> Result<(), RclrsError> {
114-
unsafe {
115-
// SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
116-
// for as long as the wait set exists, because it's stored in self.subscriptions.
117-
// Passing in a null pointer for the third argument is explicitly allowed.
118-
rcl_wait_set_add_subscription(
119-
&mut self.rcl_wait_set,
120-
&*subscription.handle().lock(),
121-
std::ptr::null_mut(),
122-
)
123-
}
124-
.ok()?;
125-
self.subscriptions.push(subscription);
126-
Ok(())
127-
}
128-
129127
/// Blocks until the wait set is ready, or until the timeout has been exceeded.
130128
///
131129
/// If the timeout is `None` then this function will block indefinitely until

0 commit comments

Comments
 (0)