Skip to content

Commit a386f64

Browse files
committed
Ensure that a waitable is only added to one wait set at a time
1 parent a635b0b commit a386f64

File tree

6 files changed

+101
-37
lines changed

6 files changed

+101
-37
lines changed

rclrs/src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub enum RclrsError {
2727
/// The error indicating the position of the nul byte.
2828
err: NulError,
2929
},
30+
/// It was attempted to add a waitable to a wait set twice.
31+
AlreadyAddedToWaitSet,
3032
}
3133

3234
impl Display for RclrsError {
@@ -37,6 +39,12 @@ impl Display for RclrsError {
3739
RclrsError::StringContainsNul { s, .. } => {
3840
write!(f, "Could not convert string '{}' to CString", s)
3941
}
42+
RclrsError::AlreadyAddedToWaitSet => {
43+
write!(
44+
f,
45+
"Could not add entity to wait set because it was already added to a wait set"
46+
)
47+
}
4048
}
4149
}
4250
}
@@ -68,6 +76,7 @@ impl Error for RclrsError {
6876
RclrsError::RclError { msg, .. } => msg.as_ref().map(|e| e as &dyn Error),
6977
RclrsError::UnknownRclError { msg, .. } => msg.as_ref().map(|e| e as &dyn Error),
7078
RclrsError::StringContainsNul { err, .. } => Some(err).map(|e| e as &dyn Error),
79+
RclrsError::AlreadyAddedToWaitSet => None,
7180
}
7281
}
7382
}

rclrs/src/node/client.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::node::client::oneshot::Canceled;
21
use futures::channel::oneshot;
32
use std::boxed::Box;
43
use std::collections::HashMap;
54
use std::ffi::CString;
5+
use std::sync::atomic::AtomicBool;
66
use std::sync::Arc;
77

88
use crate::error::{RclReturnCode, ToResult};
@@ -21,6 +21,7 @@ unsafe impl Send for rcl_client_t {}
2121
pub struct ClientHandle {
2222
rcl_client_mtx: Mutex<rcl_client_t>,
2323
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
24+
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
2425
}
2526

2627
impl ClientHandle {
@@ -40,15 +41,6 @@ impl Drop for ClientHandle {
4041
}
4142
}
4243

43-
impl From<Canceled> for RclrsError {
44-
fn from(_: Canceled) -> Self {
45-
RclrsError::RclError {
46-
code: RclReturnCode::Error,
47-
msg: None,
48-
}
49-
}
50-
}
51-
5244
/// Trait to be implemented by concrete Client structs.
5345
///
5446
/// See [`Client<T>`] for an example.
@@ -113,6 +105,7 @@ where
113105
let handle = Arc::new(ClientHandle {
114106
rcl_client_mtx: Mutex::new(rcl_client),
115107
rcl_node_mtx: node.rcl_node_mtx.clone(),
108+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
116109
});
117110

