Skip to content

Commit 3b7ae02

Browse files
committed
---
yaml --- r: 14327 b: refs/heads/try c: b2cfb7e h: refs/heads/master i: 14325: a741ca5 14323: dd1074b 14319: 9efc0d0 v: v3
1 parent 3df4e7e commit 3b7ae02

File tree

9 files changed

+154
-10
lines changed

9 files changed

+154
-10
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
refs/heads/master: 61b1875c16de39c166b0f4d54bba19f9c6777d1a
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
44
refs/heads/snap-stage3: 4a81779abd786ff22d71434c6d9a5917ea4cdfff
5-
refs/heads/try: e62ddf48988087d19934e1fdc6abb6de5f7a6a02
5+
refs/heads/try: b2cfb7ef8262ebe47514f016f59054ebcfe15d61
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105

branches/try/mk/rt.mk

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ RUNTIME_CS_$(1) := \
5252
rt/rust_uv.cpp \
5353
rt/rust_uvtmp.cpp \
5454
rt/rust_log.cpp \
55+
rt/rust_port_selector.cpp \
5556
rt/circular_buffer.cpp \
5657
rt/isaac/randport.cpp \
5758
rt/rust_srv.cpp \
@@ -88,6 +89,7 @@ RUNTIME_HDR_$(1) := rt/globals.h \
8889
rt/rust_stack.h \
8990
rt/rust_task_list.h \
9091
rt/rust_log.h \
92+
rt/rust_port_selector.h \
9193
rt/circular_buffer.h \
9294
rt/util/array_list.h \
9395
rt/util/indexed_list.h \

branches/try/src/rt/rust_builtin.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,14 @@ port_recv(uintptr_t *dptr, rust_port *port,
593593
return;
594594
}
595595

