Skip to content

Commit 55425d6

Browse files
committed
Make the uv bindings resilient to linked failure
In the ideal world, uv I/O could be canceled safely at any time. In reality, however, we are unable to do this. Right now linked failure is fairly flaky as implemented in the runtime, making it very difficult to test whether the linked failure mechanisms inside of the uv bindings are ready for this kind of interaction. Right now, all constructors will execute in a task::unkillable block, and all homing I/O operations will prevent linked failure in the duration of the homing operation. What this means is that tasks which perform I/O are still susceptible to linked failure, but the I/O operations themselves will never get interrupted. Instead, the linked failure will be received at the edge of the I/O operation.
1 parent 56c2755 commit 55425d6

File tree

12 files changed

+846
-828
lines changed

12 files changed

+846
-828
lines changed

src/librustuv/addrinfo.rs

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,14 @@
99
// except according to those terms.
1010

1111
use ai = std::rt::io::net::addrinfo;
12-
use std::cast;
1312
use std::libc::c_int;
1413
use std::ptr::null;
1514
use std::rt::BlockedTask;
1615
use std::rt::local::Local;
1716
use std::rt::sched::Scheduler;
1817

1918
use net;
20-
use super::{Loop, UvError, Request};
19+
use super::{Loop, UvError, Request, wait_until_woken_after};
2120
use uvll;
2221

2322
struct Addrinfo {
@@ -76,20 +75,19 @@ impl GetAddrInfoRequest {
7675
}
7776
});
7877
let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo);
79-
let req = Request::new(uvll::UV_GETADDRINFO);
78+
let mut req = Request::new(uvll::UV_GETADDRINFO);
8079

