@@ -7,7 +7,7 @@ libuv functionality.
7
7
" ] ;
8
8
9
9
export high_level_loop, hl_loop_ext, high_level_msg;
10
- export run_high_level_loop, interact, ref_handle , unref_handle ;
10
+ export run_high_level_loop, interact, ref , unref , unref_and_close ;
11
11
12
12
import ll = uv_ll;
13
13
@@ -23,6 +23,10 @@ the C uv loop to process any pending callbacks
23
23
by the C uv loop
24
24
" ]
25
25
enum high_level_loop {
26
+ simple_task_loop( {
27
+ async_handle : * ll:: uv_async_t ,
28
+ op_chan : comm:: chan < high_level_msg >
29
+ } ) ,
26
30
single_task_loop( {
27
31
async_handle : * * ll:: uv_async_t ,
28
32
op_chan : comm:: chan < high_level_msg >
@@ -52,6 +56,9 @@ impl hl_loop_ext for high_level_loop {
52
56
monitor_task_loop ( { op_chan} ) {
53
57
ret op_chan;
54
58
}
59
+ simple_task_loop ( { async_handle, op_chan} ) {
60
+ ret op_chan;
61
+ }
55
62
}
56
63
}
57
64
}
@@ -61,8 +68,8 @@ Represents the range of interactions with a `high_level_loop`
61
68
" ]
62
69
enum high_level_msg {
63
70
interaction ( fn ~( * libc:: c_void ) ) ,
64
- auto_ref_handle ( * libc:: c_void ) ,
65
- auto_unref_handle ( * libc:: c_void , * u8 ) ,
71
+ ref_handle ( * libc:: c_void ) ,
72
+ manual_unref_handle ( * libc:: c_void , option < * u8 > ) ,
66
73
tear_down
67
74
}
68
75
@@ -96,7 +103,7 @@ unsafe fn run_high_level_loop(loop_ptr: *libc::c_void,
96
103
ll:: async_init ( loop_ptr, async_handle, high_level_wake_up_cb) ;
97
104
98
105
// initialize our loop data and store it in the loop
99
- let data: global_loop_data = default_gl_data ( {
106
+ let data: hl_loop_data = default_gl_data ( {
100
107
async_handle: async_handle,
101
108
mut active: true ,
102
109
before_msg_drain: before_msg_drain,
@@ -159,22 +166,25 @@ resource safe_handle_container<T>(handle_fields: safe_handle_fields<T>) {
159
166
#[ doc="
160
167
Needs to be encapsulated within `safe_handle`
161
168
" ]
162
- fn ref_handle < T > ( hl_loop : high_level_loop , handle : * T ) unsafe {
163
- send_high_level_msg ( hl_loop, auto_ref_handle ( handle as * libc:: c_void ) ) ;
169
+ fn ref < T > ( hl_loop : high_level_loop , handle : * T ) unsafe {
170
+ send_high_level_msg ( hl_loop, ref_handle ( handle as * libc:: c_void ) ) ;
164
171
}
165
172
#[ doc="
166
173
Needs to be encapsulated within `safe_handle`
167
174
" ]
168
- fn unref_handle < T > ( hl_loop : high_level_loop , handle : * T ,
169
- user_close_cb : * u8 ) unsafe {
170
- send_high_level_msg ( hl_loop, auto_unref_handle ( handle as * libc:: c_void ,
171
- user_close_cb) ) ;
175
+ fn unref < T > ( hl_loop : high_level_loop , handle : * T ) unsafe {
176
+ send_high_level_msg ( hl_loop, manual_unref_handle ( handle as * libc:: c_void ,
177
+ none) ) ;
178
+ }
179
+ fn unref_and_close < T > ( hl_loop : high_level_loop , handle : * T , cb : * u8 ) unsafe {
180
+ send_high_level_msg ( hl_loop, manual_unref_handle ( handle as * libc:: c_void ,
181
+ some ( cb) ) ) ;
172
182
}
173
183
174
184
// INTERNAL API
175
185
176
186
// data that lives for the lifetime of the high-evel oo
177
- enum global_loop_data {
187
+ enum hl_loop_data {
178
188
default_gl_data( {
179
189
async_handle : * ll:: uv_async_t ,
180
190
mut active : bool ,
@@ -203,6 +213,10 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop,
203
213
log ( debug, "GLOBAL ASYNC handle == 0" ) ;
204
214
}
205
215
}
216
+ simple_task_loop ( { async_handle, op_chan} ) {
217
+ log ( debug, "simple async handle != 0, waking up loop.." ) ;
218
+ ll:: async_send ( ( async_handle) ) ;
219
+ }
206
220
_ { }
207
221
}
208
222
}
@@ -218,7 +232,7 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
218
232
log( debug, #fmt ( "high_level_wake_up_cb crust.. handle: %? status: %?" ,
219
233
async_handle, status) ) ;
220
234
let loop_ptr = ll:: get_loop_for_uv_handle ( async_handle) ;
221
- let data = ll:: get_data_for_uv_handle ( async_handle) as * global_loop_data ;
235
+ let data = ll:: get_data_for_uv_handle ( async_handle) as * hl_loop_data ;
222
236
// we check to see if the loop is "active" (the loop is set to
223
237
// active = false the first time we realize we need to 'tear down',
224
238
// set subsequent calls to the global async handle may be triggered
@@ -245,11 +259,11 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
245
259
cb ( loop_ptr) ;
246
260
log ( debug, "after calling cb" ) ;
247
261
}
248
- auto_ref_handle ( handle) {
262
+ ref_handle ( handle) {
249
263
high_level_ref ( data, handle) ;
250
264
}
251
- auto_unref_handle ( handle, user_close_cb) {
252
- high_level_unref ( data, handle, false , user_close_cb) ;
265
+ manual_unref_handle ( handle, user_close_cb) {
266
+ high_level_unref ( data, handle, true , user_close_cb) ;
253
267
}
254
268
tear_down {
255
269
log( debug, "incoming hl_msg: got tear_down" ) ;
@@ -258,6 +272,9 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
258
272
continue = comm:: peek ( msg_po) ;
259
273
}
260
274
}
275
+ else {
276
+ log ( debug, "in hl wake_cb, no pending messages" ) ;
277
+ }
261
278
}
262
279
log ( debug, #fmt ( "after on_wake, continue? %?" , continue ) ) ;
263
280
if !do_msg_drain {
@@ -269,50 +286,80 @@ crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t,
269
286
crust fn tear_down_close_cb ( handle: * ll:: uv_async_t) unsafe {
270
287
log ( debug, #fmt ( "tear_down_close_cb called, closing handle at %?" ,
271
288
handle) ) ;
272
- let data = ll:: get_data_for_uv_handle ( handle) as * global_loop_data ;
273
- if vec:: len ( ( * data) . refd_handles ) > 0 {
289
+ let data = ll:: get_data_for_uv_handle ( handle) as * hl_loop_data ;
290
+ if vec:: len ( ( * data) . refd_handles ) > 0 u {
274
291
fail "Didn't unref all high-level handles" ;
275
292
}
276
293
}
277
294
278
- fn high_level_tear_down ( data : * global_loop_data ) unsafe {
295
+ fn high_level_tear_down ( data : * hl_loop_data ) unsafe {
279
296
log ( debug, "high_level_tear_down() called, close async_handle" ) ;
280
297
// call user-suppled before_tear_down cb
281
298
let async_handle = ( * data) . async_handle ;
282
299
( * data) . before_tear_down ( async_handle) ;
283
300
ll:: close ( async_handle as * libc:: c_void , tear_down_close_cb) ;
284
301
}
285
302
286
- unsafe fn high_level_ref ( data : * global_loop_data , handle : * libc:: c_void ) {
287
- log ( debug, "incoming hl_msg: got auto_ref_handle " ) ;
303
+ unsafe fn high_level_ref ( data : * hl_loop_data , handle : * libc:: c_void ) {
304
+ log ( debug, "incoming hl_msg: got ..ref_handle " ) ;
288
305
let mut refd_handles = ( * data) . refd_handles ;
306
+ let mut unrefd_handles = ( * data) . unrefd_handles ;
289
307
let handle_already_refd = refd_handles. contains ( handle) ;
290
308
if handle_already_refd {
291
309
fail "attempt to do a high-level ref an already ref'd handle" ;
292
310
}
311
+ let handle_already_unrefd = unrefd_handles. contains ( handle) ;
312
+ // if we are ref'ing a handle (by ptr) that was already unref'd,
313
+ // probably
314
+ if handle_already_unrefd {
315
+ let last_idx = vec:: len ( unrefd_handles) - 1 u;
316
+ let handle_idx = vec:: position_elem ( unrefd_handles, handle) ;
317
+ alt handle_idx {
318
+ none {
319
+ fail "trying to remove handle that isn't in unrefd_handles" ;
320
+ }
321
+ some ( idx) {
322
+ unrefd_handles[ idx] <-> unrefd_handles[ last_idx] ;
323
+ vec:: pop ( unrefd_handles) ;
324
+ }
325
+ }
326
+ ( * data) . unrefd_handles = unrefd_handles;
327
+ }
293
328
refd_handles += [ handle] ;
294
329
( * data) . refd_handles = refd_handles;
295
330
}
296
331
297
- unsafe fn high_level_unref ( data : * global_loop_data , handle : * libc:: c_void ,
298
- manual_unref : bool , user_close_cb : * u8 ) {
332
+ unsafe fn high_level_unref ( data : * hl_loop_data , handle : * libc:: c_void ,
333
+ manual_unref : bool , user_close_cb : option < * u8 > ) {
299
334
log ( debug, "incoming hl_msg: got auto_unref_handle" ) ;
300
335
let mut refd_handles = ( * data) . refd_handles ;
301
336
let mut unrefd_handles = ( * data) . unrefd_handles ;
337
+ log ( debug, #fmt ( "refs: %?, unrefs %? handle %?" , vec:: len ( refd_handles) ,
338
+ vec:: len ( unrefd_handles) , handle) ) ;
302
339
let handle_already_refd = refd_handles. contains ( handle) ;
303
340
if !handle_already_refd {
304
341
fail "attempting to high-level unref an untracked handle" ;
305
342
}
306
343
let double_unref = unrefd_handles. contains ( handle) ;
307
344
if double_unref {
345
+ log ( debug, "double unref encountered" ) ;
308
346
if manual_unref {
309
347
// will allow a user to manual unref, but only signal
310
348
// a fail when a double-unref is caused by a user
311
349
fail "attempting to high-level unref an unrefd handle" ;
312
350
}
351
+ else {
352
+ log ( debug, "not failing..." ) ;
353
+ }
313
354
}
314
355
else {
315
- ll:: close ( handle, user_close_cb) ;
356
+ log ( debug, "attempting to unref handle" ) ;
357
+ alt user_close_cb {
358
+ some( cb) {
359
+ ll:: close ( handle, cb) ;
360
+ }
361
+ none { }
362
+ }
316
363
let last_idx = vec:: len ( refd_handles) - 1 u;
317
364
let handle_idx = vec:: position_elem ( refd_handles, handle) ;
318
365
alt handle_idx {
@@ -337,3 +384,128 @@ unsafe fn high_level_unref(data: *global_loop_data, handle: *libc::c_void,
337
384
}
338
385
339
386
}
387
+ #[ cfg( test) ]
388
+ mod test {
389
+ crust fn async_close_cb ( handle : * ll:: uv_async_t ) unsafe {
390
+ log ( debug, #fmt ( "async_close_cb handle %?" , handle) ) ;
391
+ let exit_ch = ( * ( ll:: get_data_for_uv_handle ( handle)
392
+ as * ah_data ) ) . exit_ch ;
393
+ comm:: send ( exit_ch, ( ) ) ;
394
+ }
395
+ crust fn async_handle_cb ( handle : * ll:: uv_async_t , status : libc:: c_int )
396
+ unsafe {
397
+ log ( debug, #fmt ( "async_handle_cb handle %? status %?" , handle, status) ) ;
398
+ let hl_loop = ( * ( ll:: get_data_for_uv_handle ( handle)
399
+ as * ah_data ) ) . hl_loop ;
400
+ unref_and_close ( hl_loop, handle, async_close_cb) ;
401
+ }
402
+ type ah_data = {
403
+ hl_loop : high_level_loop ,
404
+ exit_ch : comm:: chan < ( ) >
405
+ } ;
406
+ fn impl_uv_hl_async ( hl_loop : high_level_loop ) unsafe {
407
+ let async_handle = ll:: async_t ( ) ;
408
+ let ah_ptr = ptr:: addr_of ( async_handle) ;
409
+ let exit_po = comm:: port :: < ( ) > ( ) ;
410
+ let exit_ch = comm:: chan ( exit_po) ;
411
+ let ah_data = {
412
+ hl_loop: hl_loop,
413
+ exit_ch: exit_ch
414
+ } ;
415
+ let ah_data_ptr = ptr:: addr_of ( ah_data) ;
416
+ interact ( hl_loop) { |loop_ptr|
417
+ ref ( hl_loop, ah_ptr) ;
418
+ ll:: async_init ( loop_ptr, ah_ptr, async_handle_cb) ;
419
+ ll:: set_data_for_uv_handle ( ah_ptr, ah_data_ptr as * libc:: c_void ) ;
420
+ ll:: async_send ( ah_ptr) ;
421
+ } ;
422
+ comm:: recv( exit_po) ;
423
+ }
424
+
425
+ // this fn documents the bear minimum neccesary to roll your own
426
+ // high_level_loop
427
+ unsafe fn spawn_test_loop ( exit_ch : comm:: chan < ( ) > ) -> high_level_loop {
428
+ let hl_loop_port = comm:: port :: < high_level_loop > ( ) ;
429
+ let hl_loop_ch = comm:: chan ( hl_loop_port) ;
430
+ task:: spawn_sched ( task:: manual_threads ( 1 u) ) { ||
431
+ let loop_ptr = ll:: loop_new ( ) ;
432
+ let msg_po = comm:: port :: < high_level_msg > ( ) ;
433
+ let msg_ch = comm:: chan ( msg_po) ;
434
+ run_high_level_loop (
435
+ loop_ptr,
436
+ msg_po,
437
+ // before_run
438
+ { |async_handle|
439
+ log ( debug, #fmt ( "hltest before_run: async_handle %?" ,
440
+ async_handle) ) ;
441
+ // do an async_send with it
442
+ ll:: async_send ( async_handle) ;
443
+ comm:: send ( hl_loop_ch, simple_task_loop ( {
444
+ async_handle: async_handle,
445
+ op_chan: msg_ch
446
+ } ) ) ;
447
+ } ,
448
+ // before_msg_drain
449
+ { |async_handle|
450
+ log ( debug, #fmt ( "hltest before_msg_drain: async_handle %?" ,
451
+ async_handle) ) ;
452
+ true
453
+ } ,
454
+ // before_tear_down
455
+ { |async_handle|
456
+ log ( debug, #fmt ( "hl test_loop b4_tear_down: async %?" ,
457
+ async_handle) ) ;
458
+ } ) ;
459
+ ll:: loop_delete ( loop_ptr) ;
460
+ comm:: send ( exit_ch, ( ) ) ;
461
+ } ;
462
+ ret comm:: recv ( hl_loop_port) ;
463
+ }
464
+
465
+ crust fn lifetime_handle_close ( handle: * libc:: c_void) unsafe {
466
+ log ( debug, #fmt ( "lifetime_handle_close ptr %?" , handle) ) ;
467
+ }
468
+
469
+ crust fn lifetime_async_callback ( handle : * libc:: c_void ,
470
+ status : libc:: c_int ) {
471
+ log ( debug, #fmt ( "lifetime_handle_close ptr %? status %?" ,
472
+ handle, status) ) ;
473
+ }
474
+
475
+ #[ test]
476
+ #[ ignore( cfg( target_os = "freebsd" ) ) ]
477
+ fn test_uv_hl_async ( ) unsafe {
478
+ let exit_po = comm:: port :: < ( ) > ( ) ;
479
+ let exit_ch = comm:: chan ( exit_po) ;
480
+ let hl_loop = spawn_test_loop ( exit_ch) ;
481
+
482
+ // using this handle to manage the lifetime of the high_level_loop,
483
+ // as it will exit the first time one of the impl_uv_hl_async() is
484
+ // cleaned up with no one ref'd handles on the loop (Which can happen
485
+ // under race-condition type situations.. this ensures that the loop
486
+ // lives until, at least, all of the impl_uv_hl_async() runs have been
487
+ // called, at least.
488
+ let lifetime_handle = ll:: async_t ( ) ;
489
+ let lifetime_handle_ptr = ptr:: addr_of ( lifetime_handle) ;
490
+ interact ( hl_loop) { |loop_ptr|
491
+ ref ( hl_loop, lifetime_handle_ptr) ;
492
+ ll:: async_init ( loop_ptr, lifetime_handle_ptr,
493
+ lifetime_async_callback) ;
494
+ } ;
495
+
496
+ iter:: repeat ( 7 u) { ||
497
+ task:: spawn_sched ( task:: manual_threads ( 1 u) , { ||
498
+ impl_uv_hl_async ( hl_loop) ;
499
+ } ) ;
500
+ } ;
501
+ impl_uv_hl_async ( hl_loop) ;
502
+ impl_uv_hl_async ( hl_loop) ;
503
+ impl_uv_hl_async ( hl_loop) ;
504
+ interact ( hl_loop) { |loop_ptr|
505
+ ll:: close ( lifetime_handle_ptr, lifetime_handle_close) ;
506
+ unref ( hl_loop, lifetime_handle_ptr) ;
507
+ log ( debug, "close and unref lifetime handle" ) ;
508
+ } ;
509
+ comm:: recv ( exit_po) ;
510
+ }
511
+ }
0 commit comments