Skip to content

Commit fc6c9a5

Browse files
committed
---
yaml --- r: 36091 b: refs/heads/try2 c: 61bb357 h: refs/heads/master i: 36089: 8f3a73f 36087: bb3f606 v: v3
1 parent 37389fa commit fc6c9a5

34 files changed

+426
-140
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ refs/heads/snap-stage3: eb8fd119c65c67f3b1b8268cc7341c22d39b7b61
55
refs/heads/try: d324a424d8f84b1eb049b12cf34182bda91b0024
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
8-
refs/heads/try2: d29962f0eb2772c208a299efae86ad16bc25f7de
8+
refs/heads/try2: 61bb3571a59f4659a0a46565c71fa7ecfa352811
99
refs/heads/incoming: d9317a174e434d4c99fc1a37fd7dc0d2f5328d37
1010
refs/heads/dist-snap: 22efa39382d41b084fde1719df7ae8ce5697d8c9
1111
refs/tags/release-0.2: c870d2dffb391e14efb05aa27898f1f6333a9596

branches/try2/src/libcore/core.rc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ pub mod task {
181181
pub mod spawn;
182182
pub mod rt;
183183
}
184+
pub mod future;
184185
pub mod pipes;
185186

186187
// Runtime and language-primitive support

branches/try2/src/libstd/future.rs renamed to branches/try2/src/libcore/future.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use either::Either;
20-
use pipes::{recv, oneshot, ChanOne, PortOne, send_one, recv_one};
20+
use pipes::recv;
2121
use cast::copy_lifetime;
2222

2323
#[doc = "The future type"]
@@ -67,7 +67,7 @@ pub fn from_value<A>(val: A) -> Future<A> {
6767
Future {state: Forced(~(move val))}
6868
}
6969

70-
pub fn from_port<A:Send>(port: PortOne<A>) ->
70+
pub fn from_port<A:Send>(port: future_pipe::client::waiting<A>) ->
7171
Future<A> {
7272
/*!
7373
* Create a future from a port
@@ -82,7 +82,7 @@ pub fn from_port<A:Send>(port: PortOne<A>) ->
8282
port_ <-> *port;
8383
let port = option::unwrap(move port_);
8484
match recv(move port) {
85-
oneshot::send(move data) => move data
85+
future_pipe::completed(move data) => move data
8686
}
8787
}
8888
}
@@ -107,15 +107,9 @@ pub fn spawn<A:Send>(blk: fn~() -> A) -> Future<A> {
107107
* value of the future.
108108
*/
109109

110-
let (chan, port) = oneshot::init();
111-
112-
let chan = ~mut Some(move chan);
113-
do task::spawn |move blk, move chan| {
114-
let chan = option::swap_unwrap(&mut *chan);
115-
send_one(move chan, blk());
116-
}
117-
118-
return from_port(move port);
110+
from_port(pipes::spawn_service_recv(future_pipe::init, |move blk, ch| {
111+
future_pipe::server::completed(move ch, blk());
112+
}))
119113
}
120114

