Skip to content

Commit bd64a89

Browse files
committed
Refactor to remove *Handle structs
1 parent a635b0b commit bd64a89

File tree

6 files changed

+244
-340
lines changed

6 files changed

+244
-340
lines changed

rclrs/src/lib.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,22 @@ pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsErro
5353
&ctx,
5454
)?;
5555

56-
for live_subscription in &live_subscriptions {
57-
wait_set.add_subscription(live_subscription.clone())?;
56+
for live_subscription in live_subscriptions {
57+
// SAFETY: The implementation of this trait function guarantees that the subscription
58+
// is not part of any other wait set. (TODO: issue #207)
59+
unsafe { live_subscription.add_to_wait_set(&mut wait_set)? };
5860
}
5961

60-
for live_client in &live_clients {
61-
wait_set.add_client(live_client.clone())?;
62+
for live_client in live_clients {
63+
// SAFETY: The implementation of this trait function guarantees that the client
64+
// is not part of any other wait set. (TODO: issue #207)
65+
unsafe { live_client.add_to_wait_set(&mut wait_set)? };
6266
}
6367

64-
for live_service in &live_services {
65-
wait_set.add_service(live_service.clone())?;
68+
for live_service in live_services {
69+
// SAFETY: The implementation of this trait function guarantees that the client
70+
// is not part of any other wait set. (TODO: issue #207)
71+
unsafe { live_service.add_to_wait_set(&mut wait_set)? };
6672
}
6773

6874
let ready_entities = wait_set.wait(timeout)?;

rclrs/src/node.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub use self::service::*;
1010
pub use self::subscription::*;
1111

1212
use crate::rcl_bindings::*;
13-
use crate::{Context, ParameterOverrideMap, QoSProfile, RclrsError, ToResult};
13+
use crate::{Context, ParameterOverrideMap, QoSProfile, RclrsError, ToResult, Waitable};
1414

1515
use std::cmp::PartialEq;
1616
use std::ffi::CStr;
@@ -72,9 +72,9 @@ unsafe impl Send for rcl_node_t {}
7272
pub struct Node {
7373
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
7474
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
75-
pub(crate) clients: Vec<Weak<dyn ClientBase>>,
76-
pub(crate) services: Vec<Weak<dyn ServiceBase>>,
77-
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
75+
pub(crate) clients: Vec<Weak<dyn Waitable>>,
76+
pub(crate) services: Vec<Weak<dyn Waitable>>,
77+
pub(crate) subscriptions: Vec<Weak<dyn Waitable>>,
7878
_parameter_map: ParameterOverrideMap,
7979
}
8080

@@ -193,7 +193,7 @@ impl Node {
193193
{
194194
let client = Arc::new(crate::node::client::Client::<T>::new(self, topic)?);
195195
self.clients
196-
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
196+
.push(Arc::downgrade(&client) as Weak<dyn Waitable>);
197197
Ok(client)
198198
}
199199

@@ -229,7 +229,7 @@ impl Node {
229229
self, topic, callback,
230230
)?);
231231
self.services
232-
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
232+
.push(Arc::downgrade(&service) as Weak<dyn Waitable>);
233233
Ok(service)
234234
}
235235

@@ -249,23 +249,23 @@ impl Node {
249249
{
250250
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
251251
self.subscriptions
252-
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
252+
.push(Arc::downgrade(&subscription) as Weak<dyn Waitable>);
253253
Ok(subscription)
254254
}
255255

256256
/// Returns the subscriptions that have not been dropped yet.
257-
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
257+
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn Waitable>> {
258258
self.subscriptions
259259
.iter()
260260
.filter_map(Weak::upgrade)
261261
.collect()
262262
}
263263

264-
pub(crate) fn live_clients(&self) -> Vec<Arc<dyn ClientBase>> {
264+
pub(crate) fn live_clients(&self) -> Vec<Arc<dyn Waitable>> {
265265
self.clients.iter().filter_map(Weak::upgrade).collect()
266266
}
267267

268-
pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceBase>> {
268+
pub(crate) fn live_services(&self) -> Vec<Arc<dyn Waitable>> {
269269
self.services.iter().filter_map(Weak::upgrade).collect()
270270
}
271271

rclrs/src/node/client.rs

Lines changed: 64 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,88 @@
1-
use crate::node::client::oneshot::Canceled;
21
use futures::channel::oneshot;
32
use std::boxed::Box;
43
use std::collections::HashMap;
54
use std::ffi::CString;
65
use std::sync::Arc;
76

8-
use crate::error::{RclReturnCode, ToResult};
9-
use crate::MessageCow;
10-
use crate::Node;
11-
use crate::{rcl_bindings::*, RclrsError};
7+
use crate::rcl_bindings::*;
8+
use crate::{MessageCow, Node, RclReturnCode, RclrsError, ToResult, WaitSet, Waitable};
129

13-
use parking_lot::{Mutex, MutexGuard};
10+
use parking_lot::Mutex;
1411
use rosidl_runtime_rs::Message;
1512

1613
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
1714
// they are running in. Therefore, this type can be safely sent to another thread.
1815
unsafe impl Send for rcl_client_t {}
1916

20-
/// Internal struct used by clients.
21-
pub struct ClientHandle {
17+
type RequestValue<Response> = Box<dyn FnOnce(Response) + 'static + Send>;
18+
19+
type RequestId = i64;
20+
21+
/// Main class responsible for sending requests to a ROS service.
22+
pub struct Client<T>
23+
where
24+
T: rosidl_runtime_rs::Service,
25+
{
2226
rcl_client_mtx: Mutex<rcl_client_t>,
2327
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
28+
requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
29+
futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
2430
}
2531

26-
impl ClientHandle {
27-
pub(crate) fn lock(&self) -> MutexGuard<rcl_client_t> {
28-
self.rcl_client_mtx.lock()
29-
}
30-
}
31-
32-
impl Drop for ClientHandle {
32+
impl<T> Drop for Client<T>
33+
where
34+
T: rosidl_runtime_rs::Service,
35+
{
3336
fn drop(&mut self) {
34-
let handle = self.rcl_client_mtx.get_mut();
35-
let rcl_node_mtx = &mut *self.rcl_node_mtx.lock();
37+
let rcl_client = self.rcl_client_mtx.get_mut();
38+
let rcl_node = &mut *self.rcl_node_mtx.lock();
3639
// SAFETY: No preconditions for this function
3740
unsafe {
38-
rcl_client_fini(handle, rcl_node_mtx);
41+
rcl_client_fini(rcl_client, rcl_node);
3942
}
4043
}
4144
}
4245

43-
impl From<Canceled> for RclrsError {
44-
fn from(_: Canceled) -> Self {
45-
RclrsError::RclError {
46-
code: RclReturnCode::Error,
47-
msg: None,
48-
}
49-
}
50-
}
51-
52-
/// Trait to be implemented by concrete Client structs.
53-
///
54-
/// See [`Client<T>`] for an example.
55-
pub trait ClientBase: Send + Sync {
56-
/// Internal function to get a reference to the `rcl` handle.
57-
fn handle(&self) -> &ClientHandle;
58-
/// Tries to take a new response and run the callback or future with it.
59-
fn execute(&self) -> Result<(), RclrsError>;
60-
}
61-
62-
type RequestValue<Response> = Box<dyn FnOnce(Response) + 'static + Send>;
63-
64-
type RequestId = i64;
65-
66-
/// Main class responsible for sending requests to a ROS service.
67-
pub struct Client<T>
46+
impl<T> Waitable for Client<T>
6847
where
6948
T: rosidl_runtime_rs::Service,
7049
{
71-
pub(crate) handle: Arc<ClientHandle>,
72-
requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
73-
futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
50+
unsafe fn add_to_wait_set(self: Arc<Self>, wait_set: &mut WaitSet) -> Result<(), RclrsError> {
51+
// SAFETY: I'm not sure if it's required, but the client pointer will remain valid
52+
// for as long as the wait set exists, because it's stored in self.clients.
53+
// Passing in a null pointer for the third argument is explicitly allowed.
54+
rcl_wait_set_add_client(
55+
&mut wait_set.rcl_wait_set,
56+
&*self.rcl_client_mtx.lock(),
57+
std::ptr::null_mut(),
58+
)
59+
.ok()?;
60+
wait_set.clients.push(self);
61+
Ok(())
62+
}
63+
64+
fn execute(&self) -> Result<(), RclrsError> {
65+
let (res, req_id) = match self.take_response() {
66+
Ok((res, req_id)) => (res, req_id),
67+
Err(RclrsError::RclError {
68+
code: RclReturnCode::ClientTakeFailed,
69+
..
70+
}) => {
71+
// Spurious wakeup – this may happen even when a waitset indicated that this
72+
// client was ready, so it shouldn't be an error.
73+
return Ok(());
74+
}
75+
Err(e) => return Err(e),
76+
};
77+
let requests = &mut *self.requests.lock();
78+
let futures = &mut *self.futures.lock();
79+
if let Some(callback) = requests.remove(&req_id.sequence_number) {
80+
callback(res);
81+
} else if let Some(future) = futures.remove(&req_id.sequence_number) {
82+
let _ = future.send(res);
83+
}
84+
Ok(())
85+
}
7486
}
7587

7688
impl<T> Client<T>
@@ -110,13 +122,9 @@ where
110122
.ok()?;
111123
}
112124

113-
let handle = Arc::new(ClientHandle {
114-
rcl_client_mtx: Mutex::new(rcl_client),
115-
rcl_node_mtx: node.rcl_node_mtx.clone(),
116-
});
117-
118125
Ok(Self {
119-
handle,
126+
rcl_client_mtx: Mutex::new(rcl_client),
127+
rcl_node_mtx: Arc::clone(&node.rcl_node_mtx),
120128
requests: Mutex::new(HashMap::new()),
121129
futures: Arc::new(Mutex::new(
122130
HashMap::<RequestId, oneshot::Sender<T::Response>>::new(),
@@ -149,7 +157,7 @@ where
149157
unsafe {
150158
// SAFETY: The request type is guaranteed to match the client type by the type system.
151159
rcl_send_request(
152-
&*self.handle.lock() as *const _,
160+
&*self.rcl_client_mtx.lock() as *const _,
153161
rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
154162
&mut sequence_number,
155163
)
@@ -184,7 +192,7 @@ where
184192
unsafe {
185193
// SAFETY: The request type is guaranteed to match the client type by the type system.
186194
rcl_send_request(
187-
&*self.handle.lock() as *const _,
195+
&*self.rcl_client_mtx.lock() as *const _,
188196
rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
189197
&mut sequence_number,
190198
)
@@ -228,11 +236,11 @@ where
228236
type RmwMsg<T> =
229237
<<T as rosidl_runtime_rs::Service>::Response as rosidl_runtime_rs::Message>::RmwMsg;
230238
let mut response_out = RmwMsg::<T>::default();
231-
let handle = &*self.handle.lock();
239+
let rcl_client = &*self.rcl_client_mtx.lock();
232240
unsafe {
233241
// SAFETY: The three pointers are valid/initialized
234242
rcl_take_response(
235-
handle,
243+
rcl_client,
236244
&mut request_id_out,
237245
&mut response_out as *mut RmwMsg<T> as *mut _,
238246
)
@@ -241,35 +249,3 @@ where
241249
Ok((T::Response::from_rmw_message(response_out), request_id_out))
242250
}
243251
}
244-
245-
impl<T> ClientBase for Client<T>
246-
where
247-
T: rosidl_runtime_rs::Service,
248-
{
249-
fn handle(&self) -> &ClientHandle {
250-
&self.handle
251-
}
252-
253-
fn execute(&self) -> Result<(), RclrsError> {
254-
let (res, req_id) = match self.take_response() {
255-
Ok((res, req_id)) => (res, req_id),
256-
Err(RclrsError::RclError {
257-
code: RclReturnCode::ClientTakeFailed,
258-
..
259-
}) => {
260-
// Spurious wakeup – this may happen even when a waitset indicated that this
261-
// client was ready, so it shouldn't be an error.
262-
return Ok(());
263-
}
264-
Err(e) => return Err(e),
265-
};
266-
let requests = &mut *self.requests.lock();
267-
let futures = &mut *self.futures.lock();
268-
if let Some(callback) = requests.remove(&req_id.sequence_number) {
269-
callback(res);
270-
} else if let Some(future) = futures.remove(&req_id.sequence_number) {
271-
let _ = future.send(res);
272-
}
273-
Ok(())
274-
}
275-
}

0 commit comments

Comments
 (0)