Skip to content

Commit 346f38f

Browse files
committed
Ensure that a waitable is only added to one wait set at a time
1 parent 15d62fa commit 346f38f

File tree

5 files changed

+100
-31
lines changed

5 files changed

+100
-31
lines changed

rclrs/src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub enum RclrsError {
2727
/// The error indicating the position of the nul byte.
2828
err: NulError,
2929
},
30+
/// It was attempted to add a waitable to a wait set twice.
31+
AlreadyAddedToWaitSet,
3032
}
3133

3234
impl Display for RclrsError {
@@ -37,6 +39,12 @@ impl Display for RclrsError {
3739
RclrsError::StringContainsNul { s, .. } => {
3840
write!(f, "Could not convert string '{}' to CString", s)
3941
}
42+
RclrsError::AlreadyAddedToWaitSet => {
43+
write!(
44+
f,
45+
"Could not add entity to wait set because it was already added to a wait set"
46+
)
47+
}
4048
}
4149
}
4250
}
@@ -68,6 +76,7 @@ impl Error for RclrsError {
6876
RclrsError::RclError { msg, .. } => msg.as_ref().map(|e| e as &dyn Error),
6977
RclrsError::UnknownRclError { msg, .. } => msg.as_ref().map(|e| e as &dyn Error),
7078
RclrsError::StringContainsNul { err, .. } => Some(err).map(|e| e as &dyn Error),
79+
RclrsError::AlreadyAddedToWaitSet => None,
7180
}
7281
}
7382
}

rclrs/src/node/client.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ use futures::channel::oneshot;
22
use std::boxed::Box;
33
use std::collections::HashMap;
44
use std::ffi::CString;
5+
use std::sync::atomic::AtomicBool;
56
use std::sync::Arc;
67

78
use crate::rcl_bindings::*;
8-
use crate::{MessageCow, Node, RclReturnCode, RclrsError, ToResult, WaitSet, Waitable};
9+
use crate::{
10+
ExclusivityGuard, MessageCow, Node, RclReturnCode, RclrsError, ToResult, WaitSet, Waitable,
11+
};
912

1013
use parking_lot::Mutex;
1114
use rosidl_runtime_rs::Message;
@@ -27,6 +30,7 @@ where
2730
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
2831
requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
2932
futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
33+
in_use_by_wait_set: Arc<AtomicBool>,
3034
}
3135

3236
impl<T> Drop for Client<T>
@@ -48,6 +52,9 @@ where
4852
T: rosidl_runtime_rs::Service,
4953
{
5054
unsafe fn add_to_wait_set(self: Arc<Self>, wait_set: &mut WaitSet) -> Result<(), RclrsError> {
55+
// First, ensure that self is not yet used by a wait set.
56+
let client_exclusive =
57+
ExclusivityGuard::new(Arc::clone(&self) as _, Arc::clone(&self.in_use_by_wait_set))?;
5158
// SAFETY: I'm not sure if it's required, but the client pointer will remain valid
5259
// for as long as the wait set exists, because it's stored in self.clients.
5360
// Passing in a null pointer for the third argument is explicitly allowed.
@@ -57,7 +64,7 @@ where
5764
std::ptr::null_mut(),
5865
)
5966
.ok()?;
60-
wait_set.clients.push(self);
67+
wait_set.clients.push(client_exclusive);
6168
Ok(())
6269
}
6370

@@ -134,6 +141,7 @@ where
134141
futures: Arc::new(Mutex::new(
135142
HashMap::<RequestId, oneshot::Sender<T::Response>>::new(),
136143
)),
144+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
137145
})
138146
}
139147

rclrs/src/node/service.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use std::boxed::Box;
22
use std::ffi::CString;
3+
use std::sync::atomic::AtomicBool;
34
use std::sync::Arc;
45

56
use crate::rcl_bindings::*;
6-
use crate::{Node, RclReturnCode, RclrsError, ToResult, WaitSet, Waitable};
7+
use crate::{ExclusivityGuard, Node, RclReturnCode, RclrsError, ToResult, WaitSet, Waitable};
78

89
use rosidl_runtime_rs::Message;
910

@@ -27,6 +28,7 @@ where
2728
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
2829
/// The callback function that runs when a request was received.
2930
pub callback: Mutex<ServiceCallback<T::Request, T::Response>>,
31+
in_use_by_wait_set: Arc<AtomicBool>,
3032
}
3133

3234
impl<T> Drop for Service<T>
@@ -48,6 +50,9 @@ where
4850
T: rosidl_runtime_rs::Service,
4951
{
5052
unsafe fn add_to_wait_set(self: Arc<Self>, wait_set: &mut WaitSet) -> Result<(), RclrsError> {
53+
// First, ensure that self is not yet used by a wait set.
54+
let service_exclusive =
55+
ExclusivityGuard::new(Arc::clone(&self) as _, Arc::clone(&self.in_use_by_wait_set))?;
5156
// SAFETY: I'm not sure if it's required, but the service pointer will remain valid
5257
// for as long as the wait set exists, because it's stored in self.clients.
5358
// Passing in a null pointer for the third argument is explicitly allowed.
@@ -57,7 +62,7 @@ where
5762
std::ptr::null_mut(),
5863
)
5964
.ok()?;
60-
wait_set.services.push(self);
65+
wait_set.services.push(service_exclusive);
6166
Ok(())
6267
}
6368