121115
pub fn get_ref<A>(future: &r/Future<A>) -> &r/A {
@@ -168,6 +162,12 @@ pub fn with<A,B>(future: &Future<A>, blk: fn((&A)) -> B) -> B {
168162
blk(get_ref(future))
169163
}
170164

165+
proto! future_pipe (
166+
waiting:recv<T:Send> {
167+
completed(T) -> !
168+
}
169+
)
170+
171171
#[allow(non_implicitly_copyable_typarams)]
172172
pub mod test {
173173
#[test]
@@ -178,8 +178,8 @@ pub mod test {
178178

179179
#[test]
180180
pub fn test_from_port() {
181-
let (ch, po) = oneshot::init();
182-
send_one(move ch, ~"whale");
181+
let (po, ch) = future_pipe::init();
182+
future_pipe::server::completed(move ch, ~"whale");
183183
let f = from_port(move po);
184184
assert get(&f) == ~"whale";
185185
}

branches/try2/src/libcore/private.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -581,20 +581,16 @@ pub mod tests {
581581

582582
for uint::range(0, num_tasks) |_i| {
583583
let total = total.clone();
584-
let (chan, port) = pipes::stream();
585-
futures.push(move port);
586-
587-
do task::spawn |move total, move chan| {
584+
futures.push(future::spawn(|move total| {
588585
for uint::range(0, count) |_i| {
589586
do total.with |count| {
590587
**count += 1;
591588
}
592589
}
593-
chan.send(());
594-
}
590+
}));
595591
};
596592

597-
for futures.each |f| { f.recv() }
593+
for futures.each |f| { f.get() }
598594

599595
do total.with |total| {
600596
assert **total == num_tasks * count
@@ -646,7 +642,7 @@ pub mod tests {
646642
// Have to get rid of our reference before blocking.
647643
{ let _x = move x; } // FIXME(#3161) util::ignore doesn't work here
648644
let res = option::swap_unwrap(&mut res);
649-
res.recv();
645+
future::get(&res);
650646
}
651647

652648
#[test] #[should_fail] #[ignore(cfg(windows))]
@@ -661,7 +657,7 @@ pub mod tests {
661657
}
662658
assert unwrap_exclusive(move x) == ~~"hello";
663659
let res = option::swap_unwrap(&mut res);
664-
res.recv();
660+
future::get(&res);
665661
}
666662

667663
#[test] #[ignore(cfg(windows))]

branches/try2/src/libcore/task.rs

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use cmp::Eq;
3232
use result::Result;
3333
use pipes::{stream, Chan, Port};
3434
use local_data_priv::{local_get, local_set};
35-
use util::replace;
3635

3736
use rt::task_id;
3837
use rt::rust_task;
@@ -73,6 +72,25 @@ impl TaskResult : Eq {
7372
pure fn ne(other: &TaskResult) -> bool { !self.eq(other) }
7473
}
7574

75+
/// A message type for notifying of task lifecycle events
76+
pub enum Notification {
77+
/// Sent when a task exits with the task handle and result
78+
Exit(Task, TaskResult)
79+
}
80+
81+
impl Notification : cmp::Eq {
82+
pure fn eq(other: &Notification) -> bool {
83+
match self {
84+
Exit(e0a, e1a) => {
85+
match (*other) {
86+
Exit(e0b, e1b) => e0a == e0b && e1a == e1b
87+
}
88+
}
89+
}
90+
}
91+
pure fn ne(other: &Notification) -> bool { !self.eq(other) }
92+
}
93+
7694
/// Scheduler modes
7795
pub enum SchedMode {
7896
/// All tasks run in the same OS thread
@@ -182,7 +200,7 @@ pub type SchedOpts = {
182200
pub type TaskOpts = {
183201
linked: bool,
184202
supervised: bool,
185-
mut notify_chan: Option<Chan<TaskResult>>,
203+
mut notify_chan: Option<Chan<Notification>>,
186204
sched: Option<SchedOpts>,
187205
};
188206

@@ -228,7 +246,11 @@ priv impl TaskBuilder {
228246
fail ~"Cannot copy a task_builder"; // Fake move mode on self
229247
}
230248
self.consumed = true;
231-
let notify_chan = replace(&mut self.opts.notify_chan, None);
249+
let notify_chan = if self.opts.notify_chan.is_none() {
250+
None
251+
} else {
252+
Some(option::swap_unwrap(&mut self.opts.notify_chan))
253+
};
232254
TaskBuilder({
233255
opts: {
234256
linked: self.opts.linked,
@@ -249,7 +271,11 @@ impl TaskBuilder {
249271
* the other will not be killed.
250272
*/
251273
fn unlinked() -> TaskBuilder {
252-
let notify_chan = replace(&mut self.opts.notify_chan, None);
274+
let notify_chan = if self.opts.notify_chan.is_none() {
275+
None
276+
} else {
277+
Some(option::swap_unwrap(&mut self.opts.notify_chan))
278+
};
253279
TaskBuilder({
254280
opts: {
255281
linked: false,
@@ -267,7 +293,11 @@ impl TaskBuilder {
267293
* the child.
268294
*/
269295
fn supervised() -> TaskBuilder {
270-
let notify_chan = replace(&mut self.opts.notify_chan, None);
296+
let notify_chan = if self.opts.notify_chan.is_none() {
297+
None
298+
} else {
299+
Some(option::swap_unwrap(&mut self.opts.notify_chan))
300+
};
271301
TaskBuilder({
272302
opts: {
273303
linked: false,
@@ -284,7 +314,11 @@ impl TaskBuilder {
284314
* other will be killed.
285315
*/
286316
fn linked() -> TaskBuilder {
287-
let notify_chan = replace(&mut self.opts.notify_chan, None);
317+
let notify_chan = if self.opts.notify_chan.is_none() {
318+
None
319+
} else {
320+
Some(option::swap_unwrap(&mut self.opts.notify_chan))
321+
};
288322
TaskBuilder({
289323
opts: {
290324
linked: true,
@@ -314,7 +348,7 @@ impl TaskBuilder {
314348
* # Failure
315349
* Fails if a future_result was already set for this task.
316350
*/
317-
fn future_result(blk: fn(v: Port<TaskResult>)) -> TaskBuilder {
351+
fn future_result(blk: fn(v: future::Future<TaskResult>)) -> TaskBuilder {
318352
// FIXME (#3725): Once linked failure and notification are
319353
// handled in the library, I can imagine implementing this by just
320354
// registering an arbitrary number of task::on_exit handlers and
@@ -325,9 +359,13 @@ impl TaskBuilder {
325359
}
326360
327361
// Construct the future and give it to the caller.
328-
let (notify_pipe_ch, notify_pipe_po) = stream::<TaskResult>();
362+
let (notify_pipe_ch, notify_pipe_po) = stream::<Notification>();
329363
330-
blk(move notify_pipe_po);
364+
blk(do future::from_fn |move notify_pipe_po| {
365+
match notify_pipe_po.recv() {
366+
Exit(_, result) => result
367+
}
368+
});
331369
332370
// Reconfigure self to use a notify channel.
333371
TaskBuilder({
@@ -343,7 +381,11 @@ impl TaskBuilder {
343381
}
344382
/// Configure a custom scheduler mode for the task.
345383
fn sched_mode(mode: SchedMode) -> TaskBuilder {
346-
let notify_chan = replace(&mut self.opts.notify_chan, None);
384+
let notify_chan = if self.opts.notify_chan.is_none() {
385+
None
386+
} else {
387+
Some(option::swap_unwrap(&mut self.opts.notify_chan))
388+
};
347389
TaskBuilder({
348390
opts: {
349391
linked: self.opts.linked,
@@ -370,7 +412,11 @@ impl TaskBuilder {
370412
*/
371413
fn add_wrapper(wrapper: fn@(v: fn~()) -> fn~()) -> TaskBuilder {
372414
let prev_gen_body = self.gen_body;
373-
let notify_chan = replace(&mut self.opts.notify_chan, None);
415+
let notify_chan = if self.opts.notify_chan.is_none() {
416+
None
417+
} else {
418+
Some(option::swap_unwrap(&mut self.opts.notify_chan))
419+
};
374420
TaskBuilder({
375421
opts: {
376422
linked: self.opts.linked,
@@ -401,7 +447,13 @@ impl TaskBuilder {
401447
* must be greater than zero.
402448
*/
403449
fn spawn(f: fn~()) {
404-
let notify_chan = replace(&mut self.opts.notify_chan, None);
450+
let notify_chan = if self.opts.notify_chan.is_none() {
451+
None
452+
} else {
453+
let swapped_notify_chan =
454+
option::swap_unwrap(&mut self.opts.notify_chan);
455+
Some(move swapped_notify_chan)
456+
};
405457
let x = self.consume();
406458
let opts = {
407459
linked: x.opts.linked,
@@ -480,7 +532,7 @@ impl TaskBuilder {
480532
do fr_task_builder.spawn |move f| {
481533
comm::send(ch, f());
482534
}
483-
match option::unwrap(move result).recv() {
535+
match future::get(&option::unwrap(move result)) {
484536
Success => result::Ok(comm::recv(po)),
485537
Failure => result::Err(())
486538
}
@@ -897,14 +949,14 @@ fn test_add_wrapper() {
897949
fn test_future_result() {
898950
let mut result = None;
899951
do task().future_result(|+r| { result = Some(move r); }).spawn { }
900-
assert option::unwrap(move result).recv() == Success;
952+
assert future::get(&option::unwrap(move result)) == Success;
901953

902954
result = None;
903955
do task().future_result(|+r|
904956
{ result = Some(move r); }).unlinked().spawn {
905957
fail;
906958
}
907-
assert option::unwrap(move result).recv() == Failure;
959+
assert future::get(&option::unwrap(move result)) == Failure;
908960
}
909961

910962
#[test] #[should_fail] #[ignore(cfg(windows))]

branches/try2/src/libcore/task/spawn.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -320,15 +320,15 @@ fn TCB(me: *rust_task, tasks: TaskGroupArc, ancestors: AncestorList,
320320
}
321321

322322
struct AutoNotify {
323-
notify_chan: Chan<TaskResult>,
323+
notify_chan: Chan<Notification>,
324324
mut failed: bool,
325325
drop {
326326
let result = if self.failed { Failure } else { Success };
327-
self.notify_chan.send(result);
327+
self.notify_chan.send(Exit(get_task(), result));
328328
}
329329
}
330330

331-
fn AutoNotify(chan: Chan<TaskResult>) -> AutoNotify {
331+
fn AutoNotify(chan: Chan<Notification>) -> AutoNotify {
332332
AutoNotify {
333333
notify_chan: move chan,
334334
failed: true // Un-set above when taskgroup successfully made.
@@ -532,7 +532,7 @@ pub fn spawn_raw(opts: TaskOpts, f: fn~()) {
532532
// (4) ...and runs the provided body function.
533533
fn make_child_wrapper(child: *rust_task, child_arc: TaskGroupArc,
534534
ancestors: AncestorList, is_main: bool,
535-
notify_chan: Option<Chan<TaskResult>>,
535+
notify_chan: Option<Chan<Notification>>,
536536
f: fn~()) -> fn~() {
537537
let child_data = ~mut Some((move child_arc, move ancestors));
538538
return fn~(move notify_chan, move child_data, move f) {
@@ -660,30 +660,36 @@ fn test_spawn_raw_unsupervise() {
660660
#[test]
661661
#[ignore(cfg(windows))]
662662
fn test_spawn_raw_notify_success() {
663+
let (task_ch, task_po) = pipes::stream();
663664
let (notify_ch, notify_po) = pipes::stream();
664665

665666
let opts = {
666667
notify_chan: Some(move notify_ch),
667668
.. default_task_opts()
668669
};
669-
do spawn_raw(move opts) {
670+
do spawn_raw(move opts) |move task_ch| {
671+
task_ch.send(get_task());
670672
}
671-
assert notify_po.recv() == Success;
673+
let task_ = task_po.recv();
674+
assert notify_po.recv() == Exit(task_, Success);
672675
}
673676

674677
#[test]
675678
#[ignore(cfg(windows))]
676679
fn test_spawn_raw_notify_failure() {
677680
// New bindings for these
681+
let (task_ch, task_po) = pipes::stream();
678682
let (notify_ch, notify_po) = pipes::stream();
679683

680684
let opts = {
681685
linked: false,
682686
notify_chan: Some(move notify_ch),
683687
.. default_task_opts()
684688
};
685-
do spawn_raw(move opts) {
689+
do spawn_raw(move opts) |move task_ch| {
690+
task_ch.send(get_task());
686691
fail;
687692
}
688-
assert notify_po.recv() == Failure;
693+
let task_ = task_po.recv();
694+
assert notify_po.recv() == Exit(task_, Failure);
689695
}

branches/try2/src/libstd/arc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,7 @@ mod tests {
651651
}
652652

653653
// Wait for children to pass their asserts
654-
for vec::each(children) |r| { r.recv(); }
654+
for vec::each(children) |r| { future::get(r); }
655655

656656
// Wait for writer to finish
657657
p.recv();

0 commit comments

Comments
 (0)