2
2
3
3
// UV2
4
4
enum uv_operation {
5
- op_hw ( )
5
+ op_async_init ( [ u8 ] )
6
6
}
7
7
8
+ type uv_async = {
9
+ id : [ u8 ] ,
10
+ loop : uv_loop
11
+ } ;
12
+
8
13
enum uv_msg {
9
14
// requests from library users
10
15
msg_run( comm:: chan < bool > ) ,
11
16
msg_run_in_bg( ) ,
12
17
msg_loop_delete( ) ,
13
- msg_async_init( [ u8 ] , fn ~( ) ) ,
18
+ msg_async_init( fn ~ ( uv_async ) , fn ~( uv_async ) ) ,
14
19
msg_async_send( [ u8 ] ) ,
15
- msg_hw( ) ,
16
20
17
21
// dispatches from libuv
18
- uv_hw( )
22
+ uv_async_init( [ u8 ] , * ctypes:: void ) ,
23
+ uv_async_send( [ u8 ] )
19
24
}
20
25
21
26
type uv_loop_data = {
@@ -25,10 +30,6 @@ type uv_loop_data = {
25
30
26
31
type uv_loop = comm:: chan < uv_msg > ;
27
32
28
- enum uv_handle {
29
- handle( [ u8 ] , * ctypes:: void )
30
- }
31
-
32
33
#[ nolink]
33
34
native mod rustrt {
34
35
fn rust_uvtmp_create_thread ( ) -> thread ;
@@ -66,10 +67,14 @@ native mod rustrt {
66
67
fn rust_uvtmp_uv_bind_op_cb ( loop : * ctypes:: void , cb : * u8 ) -> * ctypes:: void ;
67
68
fn rust_uvtmp_uv_run ( loop_handle : * ctypes:: void ) ;
68
69
fn rust_uvtmp_uv_async_send ( handle : * ctypes:: void ) ;
70
+ fn rust_uvtmp_uv_async_init (
71
+ loop_handle : * ctypes:: void ,
72
+ cb : * u8 ,
73
+ id : * u8 ) -> * ctypes:: void ;
69
74
}
70
75
71
76
mod uv {
72
- export loop_new, run, run_in_bg, hw ;
77
+ export loop_new, run, run_in_bg, async_init , async_send ;
73
78
74
79
// public functions
75
80
fn loop_new ( ) -> uv_loop unsafe {
@@ -78,7 +83,9 @@ mod uv {
78
83
let ret_recv_chan: comm:: chan < uv_loop > =
79
84
comm:: chan ( ret_recv_port) ;
80
85
81
- task:: spawn_sched ( 3 u) { ||
86
+ let num_threads = 4 u; // would be cool to tie this to
87
+ // the number of logical procs
88
+ task:: spawn_sched ( num_threads) { ||
82
89
// our beloved uv_loop_t ptr
83
90
let loop_handle = rustrt::
84
91
rust_uvtmp_uv_loop_new ( ) ;
@@ -115,12 +122,17 @@ mod uv {
115
122
// to libuv, this will be
116
123
// in the process_operation
117
124
// crust fn
118
- let async_handle = rustrt:: rust_uvtmp_uv_bind_op_cb (
125
+ let op_handle = rustrt:: rust_uvtmp_uv_bind_op_cb (
119
126
loop_handle,
120
127
process_operation) ;
121
128
122
129
// all state goes here
123
- let handles: map:: map < [ u8 ] , uv_handle > =
130
+ let handles: map:: map < [ u8 ] , * ctypes:: void > =
131
+ map:: new_bytes_hash ( ) ;
132
+ let async_cbs: map:: map < [ u8 ] , fn ~( uv_async ) > =
133
+ map:: new_bytes_hash ( ) ;
134
+ let async_init_after_cbs: map:: map < [ u8 ] ,
135
+ fn ~( uv_async ) > =
124
136
map:: new_bytes_hash ( ) ;
125
137
126
138
// the main loop that this task blocks on.
@@ -143,36 +155,51 @@ mod uv {
143
155
comm:: send ( end_chan, true ) ;
144
156
} ;
145
157
}
158
+
146
159
msg_run_in_bg {
147
160
task : : spawn_sched ( 1 u) { ||
148
161
// this call blocks
149
162
rustrt:: rust_uvtmp_uv_run ( loop_handle) ;
150
163
} ;
151
164
}
152
- msg_hw ( ) {
153
- comm:: send ( operation_chan, op_hw) ;
154
- io:: println ( "CALLING ASYNC_SEND FOR HW" ) ;
155
- rustrt:: rust_uvtmp_uv_async_send ( async_handle) ;
156
- }
157
- uv_hw ( ) {
158
- io:: println ( "HELLO WORLD!!!" ) ;
159
- }
160
-
161
- ////// STUBS ///////
162
- msg_loop_delete {
163
- // delete the event loop's c ptr
164
- // this will of course stop any
165
- // further processing
166
- }
167
- msg_async_init ( id, callback) {
165
+
166
+ msg_async_init ( callback, after_cb) {
168
167
// create a new async handle
169
168
// with the id as the handle's
170
169
// data and save the callback for
171
170
// invocation on msg_async_send
171
+ let id = gen_handle_id ( ) ;
172
+ async_cbs. insert ( id, callback) ;
173
+ async_init_after_cbs. insert ( id, after_cb) ;
174
+ let op = op_async_init ( id) ;
175
+ comm:: send ( operation_chan, op) ;
176
+ rustrt:: rust_uvtmp_uv_async_send ( op_handle) ;
177
+ io:: println ( "MSG_ASYNC_INIT" ) ;
172
178
}
179
+ uv_async_init ( id, async_handle) {
180
+ // libuv created a handle, which is
181
+ // passed back to us. save it and
182
+ // then invoke the supplied callback
183
+ // for after completion
184
+ handles. insert ( id, async_handle) ;
185
+ let after_cb = async_init_after_cbs. get ( id) ;
186
+ async_init_after_cbs. remove ( id) ;
187
+ task:: spawn { ||
188
+ let async: uv_async = {
189
+ id: id,
190
+ loop : rust_loop_chan
191
+ } ;
192
+ after_cb ( async ) ;
193
+ } ;
194
+ }
195
+
173
196
msg_async_send( id) {
174
- // get the callback matching the
175
- // supplied id and invoke it
197
+ let async_handle = handles. get ( id) ;
198
+ rustrt:: rust_uvtmp_uv_async_send( async_handle) ;
199
+ }
200
+ uv_async_send ( id) {
201
+ let async_cb = async_cbs. get ( id) ;
202
+ async_cb ( { id: id, loop : rust_loop_chan} ) ;
176
203
}
177
204
178
205
_ { fail "unknown form of uv_msg received" ; }
@@ -193,37 +220,88 @@ mod uv {
193
220
comm:: send ( loop , msg_run_in_bg) ;
194
221
}
195
222
196
- fn hw ( loop : uv_loop ) {
197
- comm:: send ( loop , msg_hw) ;
223
+ fn async_init (
224
+ loop : uv_loop ,
225
+ async_cb : fn ~( uv_async ) ,
226
+ after_cb : fn ~( uv_async ) ) {
227
+ let msg = msg_async_init ( async_cb, after_cb) ;
228
+ comm:: send ( loop , msg) ;
229
+ }
230
+
231
+ fn async_send ( async : uv_async ) {
232
+ comm:: send ( async . loop , msg_async_send ( async . id) ) ;
198
233
}
199
234
200
235
// internal functions
236
+ fn gen_handle_id ( ) -> [ u8 ] {
237
+ ret rand:: mk_rng ( ) . gen_bytes ( 16 u) ;
238
+ }
239
+ fn get_handle_id_from ( buf : * u8 ) -> [ u8 ] unsafe {
240
+ ret vec:: unsafe:: from_buf ( buf, 16 u) ;
241
+ }
242
+
243
+ fn get_loop_chan_from ( data : * uv_loop_data )
244
+ -> comm:: chan < uv_msg > unsafe {
245
+ ret ( * data) . rust_loop_chan ;
246
+ }
201
247
202
248
// crust
203
- crust fn process_operation ( data : * uv_loop_data ) unsafe {
249
+ crust fn process_operation (
250
+ loop : * ctypes:: void ,
251
+ data : * uv_loop_data ) unsafe {
204
252
io:: println ( "IN PROCESS_OPERATION" ) ;
205
253
let op_port = ( * data) . operation_port ;
206
- let loop_chan = ( * data) . rust_loop_chan ;
254
+ let loop_chan = get_loop_chan_from ( data) ;
207
255
let op_pending = comm:: peek ( op_port) ;
208
256
while ( op_pending) {
209
257
io:: println ( "OPERATION PENDING!" ) ;
210
258
alt comm:: recv ( op_port) {
211
- op_hw ( ) {
212
- io:: println ( "GOT OP_HW IN CRUST" ) ;
213
- comm:: send ( loop_chan, uv_hw) ;
259
+ op_async_init ( id) {
260
+ io:: println ( "OP_ASYNC_INIT" ) ;
261
+ let id_ptr = vec:: unsafe:: to_ptr ( id) ;
262
+ let async_handle = rustrt:: rust_uvtmp_uv_async_init (
263
+ loop ,
264
+ process_async_send,
265
+ id_ptr) ;
266
+ comm:: send ( loop_chan, uv_async_init (
267
+ id,
268
+ async_handle) ) ;
214
269
}
270
+
215
271
_ { fail "unknown form of uv_operation received" ; }
216
272
}
217
273
op_pending = comm:: peek ( op_port) ;
218
274
}
219
275
io:: println ( "NO MORE OPERATIONS PENDING!" ) ;
220
276
}
277
+
278
+ crust fn process_async_send ( id_buf: * u8, data: * uv_loop_data)
279
+ unsafe {
280
+ let handle_id = get_handle_id_from ( id_buf) ;
281
+ let loop_chan = get_loop_chan_from ( data) ;
282
+ comm:: send ( loop_chan, uv_async_send ( handle_id) ) ;
283
+ }
284
+
285
+
286
+ }
287
+
288
+ #[ test]
289
+ fn test_uvtmp_uv_new_loop_no_handles ( ) {
290
+ let test_loop = uv:: loop_new ( ) ;
291
+ uv:: run ( test_loop) ; // this should return immediately
292
+ // since there aren't any handles..
221
293
}
222
294
223
295
#[ test]
224
- fn uvtmp_uv_test_hello_world ( ) {
296
+ fn test_uvtmp_uv_simple_async ( ) {
225
297
let test_loop = uv:: loop_new ( ) ;
226
- uv:: hw ( test_loop) ;
298
+ let cb: fn ~( uv_async ) = fn ~( h: uv_async) {
299
+ io:: println ( "HELLO FROM ASYNC CALLBACK!" ) ;
300
+ } ;
301
+ uv:: async_init ( test_loop, cb) { |new_async|
302
+ io:: println ( "NEW_ASYNC CREATED!" ) ;
303
+ uv:: async_send ( new_async) ;
304
+ } ;
227
305
uv:: run ( test_loop) ;
228
306
}
229
307
0 commit comments