118111
Ok(Self {

rclrs/src/node/service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::boxed::Box;
22
use std::ffi::CString;
3+
use std::sync::atomic::AtomicBool;
34
use std::sync::Arc;
45

56
use crate::error::{RclReturnCode, ToResult};
@@ -20,6 +21,7 @@ unsafe impl Send for rcl_service_t {}
2021
pub struct ServiceHandle {
2122
handle: Mutex<rcl_service_t>,
2223
node_handle: Arc<Mutex<rcl_node_t>>,
24+
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
2325
}
2426

2527
impl ServiceHandle {
@@ -103,6 +105,7 @@ where
103105
let handle = Arc::new(ServiceHandle {
104106
handle: Mutex::new(service_handle),
105107
node_handle: node.rcl_node_mtx.clone(),
108+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
106109
});
107110

108111
Ok(Self {

rclrs/src/node/subscription.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::boxed::Box;
77
use std::ffi::CStr;
88
use std::ffi::CString;
99
use std::marker::PhantomData;
10+
use std::sync::atomic::AtomicBool;
1011
use std::sync::Arc;
1112

1213
use rosidl_runtime_rs::{Message, RmwMessage};
@@ -21,6 +22,7 @@ unsafe impl Send for rcl_subscription_t {}
2122
pub struct SubscriptionHandle {
2223
rcl_subscription_mtx: Mutex<rcl_subscription_t>,
2324
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
25+
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
2426
}
2527

2628
impl SubscriptionHandle {
@@ -116,6 +118,7 @@ where
116118
let handle = Arc::new(SubscriptionHandle {
117119
rcl_subscription_mtx: Mutex::new(rcl_subscription),
118120
rcl_node_mtx: node.rcl_node_mtx.clone(),
121+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
119122
});
120123

121124
Ok(Self {

rclrs/src/wait.rs

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ use std::vec::Vec;
2525

2626
use parking_lot::Mutex;
2727

28+
mod exclusivity_guard;
29+
use exclusivity_guard::*;
30+
2831
/// A struct for waiting on subscriptions and other waitable entities to become ready.
2932
pub struct WaitSet {
3033
rcl_wait_set: rcl_wait_set_t,
@@ -33,9 +36,9 @@ pub struct WaitSet {
3336
// The subscriptions that are currently registered in the wait set.
3437
// This correspondence is an invariant that must be maintained by all functions,
3538
// even in the error case.
36-
subscriptions: Vec<Arc<dyn SubscriptionBase>>,
37-
clients: Vec<Arc<dyn ClientBase>>,
38-
services: Vec<Arc<dyn ServiceBase>>,
39+
subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
40+
clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
41+
services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
3942
}
4043

4144
/// A list of entities that are ready, returned by [`WaitSet::wait`].
@@ -118,17 +121,22 @@ impl WaitSet {
118121

119122
/// Adds a subscription to the wait set.
120123
///
121-
/// It is possible, but not useful, to add the same subscription twice.
122-
///
123-
/// This will return an error if the number of subscriptions in the wait set is larger than the
124-
/// capacity set in [`WaitSet::new`].
124+
/// # Errors
125+
/// - If the subscription was already added to this wait set or another one,
126+
/// [`AlreadyAddedToWaitSet`][1] will be returned
127+
/// - If the number of subscriptions in the wait set is larger than the
128+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
125129
///
126-
/// The same subscription must not be added to multiple wait sets, because that would make it
127-
/// unsafe to simultaneously wait on those wait sets.
130+
/// [1]: crate::RclrsError
131+
/// [2]: crate::RclReturnCode
128132
pub fn add_subscription(
129133
&mut self,
130134
subscription: Arc<dyn SubscriptionBase>,
131135
) -> Result<(), RclrsError> {
136+
let exclusive_subscription = ExclusivityGuard::new(
137+
Arc::clone(&subscription),
138+
Arc::clone(&subscription.handle().in_use_by_wait_set),
139+
)?;
132140
unsafe {
133141
// SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
134142
// for as long as the wait set exists, because it's stored in self.subscriptions.
@@ -140,20 +148,25 @@ impl WaitSet {
140148
)
141149
}
142150
.ok()?;
143-
self.subscriptions.push(subscription);
151+
self.subscriptions.push(exclusive_subscription);
144152
Ok(())
145153
}
146154

147155
/// Adds a client to the wait set.
148156
///
149-
/// It is possible, but not useful, to add the same client twice.
150-
///
151-
/// This will return an error if the number of clients in the wait set is larger than the
152-
/// capacity set in [`WaitSet::new`].
157+
/// # Errors
158+
/// - If the client was already added to this wait set or another one,
159+
/// [`AlreadyAddedToWaitSet`][1] will be returned
160+
/// - If the number of clients in the wait set is larger than the
161+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
153162
///
154-
/// The same client must not be added to multiple wait sets, because that would make it
155-
/// unsafe to simultaneously wait on those wait sets.
163+
/// [1]: crate::RclrsError
164+
/// [2]: crate::RclReturnCode
156165
pub fn add_client(&mut self, client: Arc<dyn ClientBase>) -> Result<(), RclrsError> {
166+
let exclusive_client = ExclusivityGuard::new(
167+
Arc::clone(&client),
168+
Arc::clone(&client.handle().in_use_by_wait_set),
169+
)?;
157170
unsafe {
158171
// SAFETY: I'm not sure if it's required, but the client pointer will remain valid
159172
// for as long as the wait set exists, because it's stored in self.clients.
@@ -165,20 +178,25 @@ impl WaitSet {
165178
)
166179
}
167180
.ok()?;
168-
self.clients.push(client);
181+
self.clients.push(exclusive_client);
169182
Ok(())
170183
}
171184

172185
/// Adds a service to the wait set.
173186
///
174-
/// It is possible, but not useful, to add the same service twice.
175-
///
176-
/// This will return an error if the number of services in the wait set is larger than the
177-
/// capacity set in [`WaitSet::new`].
187+
/// # Errors
188+
/// - If the service was already added to this wait set or another one,
189+
/// [`AlreadyAddedToWaitSet`][1] will be returned
190+
/// - If the number of services in the wait set is larger than the
191+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
178192
///
179-
/// The same service must not be added to multiple wait sets, because that would make it
180-
/// unsafe to simultaneously wait on those wait sets.
193+
/// [1]: crate::RclrsError
194+
/// [2]: crate::RclReturnCode
181195
pub fn add_service(&mut self, service: Arc<dyn ServiceBase>) -> Result<(), RclrsError> {
196+
let exclusive_service = ExclusivityGuard::new(
197+
Arc::clone(&service),
198+
Arc::clone(&service.handle().in_use_by_wait_set),
199+
)?;
182200
unsafe {
183201
// SAFETY: I'm not sure if it's required, but the service pointer will remain valid
184202
// for as long as the wait set exists, because it's stored in self.services.
@@ -190,7 +208,7 @@ impl WaitSet {
190208
)
191209
}
192210
.ok()?;
193-
self.services.push(service);
211+
self.services.push(exclusive_service);
194212
Ok(())
195213
}
196214

@@ -245,7 +263,9 @@ impl WaitSet {
245263
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
246264
let wait_set_entry = unsafe { *self.rcl_wait_set.subscriptions.add(i) };
247265
if !wait_set_entry.is_null() {
248-
ready_entities.subscriptions.push(subscription.clone());
266+
ready_entities
267+
.subscriptions
268+
.push(Arc::clone(&subscription.waitable));
249269
}
250270
}
251271
for (i, client) in self.clients.iter().enumerate() {
@@ -254,7 +274,7 @@ impl WaitSet {
254274
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
255275
let wait_set_entry = unsafe { *self.rcl_wait_set.clients.add(i) };
256276
if !wait_set_entry.is_null() {
257-
ready_entities.clients.push(client.clone());
277+
ready_entities.clients.push(Arc::clone(&client.waitable));
258278
}
259279
}
260280
for (i, service) in self.services.iter().enumerate() {
@@ -263,7 +283,7 @@ impl WaitSet {
263283
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
264284
let wait_set_entry = unsafe { *self.rcl_wait_set.services.add(i) };
265285
if !wait_set_entry.is_null() {
266-
ready_entities.services.push(service.clone());
286+
ready_entities.services.push(Arc::clone(&service.waitable));
267287
}
268288
}
269289
Ok(ready_entities)

rclrs/src/wait/exclusivity_guard.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use std::sync::atomic::{AtomicBool, Ordering};
2+
use std::sync::Arc;
3+
4+
use crate::RclrsError;
5+
6+
/// A helper struct for tracking whether the waitable is currently in a wait set.
7+
///
8+
/// When this struct is constructed, which happens when adding an entity to the wait set,
9+
/// it checks that the atomic boolean is false and sets it to true.
10+
/// When it is dropped, which happens when it is removed from the wait set,
11+
/// or the wait set itself is dropped, it sets the atomic bool to false.
12+
pub(super) struct ExclusivityGuard<T> {
13+
in_use_by_wait_set: Arc<AtomicBool>,
14+
pub(super) waitable: T,
15+
}
16+
17+
impl<T> Drop for ExclusivityGuard<T> {
18+
fn drop(&mut self) {
19+
self.in_use_by_wait_set.store(false, Ordering::Relaxed)
20+
}
21+
}
22+
23+
impl<T> ExclusivityGuard<T> {
24+
pub fn new(waitable: T, in_use_by_wait_set: Arc<AtomicBool>) -> Result<Self, RclrsError> {
25+
if in_use_by_wait_set
26+
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
27+
.is_err()
28+
{
29+
return Err(RclrsError::AlreadyAddedToWaitSet);
30+
}
31+
Ok(Self {
32+
in_use_by_wait_set,
33+
waitable,
34+
})
35+
}
36+
}

0 commit comments

Comments
 (0)