1
- use crate :: node:: client:: oneshot:: Canceled ;
2
1
use futures:: channel:: oneshot;
3
2
use std:: boxed:: Box ;
4
3
use std:: collections:: HashMap ;
5
4
use std:: ffi:: CString ;
6
5
use std:: sync:: Arc ;
7
6
8
- use crate :: error:: { RclReturnCode , ToResult } ;
9
- use crate :: MessageCow ;
10
- use crate :: Node ;
11
- use crate :: { rcl_bindings:: * , RclrsError } ;
7
+ use crate :: rcl_bindings:: * ;
8
+ use crate :: { MessageCow , Node , RclReturnCode , RclrsError , ToResult , WaitSet , Waitable } ;
12
9
13
- use parking_lot:: { Mutex , MutexGuard } ;
10
+ use parking_lot:: Mutex ;
14
11
use rosidl_runtime_rs:: Message ;
15
12
16
13
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
17
14
// they are running in. Therefore, this type can be safely sent to another thread.
18
15
unsafe impl Send for rcl_client_t { }
19
16
20
- /// Internal struct used by clients.
21
- pub struct ClientHandle {
17
+ type RequestValue < Response > = Box < dyn FnOnce ( Response ) + ' static + Send > ;
18
+
19
+ type RequestId = i64 ;
20
+
21
+ /// Main class responsible for sending requests to a ROS service.
22
+ pub struct Client < T >
23
+ where
24
+ T : rosidl_runtime_rs:: Service ,
25
+ {
22
26
rcl_client_mtx : Mutex < rcl_client_t > ,
23
27
rcl_node_mtx : Arc < Mutex < rcl_node_t > > ,
28
+ requests : Mutex < HashMap < RequestId , RequestValue < T :: Response > > > ,
29
+ futures : Arc < Mutex < HashMap < RequestId , oneshot:: Sender < T :: Response > > > > ,
24
30
}
25
31
26
- impl ClientHandle {
27
- pub ( crate ) fn lock ( & self ) -> MutexGuard < rcl_client_t > {
28
- self . rcl_client_mtx . lock ( )
29
- }
30
- }
31
-
32
- impl Drop for ClientHandle {
32
+ impl < T > Drop for Client < T >
33
+ where
34
+ T : rosidl_runtime_rs:: Service ,
35
+ {
33
36
fn drop ( & mut self ) {
34
- let handle = self . rcl_client_mtx . get_mut ( ) ;
35
- let rcl_node_mtx = & mut * self . rcl_node_mtx . lock ( ) ;
37
+ let rcl_client = self . rcl_client_mtx . get_mut ( ) ;
38
+ let rcl_node = & mut * self . rcl_node_mtx . lock ( ) ;
36
39
// SAFETY: No preconditions for this function
37
40
unsafe {
38
- rcl_client_fini ( handle , rcl_node_mtx ) ;
41
+ rcl_client_fini ( rcl_client , rcl_node ) ;
39
42
}
40
43
}
41
44
}
42
45
43
- impl From < Canceled > for RclrsError {
44
- fn from ( _: Canceled ) -> Self {
45
- RclrsError :: RclError {
46
- code : RclReturnCode :: Error ,
47
- msg : None ,
48
- }
49
- }
50
- }
51
-
52
- /// Trait to be implemented by concrete Client structs.
53
- ///
54
- /// See [`Client<T>`] for an example.
55
- pub trait ClientBase : Send + Sync {
56
- /// Internal function to get a reference to the `rcl` handle.
57
- fn handle ( & self ) -> & ClientHandle ;
58
- /// Tries to take a new response and run the callback or future with it.
59
- fn execute ( & self ) -> Result < ( ) , RclrsError > ;
60
- }
61
-
62
- type RequestValue < Response > = Box < dyn FnOnce ( Response ) + ' static + Send > ;
63
-
64
- type RequestId = i64 ;
65
-
66
- /// Main class responsible for sending requests to a ROS service.
67
- pub struct Client < T >
46
+ impl < T > Waitable for Client < T >
68
47
where
69
48
T : rosidl_runtime_rs:: Service ,
70
49
{
71
- pub ( crate ) handle : Arc < ClientHandle > ,
72
- requests : Mutex < HashMap < RequestId , RequestValue < T :: Response > > > ,
73
- futures : Arc < Mutex < HashMap < RequestId , oneshot:: Sender < T :: Response > > > > ,
50
+ unsafe fn add_to_wait_set ( self : Arc < Self > , wait_set : & mut WaitSet ) -> Result < ( ) , RclrsError > {
51
+ // SAFETY: I'm not sure if it's required, but the client pointer will remain valid
52
+ // for as long as the wait set exists, because it's stored in self.clients.
53
+ // Passing in a null pointer for the third argument is explicitly allowed.
54
+ rcl_wait_set_add_client (
55
+ & mut wait_set. rcl_wait_set ,
56
+ & * self . rcl_client_mtx . lock ( ) ,
57
+ std:: ptr:: null_mut ( ) ,
58
+ )
59
+ . ok ( ) ?;
60
+ wait_set. clients . push ( self ) ;
61
+ Ok ( ( ) )
62
+ }
63
+
64
+ fn execute ( & self ) -> Result < ( ) , RclrsError > {
65
+ let ( res, req_id) = match self . take_response ( ) {
66
+ Ok ( ( res, req_id) ) => ( res, req_id) ,
67
+ Err ( RclrsError :: RclError {
68
+ code : RclReturnCode :: ClientTakeFailed ,
69
+ ..
70
+ } ) => {
71
+ // Spurious wakeup – this may happen even when a waitset indicated that this
72
+ // client was ready, so it shouldn't be an error.
73
+ return Ok ( ( ) ) ;
74
+ }
75
+ Err ( e) => return Err ( e) ,
76
+ } ;
77
+ let requests = & mut * self . requests . lock ( ) ;
78
+ let futures = & mut * self . futures . lock ( ) ;
79
+ if let Some ( callback) = requests. remove ( & req_id. sequence_number ) {
80
+ callback ( res) ;
81
+ } else if let Some ( future) = futures. remove ( & req_id. sequence_number ) {
82
+ let _ = future. send ( res) ;
83
+ }
84
+ Ok ( ( ) )
85
+ }
74
86
}
75
87
76
88
impl < T > Client < T >
@@ -110,13 +122,9 @@ where
110
122
. ok ( ) ?;
111
123
}
112
124
113
- let handle = Arc :: new ( ClientHandle {
114
- rcl_client_mtx : Mutex :: new ( rcl_client) ,
115
- rcl_node_mtx : node. rcl_node_mtx . clone ( ) ,
116
- } ) ;
117
-
118
125
Ok ( Self {
119
- handle,
126
+ rcl_client_mtx : Mutex :: new ( rcl_client) ,
127
+ rcl_node_mtx : Arc :: clone ( & node. rcl_node_mtx ) ,
120
128
requests : Mutex :: new ( HashMap :: new ( ) ) ,
121
129
futures : Arc :: new ( Mutex :: new (
122
130
HashMap :: < RequestId , oneshot:: Sender < T :: Response > > :: new ( ) ,
@@ -149,7 +157,7 @@ where
149
157
unsafe {
150
158
// SAFETY: The request type is guaranteed to match the client type by the type system.
151
159
rcl_send_request (
152
- & * self . handle . lock ( ) as * const _ ,
160
+ & * self . rcl_client_mtx . lock ( ) as * const _ ,
153
161
rmw_message. as_ref ( ) as * const <T :: Request as Message >:: RmwMsg as * mut _ ,
154
162
& mut sequence_number,
155
163
)
@@ -184,7 +192,7 @@ where
184
192
unsafe {
185
193
// SAFETY: The request type is guaranteed to match the client type by the type system.
186
194
rcl_send_request (
187
- & * self . handle . lock ( ) as * const _ ,
195
+ & * self . rcl_client_mtx . lock ( ) as * const _ ,
188
196
rmw_message. as_ref ( ) as * const <T :: Request as Message >:: RmwMsg as * mut _ ,
189
197
& mut sequence_number,
190
198
)
@@ -228,11 +236,11 @@ where
228
236
type RmwMsg < T > =
229
237
<<T as rosidl_runtime_rs:: Service >:: Response as rosidl_runtime_rs:: Message >:: RmwMsg ;
230
238
let mut response_out = RmwMsg :: < T > :: default ( ) ;
231
- let handle = & * self . handle . lock ( ) ;
239
+ let rcl_client = & * self . rcl_client_mtx . lock ( ) ;
232
240
unsafe {
233
241
// SAFETY: The three pointers are valid/initialized
234
242
rcl_take_response (
235
- handle ,
243
+ rcl_client ,
236
244
& mut request_id_out,
237
245
& mut response_out as * mut RmwMsg < T > as * mut _ ,
238
246
)
@@ -241,35 +249,3 @@ where
241
249
Ok ( ( T :: Response :: from_rmw_message ( response_out) , request_id_out) )
242
250
}
243
251
}
244
-
245
- impl < T > ClientBase for Client < T >
246
- where
247
- T : rosidl_runtime_rs:: Service ,
248
- {
249
- fn handle ( & self ) -> & ClientHandle {
250
- & self . handle
251
- }
252
-
253
- fn execute ( & self ) -> Result < ( ) , RclrsError > {
254
- let ( res, req_id) = match self . take_response ( ) {
255
- Ok ( ( res, req_id) ) => ( res, req_id) ,
256
- Err ( RclrsError :: RclError {
257
- code : RclReturnCode :: ClientTakeFailed ,
258
- ..
259
- } ) => {
260
- // Spurious wakeup – this may happen even when a waitset indicated that this
261
- // client was ready, so it shouldn't be an error.
262
- return Ok ( ( ) ) ;
263
- }
264
- Err ( e) => return Err ( e) ,
265
- } ;
266
- let requests = & mut * self . requests . lock ( ) ;
267
- let futures = & mut * self . futures . lock ( ) ;
268
- if let Some ( callback) = requests. remove ( & req_id. sequence_number ) {
269
- callback ( res) ;
270
- } else if let Some ( future) = futures. remove ( & req_id. sequence_number ) {
271
- let _ = future. send ( res) ;
272
- }
273
- Ok ( ( ) )
274
- }
275
- }
0 commit comments