Skip to content

Commit a045e63

Browse files
olsonjefferybrson
authored andcommitted
std: get_monitor_task_gl() is global_loop::get() default
1 parent e15f1d5 commit a045e63

File tree

4 files changed

+120
-10
lines changed

4 files changed

+120
-10
lines changed

src/libstd/uv_global_loop.rs

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
#[doc="
2-
Process-wide, lazily started/stopped libuv event loop interaction.
2+
A process-wide libuv event loop for library use.
33
"];
44

55
import ll = uv_ll;
66
import hl = uv_hl;
77
import get_gl = get;
88

9-
export get, get_single_task_gl;
9+
export get, get_single_task_gl, get_monitor_task_gl;
1010

1111
native mod rustrt {
1212
fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t;
13+
fn rust_uv_get_kernel_monitor_global_chan_ptr() -> *libc::uintptr_t;
1314
fn rust_uv_get_kernel_global_async_handle() -> *libc::uintptr_t;
1415
fn rust_compare_and_swap_ptr(address: *libc::uintptr_t,
1516
oldval: libc::uintptr_t,
@@ -20,13 +21,34 @@ native mod rustrt {
2021
Race-free helper to get access to a global task where a libuv
2122
loop is running.
2223
24+
Use `uv::hl::interact`, `uv::hl::ref_handle` and `uv::hl::unref_handle` to
25+
do operations against the global loop that this function returns.
26+
2327
# Return
2428
2529
* A `hl::high_level_loop` that encapsulates communication with the global
2630
loop.
2731
"]
2832
fn get() -> hl::high_level_loop {
29-
ret get_single_task_gl();
33+
ret get_monitor_task_gl();
34+
}
35+
36+
#[doc(hidden)]
37+
fn get_monitor_task_gl() -> hl::high_level_loop {
38+
let monitor_loop_chan =
39+
rustrt::rust_uv_get_kernel_monitor_global_chan_ptr();
40+
ret spawn_global_weak_task(
41+
monitor_loop_chan,
42+
{|weak_exit_po, msg_po, loop_ptr, first_msg|
43+
log(debug, "monitor gl: entering inner loop");
44+
unsafe {
45+
monitor_task_loop_body(weak_exit_po, msg_po, loop_ptr,
46+
copy(first_msg))
47+
}
48+
},
49+
{|msg_ch|
50+
hl::monitor_task_loop({op_chan: msg_ch})
51+
});
3052
}
3153

3254
#[doc(hidden)]
@@ -35,7 +57,7 @@ fn get_single_task_gl() -> hl::high_level_loop {
3557
ret spawn_global_weak_task(
3658
global_loop_chan_ptr,
3759
{|weak_exit_po, msg_po, loop_ptr, first_msg|
38-
log(debug, "about to enter inner loop");
60+
log(debug, "single-task gl: about to enter inner loop");
3961
unsafe {
4062
single_task_loop_body(weak_exit_po, msg_po, loop_ptr,
4163
copy(first_msg))
@@ -135,6 +157,83 @@ unsafe fn outer_global_loop_body(
135157

136158
ll::loop_delete(loop_ptr);
137159
}
160+
161+
unsafe fn monitor_task_loop_body(weak_exit_po_in: comm::port<()>,
162+
msg_po_in: comm::port<hl::high_level_msg>,
163+
loop_ptr: *libc::c_void,
164+
-first_interaction: hl::high_level_msg) -> bool {
165+
// resend the msg to be handled in the select2 loop below..
166+
comm::send(comm::chan(msg_po_in), first_interaction);
167+
168+
// our async_handle
169+
let async_handle_po = comm::port::<*ll::uv_async_t>();
170+
let async_handle_ch = comm::chan(async_handle_po);
171+
172+
// the msg_po that libuv will be receiving on..
173+
let loop_msg_po = comm::port::<hl::high_level_msg>();
174+
let loop_msg_po_ptr = ptr::addr_of(loop_msg_po);
175+
let loop_msg_ch = comm::chan(loop_msg_po);
176+
177+
// the question of whether unsupervising this will even do any
178+
// good is there.. but since this'll go into blocking in libuv with
179+
// a quickness.. any errors that occur (including inside crust) will
180+
// be segfaults.. so yeah.
181+
task::spawn_sched(task::manual_threads(1u)) {||
182+
let loop_msg_po_in = *loop_msg_po_ptr;
183+
hl::run_high_level_loop(
184+
loop_ptr,
185+
loop_msg_po_in, // here the loop gets handed a different message
186+
// port, as we'll be receiving all of the messages
187+
// initially and then passing them on..
188+
// before_run
189+
{|async_handle|
190+
log(debug,#fmt("monitor gl: before_run: async_handle %?",
191+
async_handle));
192+
// when this is ran, our async_handle is set up, so let's
193+
// do an async_send with it.. letting the loop know, once it
194+
// starts, that is has work
195+
ll::async_send(async_handle);
196+
comm::send(async_handle_ch, copy(async_handle));
197+
},
198+
// before_msg_drain
199+
{|async_handle|
200+
log(debug,#fmt("monitor gl: b4_msg_drain: async_handle %?",
201+
async_handle));
202+
true
203+
},
204+
// before_tear_down
205+
{|async_handle|
206+
log(debug,#fmt("monitor gl: b4_tear_down: async_handle %?",
207+
async_handle));
208+
});
209+
};
210+
211+
// our loop is set up, so let's emit the handle back out to our users..
212+
let async_handle = comm::recv(async_handle_po);
213+
// supposed to return a bool to indicate to the enclosing loop whether
214+
// it should continue or not..
215+
let mut continue_inner_loop = true;
216+
let mut didnt_get_hl_bailout = true;
217+
while continue_inner_loop {
218+
log(debug, "monitor task inner loop.. about to block on select2");
219+
continue_inner_loop = either::either(
220+
{|left_val|
221+
// bail out..
222+
log(debug, #fmt("monitor inner weak_exit_po recv'd msg: %?",
223+
left_val));
224+
// TODO: make loop bail out
225+
didnt_get_hl_bailout = false;
226+
false
227+
}, {|right_val|
228+
// wake up our inner loop and pass it a msg..
229+
comm::send(loop_msg_ch, copy(right_val));
230+
ll::async_send(async_handle);
231+
true
232+
}, comm::select2(weak_exit_po_in, msg_po_in)
233+
)
234+
}
235+
didnt_get_hl_bailout
236+
}
138237

139238
unsafe fn single_task_loop_body(weak_exit_po_in: comm::port<()>,
140239
msg_po_in: comm::port<hl::high_level_msg>,

src/libstd/uv_hl.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,17 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop,
193193
// the loop isn't active, so we don't need to wake it up,
194194
// (the loop's enclosing task should be blocking on a message
195195
// receive on this port)
196-
if (*(hl_loop.async_handle()) != 0 as *ll::uv_async_t) {
197-
log(debug,"global async handle != 0, waking up loop..");
198-
ll::async_send(*(hl_loop.async_handle()));
199-
}
200-
else {
201-
log(debug,"GLOBAL ASYNC handle == 0");
196+
alt hl_loop {
197+
single_task_loop({async_handle, op_chan}) {
198+
if ((*async_handle) != 0 as *ll::uv_async_t) {
199+
log(debug,"global async handle != 0, waking up loop..");
200+
ll::async_send((*async_handle));
201+
}
202+
else {
203+
log(debug,"GLOBAL ASYNC handle == 0");
204+
}
205+
}
206+
_ {}
202207
}
203208
}
204209

src/rt/rust_uv.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,11 @@ rust_uv_get_kernel_global_chan_ptr() {
446446
return result;
447447
}
448448

449+
extern "C" uintptr_t*
450+
rust_uv_get_kernel_monitor_global_chan_ptr() {
451+
return rust_uv_get_kernel_global_chan_ptr();
452+
}
453+
449454
extern "C" uintptr_t*
450455
rust_uv_get_kernel_global_async_handle() {
451456
return rust_get_current_task()->kernel->get_global_async_handle();

src/rt/rustrt.def.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ rust_uv_set_data_for_req
138138
rust_uv_get_base_from_buf
139139
rust_uv_get_len_from_buf
140140
rust_uv_get_kernel_global_chan_ptr
141+
rust_uv_get_kernel_monitor_global_chan_ptr
141142
rust_uv_get_kernel_global_async_handle
142143
rust_dbg_lock_create
143144
rust_dbg_lock_destroy

0 commit comments

Comments
 (0)