Skip to content

Commit 01dc4a8

Browse files
committed
core: Add priv::weaken_task
1 parent a6e748a commit 01dc4a8

File tree

7 files changed

+199
-19
lines changed

7 files changed

+199
-19
lines changed

src/libcore/comm.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ enum rust_port {}
3737

3838
#[abi = "cdecl"]
3939
native mod rustrt {
40-
fn rust_port_id_send<T: send>(t: *sys::type_desc,
41-
target_port: port_id,
40+
fn rust_port_id_send<T: send>(target_port: port_id,
4241
data: T) -> libc::uintptr_t;
4342

4443
fn new_port(unit_sz: libc::size_t) -> *rust_port;
@@ -114,7 +113,7 @@ whereupon the caller loses access to it.
114113
"]
115114
fn send<T: send>(ch: chan<T>, -data: T) {
116115
let chan_t(p) = ch;
117-
let res = rustrt::rust_port_id_send(sys::get_type_desc::<T>(), p, data);
116+
let res = rustrt::rust_port_id_send(p, data);
118117
if res != 0u unsafe {
119118
// Data sent successfully
120119
unsafe::forget(data);

src/libcore/priv.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@ export chan_from_global_ptr;
44

55
import compare_and_swap = rustrt::rust_compare_and_swap_ptr;
66

7+
type rust_port_id = uint;
8+
79
native mod rustrt {
810
fn rust_compare_and_swap_ptr(address: *libc::uintptr_t,
911
oldval: libc::uintptr_t,
1012
newval: libc::uintptr_t) -> bool;
13+
fn rust_task_weaken(ch: rust_port_id);
14+
fn rust_task_unweaken(ch: rust_port_id);
1115
}
1216

1317
type global_ptr<T: send> = *libc::uintptr_t;
@@ -143,3 +147,83 @@ fn test_from_global_chan2() unsafe {
143147
assert winners == 1u;
144148
}
145149
}
150+
151+
#[doc = "
152+
Convert the current task to a 'weak' task temporarily
153+
154+
As a weak task it will not be counted towards the runtime's set
155+
of live tasks. When there are no more outstanding live (non-weak) tasks
156+
the runtime will send an exit message on the provided channel.
157+
158+
This function is super-unsafe. Do not use.
159+
160+
# Safety notes
161+
162+
* Weak tasks must either die on their own or exit upon receipt of
163+
the exit message. Failure to do so will cause the runtime to never
164+
exit
165+
* Tasks must not call `weaken_task` multiple times. This will
166+
break the kernel's accounting of live tasks.
167+
* Weak tasks must not be supervised. A supervised task keeps
168+
a reference to its parent, so the parent will not die.
169+
"]
170+
unsafe fn weaken_task(f: fn(comm::port<()>)) unsafe {
171+
let po = comm::port();
172+
let ch = comm::chan(po);
173+
rustrt::rust_task_weaken(unsafe::reinterpret_cast(ch));
174+
let _unweaken = unweaken(ch);
175+
f(po);
176+
177+
resource unweaken(ch: comm::chan<()>) unsafe {
178+
rustrt::rust_task_unweaken(unsafe::reinterpret_cast(ch));
179+
}
180+
}
181+
182+
#[test]
183+
fn test_weaken_task_then_unweaken() unsafe {
184+
task::try {||
185+
weaken_task {|_po|
186+
}
187+
};
188+
}
189+
190+
#[test]
191+
fn test_weaken_task_wait() unsafe {
192+
let builder = task::builder();
193+
task::unsupervise(builder);
194+
task::run(builder) {||
195+
weaken_task {|po|
196+
comm::recv(po);
197+
}
198+
}
199+
}
200+
201+
#[test]
202+
fn test_weaken_task_stress() unsafe {
203+
// Create a bunch of weak tasks
204+
iter::repeat(100u) {||
205+
task::spawn {||
206+
weaken_task {|_po|
207+
}
208+
}
209+
let builder = task::builder();
210+
task::unsupervise(builder);
211+
task::run(builder) {||
212+
weaken_task {|po|
213+
// Wait for it to tell us to die
214+
comm::recv(po);
215+
}
216+
}
217+
}
218+
}
219+
220+
#[test]
221+
#[ignore(cfg(target_os = "win32"))]
222+
fn test_weaken_task_fail() unsafe {
223+
let res = task::try {||
224+
weaken_task {|_po|
225+
fail;
226+
}
227+
};
228+
assert result::is_failure(res);
229+
}

src/rt/rust_builtin.cpp

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -669,22 +669,9 @@ get_port_id(rust_port *port) {
669669
}
670670

671671
extern "C" CDECL uintptr_t
672-
rust_port_id_send(type_desc *t, rust_port_id target_port_id, void *sptr) {
673-
bool sent = false;
672+
rust_port_id_send(rust_port_id target_port_id, void *sptr) {
674673
rust_task *task = rust_get_current_task();
675-
676-
LOG(task, comm, "rust_port_id*_send port: 0x%" PRIxPTR,
677-
(uintptr_t) target_port_id);
678-
679-
rust_port *port = task->kernel->get_port_by_id(target_port_id);
680-
if(port) {
681-
port->send(sptr);
682-
port->deref();
683-
sent = true;
684-
} else {
685-
LOG(task, comm, "didn't get the port");
686-
}
687-
return (uintptr_t)sent;
674+
return (uintptr_t)task->kernel->send_to_port(target_port_id, sptr);
688675
}
689676

690677
// This is called by an intrinsic on the Rust stack and must run
@@ -782,6 +769,18 @@ rust_compare_and_swap_ptr(intptr_t *address,
782769
return sync::compare_and_swap(address, oldval, newval);
783770
}
784771

772+
extern "C" CDECL void
773+
rust_task_weaken(rust_port_id chan) {
774+
rust_task *task = rust_get_current_task();
775+
task->kernel->weaken_task(chan);
776+
}
777+
778+
extern "C" CDECL void
779+
rust_task_unweaken(rust_port_id chan) {
780+
rust_task *task = rust_get_current_task();
781+
task->kernel->unweaken_task(chan);
782+
}
783+
785784
//
786785
// Local Variables:
787786
// mode: C++

src/rt/rust_kernel.cpp

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11

22

3-
43
#include "rust_kernel.h"
54
#include "rust_port.h"
65
#include "rust_util.h"
76
#include "rust_scheduler.h"
87
#include "rust_sched_launcher.h"
8+
#include <algorithm>
99

1010
#define KLOG_(...) \
1111
KLOG(this, kern, __VA_ARGS__)
@@ -21,6 +21,7 @@ rust_kernel::rust_kernel(rust_env *env) :
2121
max_sched_id(0),
2222
sched_reaper(this),
2323
osmain_driver(NULL),
24+
non_weak_tasks(0),
2425
env(env)
2526
{
2627
// Create the single threaded scheduler that will run on the platform's
@@ -286,6 +287,84 @@ rust_kernel::set_exit_status(int code) {
286287
}
287288
}
288289

290+
void
291+
rust_kernel::register_task() {
292+
KLOG_("Registering task");
293+
uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
294+
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
295+
}
296+
297+
void
298+
rust_kernel::unregister_task() {
299+
KLOG_("Unregistering task");
300+
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
301+
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
302+
if (new_non_weak_tasks == 0) {
303+
end_weak_tasks();
304+
}
305+
}
306+
307+
void
308+
rust_kernel::weaken_task(rust_port_id chan) {
309+
{
310+
scoped_lock with(weak_task_lock);
311+
KLOG_("Weakening task with channel %" PRIdPTR, chan);
312+
weak_task_chans.push_back(chan);
313+
}
314+
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
315+
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
316+
if (new_non_weak_tasks == 0) {
317+
end_weak_tasks();
318+
}
319+
}
320+
321+
void
322+
rust_kernel::unweaken_task(rust_port_id chan) {
323+
uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
324+
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
325+
{
326+
scoped_lock with(weak_task_lock);
327+
KLOG_("Unweakening task with channel %" PRIdPTR, chan);
328+
std::vector<rust_port_id>::iterator iter =
329+
std::find(weak_task_chans.begin(), weak_task_chans.end(), chan);
330+
if (iter != weak_task_chans.end()) {
331+
weak_task_chans.erase(iter);
332+
}
333+
}
334+
}
335+
336+
void
337+
rust_kernel::end_weak_tasks() {
338+
std::vector<rust_port_id> chancopies;
339+
{
340+
//scoped_lock with(weak_task_lock);
341+
chancopies = weak_task_chans;
342+
weak_task_chans.clear();
343+
}
344+
while (!chancopies.empty()) {
345+
rust_port_id chan = chancopies.back();
346+
chancopies.pop_back();
347+
KLOG_("Notifying weak task " PRIdPTR, chan);
348+
uintptr_t token = 0;
349+
send_to_port(chan, &token);
350+
}
351+
}
352+
353+
bool
354+
rust_kernel::send_to_port(rust_port_id chan, void *sptr) {
355+
KLOG_("rust_port_id*_send port: 0x%" PRIxPTR, (uintptr_t) chan);
356+
357+
rust_port *port = get_port_by_id(chan);
358+
if(port) {
359+
port->send(sptr);
360+
port->deref();
361+
return true;
362+
} else {
363+
KLOG_("didn't get the port");
364+
return false;
365+
}
366+
}
367+
289368
//
290369
// Local Variables:
291370
// mode: C++

src/rt/rust_kernel.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,15 @@ class rust_kernel {
6363
// on the main thread
6464
rust_sched_driver *osmain_driver;
6565

66+
// An atomically updated count of the live, 'non-weak' tasks
67+
uintptr_t non_weak_tasks;
68+
// Protects weak_task_chans
69+
lock_and_signal weak_task_lock;
70+
// A list of weak tasks that need to be told when to exit
71+
std::vector<rust_port_id> weak_task_chans;
72+
6673
rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id);
74+
void end_weak_tasks();
6775

6876
public:
6977
struct rust_env *env;
@@ -102,6 +110,13 @@ class rust_kernel {
102110
void set_exit_status(int code);
103111

104112
rust_sched_id osmain_sched_id() { return osmain_scheduler; }
113+
114+
void register_task();
115+
void unregister_task();
116+
void weaken_task(rust_port_id chan);
117+
void unweaken_task(rust_port_id chan);
118+
119+
bool send_to_port(rust_port_id chan, void *sptr);
105120
};
106121

107122
template <typename T> struct kernel_owned {

src/rt/rust_scheduler.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) {
9292
if (cur_thread >= num_threads)
9393
cur_thread = 0;
9494
}
95+
kernel->register_task();
9596
rust_sched_launcher *thread = threads[thread_no];
9697
return thread->get_loop()->create_task(spawner, name);
9798
}
@@ -106,6 +107,7 @@ rust_scheduler::release_task() {
106107
need_exit = true;
107108
}
108109
}
110+
kernel->unregister_task();
109111
if (need_exit) {
110112
exit();
111113
}

src/rt/rustrt.def.in

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ rust_task_yield
5454
rust_task_is_unwinding
5555
rust_get_task
5656
rust_task_config_notify
57+
rust_task_weaken
58+
rust_task_unweaken
5759
sched_threads
5860
shape_log_str
5961
start_task

0 commit comments

Comments
 (0)