@@ -136,6 +141,7 @@ where
136141
rcl_service_mtx: Mutex::new(service_handle),
137142
rcl_node_mtx: Arc::clone(&node.rcl_node_mtx),
138143
callback: Mutex::new(Box::new(callback)),
144+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
139145
})
140146
}
141147

rclrs/src/node/subscription.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use crate::error::{RclReturnCode, ToResult};
22
use crate::{rcl_bindings::*, RclrsError};
3-
use crate::{Node, QoSProfile, WaitSet, Waitable};
3+
use crate::{ExclusivityGuard, Node, QoSProfile, WaitSet, Waitable};
44

55
use std::boxed::Box;
66
use std::ffi::CStr;
77
use std::ffi::CString;
88
use std::marker::PhantomData;
9+
use std::sync::atomic::AtomicBool;
910
use std::sync::Arc;
1011

1112
use rosidl_runtime_rs::{Message, RmwMessage};
@@ -36,6 +37,7 @@ where
3637
/// The callback function that runs when a message was received.
3738
pub callback: Mutex<Box<dyn FnMut(T) + 'static + Send>>,
3839
message: PhantomData<T>,
40+
in_use_by_wait_set: Arc<AtomicBool>,
3941
}
4042

4143
impl<T: Message> Drop for Subscription<T> {
@@ -54,6 +56,9 @@ where
5456
T: Message,
5557
{
5658
unsafe fn add_to_wait_set(self: Arc<Self>, wait_set: &mut WaitSet) -> Result<(), RclrsError> {
59+
// First, ensure that self is not yet used by a wait set.
60+
let subscription_exclusive =
61+
ExclusivityGuard::new(Arc::clone(&self) as _, Arc::clone(&self.in_use_by_wait_set))?;
5762
// SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
5863
// for as long as the wait set exists, because it's stored in self.subscriptions.
5964
// Passing in a null pointer for the third argument is explicitly allowed.
@@ -63,7 +68,7 @@ where
6368
std::ptr::null_mut(),
6469
)
6570
.ok()?;
66-
wait_set.subscriptions.push(self);
71+
wait_set.subscriptions.push(subscription_exclusive);
6772
Ok(())
6873
}
6974

@@ -139,6 +144,7 @@ where
139144
rcl_node_mtx: Arc::clone(&node.rcl_node_mtx),
140145
callback: Mutex::new(Box::new(callback)),
141146
message: PhantomData,
147+
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
142148
})
143149
}
144150

rclrs/src/wait.rs

Lines changed: 65 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,47 @@ use crate::error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult};
1919
use crate::rcl_bindings::*;
2020
use crate::{ClientWaitable, Context, ServiceWaitable, SubscriptionWaitable};
2121

22-
use std::sync::Arc;
22+
use std::sync::{
23+
atomic::{AtomicBool, Ordering},
24+
Arc,
25+
};
2326
use std::time::Duration;
2427
use std::vec::Vec;
2528

2629
use parking_lot::Mutex;
2730

31+
/// A helper struct for tracking whether the waitable is currently in a wait set.
32+
///
33+
/// When this struct is constructed, which happens when adding an entity to the wait set,
34+
/// it checks that the atomic boolean is false and sets it to true.
35+
/// When it is dropped, which happens when it is removed from the wait set,
36+
/// or the wait set itself is dropped, it sets the atomic bool to false.
37+
pub(crate) struct ExclusivityGuard<T> {
38+
in_use_by_wait_set: Arc<AtomicBool>,
39+
waitable: T,
40+
}
41+
42+
impl<T> Drop for ExclusivityGuard<T> {
43+
fn drop(&mut self) {
44+
self.in_use_by_wait_set.store(false, Ordering::Relaxed)
45+
}
46+
}
47+
48+
impl<T> ExclusivityGuard<T> {
49+
pub fn new(waitable: T, in_use_by_wait_set: Arc<AtomicBool>) -> Result<Self, RclrsError> {
50+
if in_use_by_wait_set
51+
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
52+
.is_err()
53+
{
54+
return Err(RclrsError::AlreadyAddedToWaitSet);
55+
}
56+
Ok(Self {
57+
in_use_by_wait_set,
58+
waitable,
59+
})
60+
}
61+
}
62+
2863
/// Trait to be implemented by entities that can be waited on, like a [`Subscription`][1].
2964
///
3065
/// [1]: crate::Subscription
@@ -59,9 +94,9 @@ pub struct WaitSet {
5994
// The subscriptions that are currently registered in the wait set.
6095
// This correspondence is an invariant that must be maintained by all functions,
6196
// even in the error case.
62-
pub(crate) subscriptions: Vec<Arc<dyn SubscriptionWaitable>>,
63-
pub(crate) clients: Vec<Arc<dyn ClientWaitable>>,
64-
pub(crate) services: Vec<Arc<dyn ServiceWaitable>>,
97+
pub(crate) subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionWaitable>>>,
98+
pub(crate) clients: Vec<ExclusivityGuard<Arc<dyn ClientWaitable>>>,
99+
pub(crate) services: Vec<ExclusivityGuard<Arc<dyn ServiceWaitable>>>,
65100
}
66101

