Skip to content

Commit 414f3c7

Browse files
committed
core::rt: Add a simple channel type for passing buffered messages between Scheduler and Task
Called 'Tube' for lack of anything better.
1 parent 40a9de5 commit 414f3c7

File tree

4 files changed

+212
-8
lines changed

4 files changed

+212
-8
lines changed

src/libcore/logging.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use libc;
1919
use repr;
2020
use vec;
2121
use cast;
22+
use str;
2223

2324
/// Turns on logging to stdout globally
2425
pub fn console_on() {
@@ -57,7 +58,7 @@ pub fn log_type<T>(level: u32, object: &T) {
5758
}
5859
_ => {
5960
// XXX: Bad allocation
60-
let msg = bytes.to_str();
61+
let msg = str::from_bytes(bytes);
6162
newsched_log_str(msg);
6263
}
6364
}

src/libcore/rt/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use libc::c_char;
1818
mod sched;
1919

2020
/// Thread-local access to the current Scheduler
21-
mod local_sched;
21+
pub mod local_sched;
2222

2323
/// Synchronous I/O
2424
#[path = "io/mod.rs"]
@@ -68,6 +68,10 @@ pub mod test;
6868
/// Reference counting
6969
pub mod rc;
7070

71+
/// A simple single-threaded channel type for passing buffered data between
72+
/// scheduler and task context
73+
pub mod tube;
74+
7175
/// Set up a default runtime configuration, given compiler-supplied arguments.
7276
///
7377
/// This is invoked by the `start` _language item_ (unstable::lang) to

src/libcore/rt/tube.rs

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
//! A very simple unsynchronized channel type for sending buffered data from
12+
//! scheduler context to task context.
13+
//!
14+
//! XXX: This would be safer to use if split into two types like Port/Chan
15+
16+
use option::*;
17+
use clone::Clone;
18+
use super::rc::RC;
19+
use rt::sched::Task;
20+
use rt::{context, TaskContext, SchedulerContext};
21+
use rt::local_sched;
22+
23+
struct TubeState<T> {
24+
blocked_task: Option<~Task>,
25+
buf: ~[T]
26+
}
27+
28+
pub struct Tube<T> {
29+
p: RC<TubeState<T>>
30+
}
31+
32+
impl<T> Tube<T> {
33+
pub fn new() -> Tube<T> {
34+
Tube {
35+
p: RC::new(TubeState {
36+
blocked_task: None,
37+
buf: ~[]
38+
})
39+
}
40+
}
41+
42+
pub fn send(&mut self, val: T) {
43+
rtdebug!("tube send");
44+
assert!(context() == SchedulerContext);
45+
46+
unsafe {
47+
let state = self.p.unsafe_borrow_mut();
48+
(*state).buf.push(val);
49+
50+
if (*state).blocked_task.is_some() {
51+
// There's a waiting task. Wake it up
52+
rtdebug!("waking blocked tube");
53+
let task = (*state).blocked_task.swap_unwrap();
54+
let sched = local_sched::take();
55+
sched.resume_task_immediately(task);
56+
}
57+
}
58+
}
59+
60+
pub fn recv(&mut self) -> T {
61+
assert!(context() == TaskContext);
62+
63+
unsafe {
64+
let state = self.p.unsafe_borrow_mut();
65+
if !(*state).buf.is_empty() {
66+
return (*state).buf.shift();
67+
} else {
68+
// Block and wait for the next message
69+
rtdebug!("blocking on tube recv");
70+
assert!(self.p.refcount() > 1); // There better be somebody to wake us up
71+
assert!((*state).blocked_task.is_none());
72+
let sched = local_sched::take();
73+
do sched.deschedule_running_task_and_then |task| {
74+
(*state).blocked_task = Some(task);
75+
}
76+
rtdebug!("waking after tube recv");
77+
let buf = &mut (*state).buf;
78+
assert!(!buf.is_empty());
79+
return buf.shift();
80+
}
81+
}
82+
}
83+
}
84+
85+
impl<T> Clone for Tube<T> {
86+
fn clone(&self) -> Tube<T> {
87+
Tube { p: self.p.clone() }
88+
}
89+
}
90+
91+
#[cfg(test)]
92+
mod test {
93+
use int;
94+
use cell::Cell;
95+
use rt::local_sched;
96+
use rt::test::*;
97+
use rt::rtio::EventLoop;
98+
use super::*;
99+
100+
#[test]
101+
fn simple_test() {
102+
do run_in_newsched_task {
103+
let mut tube: Tube<int> = Tube::new();
104+
let tube_clone = tube.clone();
105+
let tube_clone_cell = Cell(tube_clone);
106+
let sched = local_sched::take();
107+
do sched.deschedule_running_task_and_then |task| {
108+
let mut tube_clone = tube_clone_cell.take();
109+
tube_clone.send(1);
110+
let sched = local_sched::take();
111+
sched.resume_task_immediately(task);
112+
}
113+
114+
assert!(tube.recv() == 1);
115+
}
116+
}
117+
118+
#[test]
119+
fn blocking_test() {
120+
do run_in_newsched_task {
121+
let mut tube: Tube<int> = Tube::new();
122+
let tube_clone = tube.clone();
123+
let tube_clone = Cell(Cell(Cell(tube_clone)));
124+
let sched = local_sched::take();
125+
do sched.deschedule_running_task_and_then |task| {
126+
let tube_clone = tube_clone.take();
127+
do local_sched::borrow |sched| {
128+
let tube_clone = tube_clone.take();
129+
do sched.event_loop.callback {
130+
let mut tube_clone = tube_clone.take();
131+
// The task should be blocked on this now and
132+
// sending will wake it up.
133+
tube_clone.send(1);
134+
}
135+
}
136+
let sched = local_sched::take();
137+
sched.resume_task_immediately(task);
138+
}
139+
140+
assert!(tube.recv() == 1);
141+
}
142+
}
143+
144+
#[test]
145+
fn many_blocking_test() {
146+
static MAX: int = 100;
147+
148+
do run_in_newsched_task {
149+
let mut tube: Tube<int> = Tube::new();
150+
let tube_clone = tube.clone();
151+
let tube_clone = Cell(tube_clone);
152+
let sched = local_sched::take();
153+
do sched.deschedule_running_task_and_then |task| {
154+
callback_send(tube_clone.take(), 0);
155+
156+
fn callback_send(tube: Tube<int>, i: int) {
157+
if i == 100 { return; }
158+
159+
let tube = Cell(Cell(tube));
160+
do local_sched::borrow |sched| {
161+
let tube = tube.take();
162+
do sched.event_loop.callback {
163+
let mut tube = tube.take();
164+
// The task should be blocked on this now and
165+
// sending will wake it up.
166+
tube.send(i);
167+
callback_send(tube, i + 1);
168+
}
169+
}
170+
}
171+
172+
let sched = local_sched::take();
173+
sched.resume_task_immediately(task);
174+
}
175+
176+
for int::range(0, MAX) |i| {
177+
let j = tube.recv();
178+
assert!(j == i);
179+
}
180+
}
181+
}
182+
}