596+
extern "C" CDECL void
597+
rust_port_select(rust_port **dptr, rust_port **ports,
598+
size_t n_ports, uintptr_t *yield) {
599+
rust_task *task = rust_task_thread::get_task();
600+
rust_port_selector *selector = task->get_port_selector();
601+
selector->select(task, dptr, ports, n_ports, yield);
602+
}
603+
596604
extern "C" CDECL void
597605
rust_set_exit_status(intptr_t code) {
598606
rust_task *task = rust_task_thread::get_task();

branches/try/src/rt/rust_port.cpp

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,34 @@ void rust_port::detach() {
3030

3131
void rust_port::send(void *sptr) {
3232
I(task->thread, !lock.lock_held_by_current_thread());
33-
scoped_lock with(lock);
33+
bool did_rendezvous = false;
34+
{
35+
scoped_lock with(lock);
36+
37+
buffer.enqueue(sptr);
3438

35-
buffer.enqueue(sptr);
39+
A(kernel, !buffer.is_empty(),
40+
"rust_chan::transmit with nothing to send.");
41+
42+
if (task->blocked_on(this)) {
43+
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
44+
buffer.dequeue(task->rendezvous_ptr);
45+
task->rendezvous_ptr = 0;
46+
task->wakeup(this);
47+
did_rendezvous = true;
48+
}
49+
}
3650

37-
A(kernel, !buffer.is_empty(),
38-
"rust_chan::transmit with nothing to send.");
51+
if (!did_rendezvous) {
52+
// If the task wasn't waiting specifically on this port,
53+
// it may be waiting on a group of ports
3954

40-
if (task->blocked_on(this)) {
41-
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
42-
buffer.dequeue(task->rendezvous_ptr);
43-
task->rendezvous_ptr = 0;
44-
task->wakeup(this);
55+
rust_port_selector *port_selector = task->get_port_selector();
56+
// This check is not definitive. The port selector will take a lock
57+
// and check again whether the task is still blocked.
58+
if (task->blocked_on(port_selector)) {
59+
port_selector->msg_sent_on(this);
60+
}
4561
}
4662
}
4763

branches/try/src/rt/rust_port.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef RUST_PORT_H
22
#define RUST_PORT_H
33

4+
#include "rust_internal.h"
5+
46
class rust_port : public kernel_owned<rust_port>, public rust_cond {
57
public:
68
RUST_REFCOUNTED(rust_port)
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#include "rust_port.h"
2+
#include "rust_port_selector.h"
3+
4+
rust_port_selector::rust_port_selector()
5+
: ports(NULL), n_ports(0) {
6+
}
7+
8+
void
9+
rust_port_selector::select(rust_task *task, rust_port **dptr,
10+
rust_port **ports,
11+
size_t n_ports, uintptr_t *yield) {
12+
13+
I(task->thread, this->ports == NULL);
14+
I(task->thread, this->n_ports == 0);
15+
I(task->thread, dptr != NULL);
16+
I(task->thread, ports != NULL);
17+
I(task->thread, n_ports != 0);
18+
I(task->thread, yield != NULL);
19+
20+
*yield = false;
21+
size_t locks_taken = 0;
22+
bool found_msg = false;
23+
24+
// Take each port's lock as we iterate through them because
25+
// if none of them contain a usable message then we need to
26+
// block the task before any of them can try to send another
27+
// message.
28+
29+
for (size_t i = 0; i < n_ports; i++) {
30+
rust_port *port = ports[i];
31+
I(task->thread, port != NULL);
32+
33+
port->lock.lock();
34+
locks_taken++;
35+
36+
if (port->buffer.size() > 0) {
37+
*dptr = port;
38+
found_msg = true;
39+
break;
40+
}
41+
}
42+
43+
if (!found_msg) {
44+
this->ports = ports;
45+
this->n_ports = n_ports;
46+
I(task->thread, task->rendezvous_ptr == NULL);
47+
task->rendezvous_ptr = (uintptr_t*)dptr;
48+
*yield = true;
49+
task->block(this, "waiting for select rendezvous");
50+
}
51+
52+
for (size_t i = 0; i < locks_taken; i++) {
53+
rust_port *port = ports[i];
54+
port->lock.unlock();
55+
}
56+
}
57+
58+
void
59+
rust_port_selector::msg_sent_on(rust_port *port) {
60+
rust_task *task = port->task;
61+
62+
I(task->thread, !task->lock.lock_held_by_current_thread());
63+
I(task->thread, !port->lock.lock_held_by_current_thread());
64+
I(task->thread, !rendezvous_lock.lock_held_by_current_thread());
65+
66+
// Prevent two ports from trying to wake up the task
67+
// simultaneously
68+
scoped_lock with(rendezvous_lock);
69+
70+
if (task->blocked_on(this)) {
71+
for (size_t i = 0; i < n_ports; i++) {
72+
if (port == ports[i]) {
73+
// This was one of the ports we were waiting on
74+
ports = NULL;
75+
n_ports = 0;
76+
*task->rendezvous_ptr = (uintptr_t) port;
77+
task->rendezvous_ptr = NULL;
78+
task->wakeup(this);
79+
return;
80+
}
81+
}
82+
}
83+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#ifndef RUST_PORT_SELECTOR_H
2+
#define RUST_PORT_SELECTOR_H
3+
4+
#include "rust_internal.h"
5+
6+
struct rust_task;
7+
class rust_port;
8+
9+
class rust_port_selector : public rust_cond {
10+
private:
11+
rust_port **ports;
12+
size_t n_ports;
13+
lock_and_signal rendezvous_lock;
14+
15+
public:
16+
rust_port_selector();
17+
18+
void select(rust_task *task,
19+
rust_port **dptr,
20+
rust_port **ports,
21+
size_t n_ports,
22+
uintptr_t *yield);
23+
24+
void msg_sent_on(rust_port *port);
25+
};
26+
27+
#endif /* RUST_PORT_SELECTOR_H */

branches/try/src/rt/rust_task.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "rust_obstack.h"
1717
#include "boxed_region.h"
1818
#include "rust_stack.h"
19+
#include "rust_port_selector.h"
1920

2021
// Corresponds to the rust chan (currently _chan) type.
2122
struct chan_handle {
@@ -116,6 +117,8 @@ rust_task : public kernel_owned<rust_task>, rust_cond
116117
uintptr_t next_c_sp;
117118
uintptr_t next_rust_sp;
118119

120+
rust_port_selector port_selector;
121+
119122
// Called when the atomic refcount reaches zero
120123
void delete_this();
121124

@@ -206,6 +209,8 @@ rust_task : public kernel_owned<rust_task>, rust_cond
206209
void call_on_c_stack(void *args, void *fn_ptr);
207210
void call_on_rust_stack(void *args, void *fn_ptr);
208211
bool have_c_stack() { return c_stack != NULL; }
212+
213+
rust_port_selector *get_port_selector() { return &port_selector; }
209214
};
210215

211216
// This stuff is on the stack-switching fast path

branches/try/src/rt/rustrt.def.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ nano_time
1717
new_port
1818
new_task
1919
port_recv
20+
rust_port_select
2021
rand_free
2122
rand_new
2223
rand_next

0 commit comments

Comments
 (0)