67102
/// A list of entities that are ready, returned by [`WaitSet::wait`].
@@ -128,13 +163,14 @@ impl WaitSet {
128163

129164
/// Adds a client to the wait set.
130165
///
131-
/// It is possible, but not useful, to add the same client twice.
132-
///
133-
/// This will return an error if the number of clients in the wait set is larger than the
134-
/// capacity set in [`WaitSet::new`].
166+
/// # Errors
167+
/// - If the client was already added to this wait set or another one,
168+
/// [`AlreadyAddedToWaitSet`][1] will be returned
169+
/// - If the number of clients in the wait set is larger than the
170+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
135171
///
136-
/// The same client must not be added to multiple wait sets, because that would make it
137-
/// unsafe to simultaneously wait on those wait sets.
172+
/// [1]: crate::RclrsError
173+
/// [2]: crate::RclReturnCode
138174
pub fn add_client(&mut self, client: Arc<dyn ClientWaitable>) -> Result<(), RclrsError> {
139175
// SAFETY: The implementation of this trait for clients checks that the client
140176
// has not already been added to a different wait set.
@@ -143,13 +179,14 @@ impl WaitSet {
143179

144180
/// Adds a service to the wait set.
145181
///
146-
/// It is possible, but not useful, to add the same service twice.
147-
///
148-
/// This will return an error if the number of services in the wait set is larger than the
149-
/// capacity set in [`WaitSet::new`].
182+
/// # Errors
183+
/// - If the service was already added to this wait set or another one,
184+
/// [`AlreadyAddedToWaitSet`][1] will be returned
185+
/// - If the number of services in the wait set is larger than the
186+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
150187
///
151-
/// The same service must not be added to multiple wait sets, because that would make it
152-
/// unsafe to simultaneously wait on those wait sets.
188+
/// [1]: crate::RclrsError
189+
/// [2]: crate::RclReturnCode
153190
pub fn add_service(&mut self, service: Arc<dyn ServiceWaitable>) -> Result<(), RclrsError> {
154191
// SAFETY: The implementation of this trait for services checks that the service
155192
// has not already been added to a different wait set.
@@ -158,13 +195,14 @@ impl WaitSet {
158195

159196
/// Adds a subscription to the wait set.
160197
///
161-
/// It is possible, but not useful, to add the same subscription twice.
162-
///
163-
/// This will return an error if the number of subscriptions in the wait set is larger than the
164-
/// capacity set in [`WaitSet::new`].
198+
/// # Errors
199+
/// - If the subscription was already added to this wait set or another one,
200+
/// [`AlreadyAddedToWaitSet`][1] will be returned
201+
/// - If the number of subscriptions in the wait set is larger than the
202+
/// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
165203
///
166-
/// The same subscription must not be added to multiple wait sets, because that would make it
167-
/// unsafe to simultaneously wait on those wait sets.
204+
/// [1]: crate::RclrsError
205+
/// [2]: crate::RclReturnCode
168206
pub fn add_subscription(
169207
&mut self,
170208
subscription: Arc<dyn SubscriptionWaitable>,
@@ -241,7 +279,9 @@ impl WaitSet {
241279
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
242280
let wait_set_entry = unsafe { *self.rcl_wait_set.subscriptions.add(i) };
243281
if !wait_set_entry.is_null() {
244-
ready_entities.subscriptions.push(subscription.clone());
282+
ready_entities
283+
.subscriptions
284+
.push(Arc::clone(&subscription.waitable));
245285
}
246286
}
247287
for (i, client) in self.clients.iter().enumerate() {
@@ -250,7 +290,7 @@ impl WaitSet {
250290
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
251291
let wait_set_entry = unsafe { *self.rcl_wait_set.clients.add(i) };
252292
if !wait_set_entry.is_null() {
253-
ready_entities.clients.push(client.clone());
293+
ready_entities.clients.push(Arc::clone(&client.waitable));
254294
}
255295
}
256296
for (i, service) in self.services.iter().enumerate() {
@@ -259,7 +299,7 @@ impl WaitSet {
259299
// https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
260300
let wait_set_entry = unsafe { *self.rcl_wait_set.services.add(i) };
261301
if !wait_set_entry.is_null() {
262-
ready_entities.services.push(service.clone());
302+
ready_entities.services.push(Arc::clone(&service.waitable));
263303
}
264304
}
265305
Ok(ready_entities)

0 commit comments

Comments
 (0)