src/libcore/sys.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,12 @@ impl FailWithCause for &'static str {
202202

203203
// FIXME #4427: Temporary until rt::rt_fail_ goes away
204204
pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
205-
use rt::{context, OldTaskContext};
206-
use rt::local_services::unsafe_borrow_local_services;
205+
use option::Option;
206+
use rt::{context, OldTaskContext, TaskContext};
207+
use rt::local_services::{unsafe_borrow_local_services, Unwinder};
207208

208-
match context() {
209+
let context = context();
210+
match context {
209211
OldTaskContext => {
210212
unsafe {
211213
gc::cleanup_stack_for_failure();
@@ -214,11 +216,26 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
214216
}
215217
}
216218
_ => {
217-
// XXX: Need to print the failure message
218-
gc::cleanup_stack_for_failure();
219219
unsafe {
220+
// XXX: Bad re-allocations. fail! needs some refactoring
221+
let msg = str::raw::from_c_str(msg);
222+
let file = str::raw::from_c_str(file);
223+
224+
let outmsg = fmt!("%s at line %i of file %s", msg, line as int, file);
225+
226+
// XXX: Logging doesn't work correctly in non-task context because it
227+
// invokes the local heap
228+
if context == TaskContext {
229+
error!(outmsg);
230+
} else {
231+
rtdebug!("%s", outmsg);
232+
}
233+
234+
gc::cleanup_stack_for_failure();
235+
220236
let local_services = unsafe_borrow_local_services();
221-
match (*local_services).unwinder {
237+
let unwinder: &mut Option<Unwinder> = &mut (*local_services).unwinder;
238+
match *unwinder {
222239
Some(ref mut unwinder) => unwinder.begin_unwind(),
223240
None => abort!("failure without unwinder. aborting process")
224241
}

0 commit comments

Comments
 (0)