|
8 | 8 | // option. This file may not be copied, modified, or distributed
|
9 | 9 | // except according to those terms.
|
10 | 10 |
|
| 11 | +use std::cast; |
11 | 12 | use std::libc::c_int;
|
| 13 | +use std::rt::rtio::{Callback, RemoteCallback}; |
| 14 | +use std::unstable::sync::Exclusive; |
12 | 15 |
|
13 | 16 | use uvll;
|
14 |
| -use super::{Watcher, Loop, NativeHandle, AsyncCallback, WatcherInterop}; |
15 |
| -use super::status_to_maybe_uv_error; |
| 17 | +use super::{Loop, UvHandle}; |
16 | 18 |
|
17 |
| -pub struct AsyncWatcher(*uvll::uv_async_t); |
18 |
| -impl Watcher for AsyncWatcher { } |
| 19 | +// The entire point of async is to call into a loop from other threads so it |
| 20 | +// does not need to home. |
| 21 | +pub struct AsyncWatcher { |
| 22 | + handle: *uvll::uv_async_t, |
| 23 | + |
| 24 | + // A flag to tell the callback to exit, set from the dtor. This is |
| 25 | + // almost never contested - only in rare races with the dtor. |
| 26 | + exit_flag: Exclusive<bool> |
| 27 | +} |
| 28 | + |
| 29 | +struct Payload { |
| 30 | + callback: ~Callback, |
| 31 | + exit_flag: Exclusive<bool>, |
| 32 | +} |
19 | 33 |
|
20 | 34 | impl AsyncWatcher {
|
21 |
| - pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { |
| 35 | + pub fn new(loop_: &mut Loop, cb: ~Callback) -> AsyncWatcher { |
| 36 | + let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC); |
| 37 | + assert_eq!(unsafe { |
| 38 | + uvll::async_init(loop_.native_handle(), handle, async_cb) |
| 39 | + }, 0); |
| 40 | + let flag = Exclusive::new(false); |
| 41 | + let payload = ~Payload { callback: cb, exit_flag: flag.clone() }; |
22 | 42 | unsafe {
|
23 |
| - let handle = uvll::malloc_handle(uvll::UV_ASYNC); |
24 |
| - assert!(handle.is_not_null()); |
25 |
| - let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); |
26 |
| - watcher.install_watcher_data(); |
27 |
| - let data = watcher.get_watcher_data(); |
28 |
| - data.async_cb = Some(cb); |
29 |
| - assert_eq!(0, uvll::uv_async_init(loop_.native_handle(), handle, async_cb)); |
30 |
| - return watcher; |
| 43 | + let payload: *u8 = cast::transmute(payload); |
| 44 | + uvll::set_data_for_uv_handle(handle, payload); |
31 | 45 | }
|
| 46 | + return AsyncWatcher { handle: handle, exit_flag: flag, }; |
| 47 | + } |
| 48 | +} |
32 | 49 |
|
33 |
| - extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { |
34 |
| - let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); |
35 |
| - let status = status_to_maybe_uv_error(status); |
36 |
| - let data = watcher.get_watcher_data(); |
37 |
| - let cb = data.async_cb.get_ref(); |
38 |
| - (*cb)(watcher, status); |
39 |
| - } |
| 50 | +impl UvHandle<uvll::uv_async_t> for AsyncWatcher { |
| 51 | + fn uv_handle(&self) -> *uvll::uv_async_t { self.handle } |
| 52 | + unsafe fn from_uv_handle<'a>(h: &'a *T) -> &'a mut AsyncWatcher { |
| 53 | + fail!("async watchers can't be built from their handles"); |
40 | 54 | }
|
| 55 | +} |
| 56 | + |
| 57 | +extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { |
| 58 | + assert!(status == 0); |
| 59 | + let payload: &mut Payload = unsafe { |
| 60 | + cast::transmute(uvll::get_data_for_uv_handle(handle)) |
| 61 | + }; |
| 62 | + |
| 63 | + // The synchronization logic here is subtle. To review, |
| 64 | + // the uv async handle type promises that, after it is |
| 65 | + // triggered the remote callback is definitely called at |
| 66 | + // least once. UvRemoteCallback needs to maintain those |
| 67 | + // semantics while also shutting down cleanly from the |
| 68 | + // dtor. In our case that means that, when the |
| 69 | + // UvRemoteCallback dtor calls `async.send()`, here `f` is |
| 70 | + // always called later. |
| 71 | + |
| 72 | + // In the dtor both the exit flag is set and the async |
| 73 | + // callback fired under a lock. Here, before calling `f`, |
| 74 | + // we take the lock and check the flag. Because we are |
| 75 | + // checking the flag before calling `f`, and the flag is |
| 76 | + // set under the same lock as the send, then if the flag |
| 77 | + // is set then we're guaranteed to call `f` after the |
| 78 | + // final send. |
| 79 | + |
| 80 | + // If the check was done after `f()` then there would be a |
| 81 | + // period between that call and the check where the dtor |
| 82 | + // could be called in the other thread, missing the final |
| 83 | + // callback while still destroying the handle. |
| 84 | + |
| 85 | + let should_exit = unsafe { |
| 86 | + payload.exit_flag.with_imm(|&should_exit| should_exit) |
| 87 | + }; |
| 88 | + |
| 89 | + payload.callback.call(); |
| 90 | + |
| 91 | + if should_exit { |
| 92 | + unsafe { uvll::close(handle, close_cb) } |
| 93 | + } |
| 94 | +} |
41 | 95 |
|
42 |
| - pub fn send(&mut self) { |
| 96 | +extern fn close_cb(handle: *uvll::uv_handle_t) { |
| 97 | + // drop the payload |
| 98 | + let _payload: ~Payload = unsafe { |
| 99 | + cast::transmute(uvll::get_data_for_uv_handle(handle)) |
| 100 | + }; |
| 101 | + // and then free the handle |
| 102 | + unsafe { uvll::free_handle(handle) } |
| 103 | +} |
| 104 | + |
| 105 | +impl RemoteCallback for AsyncWatcher { |
| 106 | + fn fire(&mut self) { |
| 107 | + unsafe { uvll::async_send(self.handle) } |
| 108 | + } |
| 109 | +} |
| 110 | + |
| 111 | +impl Drop for AsyncWatcher { |
| 112 | + fn drop(&mut self) { |
43 | 113 | unsafe {
|
44 |
| - let handle = self.native_handle(); |
45 |
| - uvll::uv_async_send(handle); |
| 114 | + do self.exit_flag.with |should_exit| { |
| 115 | + // NB: These two things need to happen atomically. Otherwise |
| 116 | + // the event handler could wake up due to a *previous* |
| 117 | + // signal and see the exit flag, destroying the handle |
| 118 | + // before the final send. |
| 119 | + *should_exit = true; |
| 120 | + uvll::async_send(self.handle) |
| 121 | + } |
46 | 122 | }
|
47 | 123 | }
|
48 | 124 | }
|
49 | 125 |
|
50 |
| -impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher { |
51 |
| - fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher { |
52 |
| - AsyncWatcher(handle) |
53 |
| - } |
54 |
| - fn native_handle(&self) -> *uvll::uv_async_t { |
55 |
| - match self { &AsyncWatcher(ptr) => ptr } |
| 126 | +#[cfg(test)] |
| 127 | +mod test_remote { |
| 128 | + use std::cell::Cell; |
| 129 | + use std::rt::test::*; |
| 130 | + use std::rt::thread::Thread; |
| 131 | + use std::rt::tube::Tube; |
| 132 | + use std::rt::rtio::EventLoop; |
| 133 | + use std::rt::local::Local; |
| 134 | + use std::rt::sched::Scheduler; |
| 135 | + |
| 136 | + #[test] |
| 137 | + fn test_uv_remote() { |
| 138 | + do run_in_mt_newsched_task { |
| 139 | + let mut tube = Tube::new(); |
| 140 | + let tube_clone = tube.clone(); |
| 141 | + let remote_cell = Cell::new_empty(); |
| 142 | + do Local::borrow |sched: &mut Scheduler| { |
| 143 | + let tube_clone = tube_clone.clone(); |
| 144 | + let tube_clone_cell = Cell::new(tube_clone); |
| 145 | + let remote = do sched.event_loop.remote_callback { |
| 146 | + // This could be called multiple times |
| 147 | + if !tube_clone_cell.is_empty() { |
| 148 | + tube_clone_cell.take().send(1); |
| 149 | + } |
| 150 | + }; |
| 151 | + remote_cell.put_back(remote); |
| 152 | + } |
| 153 | + let thread = do Thread::start { |
| 154 | + remote_cell.take().fire(); |
| 155 | + }; |
| 156 | + |
| 157 | + assert!(tube.recv() == 1); |
| 158 | + thread.join(); |
| 159 | + } |
56 | 160 | }
|
57 | 161 | }
|
58 | 162 |
|
|
0 commit comments