8180
return match unsafe {
8281
uvll::uv_getaddrinfo(loop_.handle, req.handle,
8382
getaddrinfo_cb, c_node_ptr, c_service_ptr,
8483
hint_ptr)
8584
} {
8685
0 => {
86+
req.defuse(); // uv callback now owns this request
8787
let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
88-
req.set_data(&cx);
89-
req.defuse();
90-
let scheduler: ~Scheduler = Local::take();
91-
do scheduler.deschedule_running_task_and_then |_, task| {
92-
cx.slot = Some(task);
88+
89+
do wait_until_woken_after(&mut cx.slot) {
90+
req.set_data(&cx);
9391
}
9492

9593
match cx.status {
@@ -105,8 +103,8 @@ impl GetAddrInfoRequest {
105103
status: c_int,
106104
res: *uvll::addrinfo) {
107105
let req = Request::wrap(req);
108-
if status == uvll::ECANCELED { return }
109-
let cx: &mut Ctx = unsafe { cast::transmute(req.get_data()) };
106+
assert!(status != uvll::ECANCELED);
107+
let cx: &mut Ctx = unsafe { req.get_data() };
110108
cx.status = status;
111109
cx.addrinfo = Some(Addrinfo { handle: res });
112110

@@ -191,25 +189,23 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] {
191189
mod test {
192190
use std::rt::io::net::ip::{SocketAddr, Ipv4Addr};
193191
use super::*;
194-
use super::super::run_uv_loop;
192+
use super::super::local_loop;
195193

196194
#[test]
197195
fn getaddrinfo_test() {
198-
do run_uv_loop |l| {
199-
match GetAddrInfoRequest::run(l, Some("localhost"), None, None) {
200-
Ok(infos) => {
201-
let mut found_local = false;
202-
let local_addr = &SocketAddr {
203-
ip: Ipv4Addr(127, 0, 0, 1),
204-
port: 0
205-
};
206-
for addr in infos.iter() {
207-
found_local = found_local || addr.address == *local_addr;
208-
}
209-
assert!(found_local);
196+
match GetAddrInfoRequest::run(local_loop(), Some("localhost"), None, None) {
197+
Ok(infos) => {
198+
let mut found_local = false;
199+
let local_addr = &SocketAddr {
200+
ip: Ipv4Addr(127, 0, 0, 1),
201+
port: 0
202+
};
203+
for addr in infos.iter() {
204+
found_local = found_local || addr.address == *local_addr;
210205
}
211-
Err(e) => fail!("{:?}", e),
206+
assert!(found_local);
212207
}
208+
Err(e) => fail!("{:?}", e),
213209
}
214210
}
215211
}

src/librustuv/async.rs

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,12 @@ mod test_remote {
131131
use std::rt::tube::Tube;
132132

133133
use super::*;
134-
use super::super::run_uv_loop;
134+
use super::super::local_loop;
135135

136-
// Make sure that we can fire watchers in remote threads
136+
// Make sure that we can fire watchers in remote threads and that they
137+
// actually trigger what they say they will.
137138
#[test]
138-
fn test_uv_remote() {
139+
fn smoke_test() {
139140
struct MyCallback(Option<Tube<int>>);
140141
impl Callback for MyCallback {
141142
fn call(&mut self) {
@@ -147,35 +148,15 @@ mod test_remote {
147148
}
148149
}
149150

150-
do run_uv_loop |l| {
151-
let mut tube = Tube::new();
152-
let cb = ~MyCallback(Some(tube.clone()));
153-
let watcher = Cell::new(AsyncWatcher::new(l, cb as ~Callback));
154-
155-
let thread = do Thread::start {
156-
watcher.take().fire();
157-
};
151+
let mut tube = Tube::new();
152+
let cb = ~MyCallback(Some(tube.clone()));
153+
let watcher = Cell::new(AsyncWatcher::new(local_loop(), cb as ~Callback));
158154

159-
assert_eq!(tube.recv(), 1);
160-
thread.join();
161-
}
162-
}
163-
164-
#[test]
165-
fn smoke_test() {
166-
static mut hits: uint = 0;
155+
let thread = do Thread::start {
156+
watcher.take().fire();
157+
};
167158

168-
struct MyCallback;
169-
impl Callback for MyCallback {
170-
fn call(&mut self) {
171-
unsafe { hits += 1; }
172-
}
173-
}
174-
175-
do run_uv_loop |l| {
176-
let mut watcher = AsyncWatcher::new(l, ~MyCallback as ~Callback);
177-
watcher.fire();
178-
}
179-
assert!(unsafe { hits > 0 });
159+
assert_eq!(tube.recv(), 1);
160+
thread.join();
180161
}
181162
}

src/librustuv/file.rs

Lines changed: 82 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ use std::cast;
1515
use std::libc::{c_int, c_char, c_void, c_uint};
1616
use std::libc;
1717
use std::rt::BlockedTask;
18-
use std::rt::io;
1918
use std::rt::io::{FileStat, IoError};
20-
use std::rt::rtio;
19+
use std::rt::io;
2120
use std::rt::local::Local;
21+
use std::rt::rtio;
2222
use std::rt::sched::{Scheduler, SchedHandle};
2323
use std::vec;
2424

25-
use super::{Loop, UvError, uv_error_to_io_error};
25+
use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
2626
use uvio::HomingIO;
2727
use uvll;
2828

@@ -305,10 +305,8 @@ fn execute(f: &fn(*uvll::uv_fs_t, uvll::uv_fs_cb) -> c_int)
305305
0 => {
306306
req.fired = true;
307307
let mut slot = None;
308-
unsafe { uvll::set_data_for_req(req.req, &slot) }
309-
let sched: ~Scheduler = Local::take();
310-
do sched.deschedule_running_task_and_then |_, task| {
311-
slot = Some(task);
308+
do wait_until_woken_after(&mut slot) {
309+
unsafe { uvll::set_data_for_req(req.req, &slot) }
312310
}
313311
match req.get_result() {
314312
n if n < 0 => Err(UvError(n)),
@@ -454,123 +452,113 @@ mod test {
454452
use std::str;
455453
use std::vec;
456454
use super::*;
457-
use super::super::{run_uv_loop};
455+
use l = super::super::local_loop;
458456

459457
#[test]
460458
fn file_test_full_simple_sync() {
461-
do run_uv_loop |l| {
462-
let create_flags = O_RDWR | O_CREAT;
463-
let read_flags = O_RDONLY;
464-
let mode = S_IWUSR | S_IRUSR;
465-
let path_str = "./tmp/file_full_simple_sync.txt";
466-
467-
{
468-
// open/create
469-
let result = FsRequest::open(l, &path_str.to_c_str(),
470-
create_flags as int, mode as int);
471-
assert!(result.is_ok());
472-
let result = result.unwrap();
473-
let fd = result.fd;
474-
475-
// write
476-
let result = FsRequest::write(l, fd, "hello".as_bytes(), -1);
477-
assert!(result.is_ok());
478-
}
459+
let create_flags = O_RDWR | O_CREAT;
460+
let read_flags = O_RDONLY;
461+
let mode = S_IWUSR | S_IRUSR;
462+
let path_str = "./tmp/file_full_simple_sync.txt";
463+
464+
{
465+
// open/create
466+
let result = FsRequest::open(l(), &path_str.to_c_str(),
467+
create_flags as int, mode as int);
468+
assert!(result.is_ok());
469+
let result = result.unwrap();
470+
let fd = result.fd;
479471

480-
{
481-
// re-open
482-
let result = FsRequest::open(l, &path_str.to_c_str(),
483-
read_flags as int, 0);
484-
assert!(result.is_ok());
485-
let result = result.unwrap();
486-
let fd = result.fd;
487-
488-
// read
489-
let mut read_mem = vec::from_elem(1000, 0u8);
490-
let result = FsRequest::read(l, fd, read_mem, 0);
491-
assert!(result.is_ok());
492-
493-
let nread = result.unwrap();
494-
assert!(nread > 0);
495-
let read_str = str::from_utf8(read_mem.slice(0, nread as uint));
496-
assert_eq!(read_str, ~"hello");
497-
}
498-
// unlink
499-
let result = FsRequest::unlink(l, &path_str.to_c_str());
472+
// write
473+
let result = FsRequest::write(l(), fd, "hello".as_bytes(), -1);
500474
assert!(result.is_ok());
501475
}
476+
477+
{
478+
// re-open
479+
let result = FsRequest::open(l(), &path_str.to_c_str(),
480+
read_flags as int, 0);
481+
assert!(result.is_ok());
482+
let result = result.unwrap();
483+
let fd = result.fd;
484+
485+
// read
486+
let mut read_mem = vec::from_elem(1000, 0u8);
487+
let result = FsRequest::read(l(), fd, read_mem, 0);
488+
assert!(result.is_ok());
489+
490+
let nread = result.unwrap();
491+
assert!(nread > 0);
492+
let read_str = str::from_utf8(read_mem.slice(0, nread as uint));
493+
assert_eq!(read_str, ~"hello");
494+
}
495+
// unlink
496+
let result = FsRequest::unlink(l(), &path_str.to_c_str());
497+
assert!(result.is_ok());
502498
}
503499
504500
#[test]
505501
fn file_test_stat() {
506-
do run_uv_loop |l| {
507-
let path = &"./tmp/file_test_stat_simple".to_c_str();
508-
let create_flags = (O_RDWR | O_CREAT) as int;
509-
let mode = (S_IWUSR | S_IRUSR) as int;
502+
let path = &"./tmp/file_test_stat_simple".to_c_str();
503+
let create_flags = (O_RDWR | O_CREAT) as int;
504+
let mode = (S_IWUSR | S_IRUSR) as int;
510505
511-
let result = FsRequest::open(l, path, create_flags, mode);
512-
assert!(result.is_ok());
513-
let file = result.unwrap();
506+
let result = FsRequest::open(l(), path, create_flags, mode);
507+
assert!(result.is_ok());
508+
let file = result.unwrap();
514509
515-
let result = FsRequest::write(l, file.fd, "hello".as_bytes(), 0);
516-
assert!(result.is_ok());
510+
let result = FsRequest::write(l(), file.fd, "hello".as_bytes(), 0);
511+
assert!(result.is_ok());
517512
518-
let result = FsRequest::stat(l, path);
519-
assert!(result.is_ok());
520-
assert_eq!(result.unwrap().size, 5);
513+
let result = FsRequest::stat(l(), path);
514+
assert!(result.is_ok());
515+
assert_eq!(result.unwrap().size, 5);
521516
522-
fn free<T>(_: T) {}
523-
free(file);
517+
fn free<T>(_: T) {}
518+
free(file);
524519
525-
let result = FsRequest::unlink(l, path);
526-
assert!(result.is_ok());
527-
}
520+
let result = FsRequest::unlink(l(), path);
521+
assert!(result.is_ok());
528522
}
529523
530524
#[test]
531525
fn file_test_mk_rm_dir() {
532-
do run_uv_loop |l| {
533-
let path = &"./tmp/mk_rm_dir".to_c_str();
534-
let mode = S_IWUSR | S_IRUSR;
526+
let path = &"./tmp/mk_rm_dir".to_c_str();
527+
let mode = S_IWUSR | S_IRUSR;
535528
536-
let result = FsRequest::mkdir(l, path, mode);
537-
assert!(result.is_ok());
529+
let result = FsRequest::mkdir(l(), path, mode);
530+
assert!(result.is_ok());
538531
539-
let result = FsRequest::stat(l, path);
540-
assert!(result.is_ok());
541-
assert!(result.unwrap().kind == io::TypeDirectory);
532+
let result = FsRequest::stat(l(), path);
533+
assert!(result.is_ok());
534+
assert!(result.unwrap().kind == io::TypeDirectory);
542535
543-
let result = FsRequest::rmdir(l, path);
544-
assert!(result.is_ok());
536+
let result = FsRequest::rmdir(l(), path);
537+
assert!(result.is_ok());
545538
546-
let result = FsRequest::stat(l, path);
547-
assert!(result.is_err());
548-
}
539+
let result = FsRequest::stat(l(), path);
540+
assert!(result.is_err());
549541
}
550542
551543
#[test]
552544
fn file_test_mkdir_chokes_on_double_create() {
553-
do run_uv_loop |l| {
554-
let path = &"./tmp/double_create_dir".to_c_str();
555-
let mode = S_IWUSR | S_IRUSR;
556-
557-
let result = FsRequest::stat(l, path);
558-
assert!(result.is_err(), "{:?}", result);
559-
let result = FsRequest::mkdir(l, path, mode as c_int);
560-
assert!(result.is_ok(), "{:?}", result);
561-
let result = FsRequest::mkdir(l, path, mode as c_int);
562-
assert!(result.is_err(), "{:?}", result);
563-
let result = FsRequest::rmdir(l, path);
564-
assert!(result.is_ok(), "{:?}", result);
565-
}
545+
let path = &"./tmp/double_create_dir".to_c_str();
546+
let mode = S_IWUSR | S_IRUSR;
547+
548+
let result = FsRequest::stat(l(), path);
549+
assert!(result.is_err(), "{:?}", result);
550+
let result = FsRequest::mkdir(l(), path, mode as c_int);
551+
assert!(result.is_ok(), "{:?}", result);
552+
let result = FsRequest::mkdir(l(), path, mode as c_int);
553+
assert!(result.is_err(), "{:?}", result);
554+
let result = FsRequest::rmdir(l(), path);
555+
assert!(result.is_ok(), "{:?}", result);
566556
}
567557
568558
#[test]
569559
fn file_test_rmdir_chokes_on_nonexistant_path() {
570-
do run_uv_loop |l| {
571-
let path = &"./tmp/never_existed_dir".to_c_str();
572-
let result = FsRequest::rmdir(l, path);
573-
assert!(result.is_err());
574-
}
560+
let path = &"./tmp/never_existed_dir".to_c_str();
561+
let result = FsRequest::rmdir(l(), path);
562+
assert!(result.is_err());
575563
}
576564
}

0 commit comments

Comments
 (0)