Skip to content

Commit 62bc6b5

Browse files
author
Eric Holk
committed
Per-thread scheduling. Closes #682.
Tasks are spawned on a random thread. Currently they stay there, but we should add task migration and load balancing in the future. This should drammatically improve our task performance benchmarks.
1 parent b51f5c3 commit 62bc6b5

14 files changed

+239
-185
lines changed

src/rt/circular_buffer.cpp

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,33 @@
55
#include "rust_internal.h"
66

77
circular_buffer::circular_buffer(rust_kernel *kernel, size_t unit_sz) :
8-
sched(kernel->sched),
98
kernel(kernel),
109
unit_sz(unit_sz),
1110
_buffer_sz(initial_size()),
1211
_next(0),
1312
_unread(0),
1413
_buffer((uint8_t *)kernel->malloc(_buffer_sz, "circular_buffer")) {
1514

16-
A(sched, unit_sz, "Unit size must be larger than zero.");
15+
// A(sched, unit_sz, "Unit size must be larger than zero.");
1716

18-
DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)"
19-
"-> circular_buffer=0x%" PRIxPTR,
20-
_buffer_sz, _unread, this);
17+
// DLOG(sched, mem, "new circular_buffer(buffer_sz=%d, unread=%d)"
18+
// "-> circular_buffer=0x%" PRIxPTR,
19+
// _buffer_sz, _unread, this);
2120

22-
A(sched, _buffer, "Failed to allocate buffer.");
21+
// A(sched, _buffer, "Failed to allocate buffer.");
2322
}
2423

2524
circular_buffer::~circular_buffer() {
26-
DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this);
27-
I(sched, _buffer);
28-
W(sched, _unread == 0,
29-
"freeing circular_buffer with %d unread bytes", _unread);
25+
// DLOG(sched, mem, "~circular_buffer 0x%" PRIxPTR, this);
26+
// I(sched, _buffer);
27+
// W(sched, _unread == 0,
28+
// "freeing circular_buffer with %d unread bytes", _unread);
3029
kernel->free(_buffer);
3130
}
3231

3332
size_t
3433
circular_buffer::initial_size() {
35-
I(sched, unit_sz > 0);
34+
// I(sched, unit_sz > 0);
3635
return INITIAL_CIRCULAR_BUFFER_SIZE_IN_UNITS * unit_sz;
3736
}
3837

@@ -41,8 +40,8 @@ circular_buffer::initial_size() {
4140
*/
4241
void
4342
circular_buffer::transfer(void *dst) {
44-
I(sched, dst);
45-
I(sched, _unread <= _buffer_sz);
43+
// I(sched, dst);
44+
// I(sched, _unread <= _buffer_sz);
4645

4746
uint8_t *ptr = (uint8_t *) dst;
4847

@@ -54,13 +53,13 @@ circular_buffer::transfer(void *dst) {
5453
} else {
5554
head_sz = _buffer_sz - _next;
5655
}
57-
I(sched, _next + head_sz <= _buffer_sz);
56+
// I(sched, _next + head_sz <= _buffer_sz);
5857
memcpy(ptr, _buffer + _next, head_sz);
5958

6059
// Then copy any other items from the beginning of the buffer
61-
I(sched, _unread >= head_sz);
60+
// I(sched, _unread >= head_sz);
6261
size_t tail_sz = _unread - head_sz;
63-
I(sched, head_sz + tail_sz <= _buffer_sz);
62+
// I(sched, head_sz + tail_sz <= _buffer_sz);
6463
memcpy(ptr + head_sz, _buffer, tail_sz);
6564
}
6665

@@ -70,37 +69,37 @@ circular_buffer::transfer(void *dst) {
7069
*/
7170
void
7271
circular_buffer::enqueue(void *src) {
73-
I(sched, src);
74-
I(sched, _unread <= _buffer_sz);
75-
I(sched, _buffer);
72+
// I(sched, src);
73+
// I(sched, _unread <= _buffer_sz);
74+
// I(sched, _buffer);
7675

7776
// Grow if necessary.
7877
if (_unread == _buffer_sz) {
7978
grow();
8079
}
8180

82-
DLOG(sched, mem, "circular_buffer enqueue "
83-
"unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
84-
_unread, _next, _buffer_sz, unit_sz);
81+
// DLOG(sched, mem, "circular_buffer enqueue "
82+
// "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
83+
// _unread, _next, _buffer_sz, unit_sz);
8584

86-
I(sched, _unread < _buffer_sz);
87-
I(sched, _unread + unit_sz <= _buffer_sz);
85+
// I(sched, _unread < _buffer_sz);
86+
// I(sched, _unread + unit_sz <= _buffer_sz);
8887

8988
// Copy data
9089
size_t dst_idx = _next + _unread;
9190
I(sched, dst_idx >= _buffer_sz || dst_idx + unit_sz <= _buffer_sz);
9291
if (dst_idx >= _buffer_sz) {
9392
dst_idx -= _buffer_sz;
9493

95-
I(sched, _next >= unit_sz);
96-
I(sched, dst_idx <= _next - unit_sz);
94+
// I(sched, _next >= unit_sz);
95+
// I(sched, dst_idx <= _next - unit_sz);
9796
}
9897

99-
I(sched, dst_idx + unit_sz <= _buffer_sz);
98+
// I(sched, dst_idx + unit_sz <= _buffer_sz);
10099
memcpy(&_buffer[dst_idx], src, unit_sz);
101100
_unread += unit_sz;
102101

103-
DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx);
102+
// DLOG(sched, mem, "circular_buffer pushed data at index: %d", dst_idx);
104103
}
105104

106105
/**
@@ -110,17 +109,17 @@ circular_buffer::enqueue(void *src) {
110109
*/
111110
void
112111
circular_buffer::dequeue(void *dst) {
113-
I(sched, unit_sz > 0);
114-
I(sched, _unread >= unit_sz);
115-
I(sched, _unread <= _buffer_sz);
116-
I(sched, _buffer);
112+
// I(sched, unit_sz > 0);
113+
// I(sched, _unread >= unit_sz);
114+
// I(sched, _unread <= _buffer_sz);
115+
// I(sched, _buffer);
117116

118-
DLOG(sched, mem,
119-
"circular_buffer dequeue "
120-
"unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
121-
_unread, _next, _buffer_sz, unit_sz);
117+
// DLOG(sched, mem,
118+
// "circular_buffer dequeue "
119+
// "unread: %d, next: %d, buffer_sz: %d, unit_sz: %d",
120+
// _unread, _next, _buffer_sz, unit_sz);
122121

123-
I(sched, _next + unit_sz <= _buffer_sz);
122+
// I(sched, _next + unit_sz <= _buffer_sz);
124123
if (dst != NULL) {
125124
memcpy(dst, &_buffer[_next], unit_sz);
126125
}
@@ -140,8 +139,9 @@ circular_buffer::dequeue(void *dst) {
140139
void
141140
circular_buffer::grow() {
142141
size_t new_buffer_sz = _buffer_sz * 2;
143-
I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE);
144-
DLOG(sched, mem, "circular_buffer is growing to %d bytes", new_buffer_sz);
142+
// I(sched, new_buffer_sz <= MAX_CIRCULAR_BUFFER_SIZE);
143+
// DLOG(sched, mem, "circular_buffer is growing to %d bytes",
144+
// new_buffer_sz);
145145
void *new_buffer = kernel->malloc(new_buffer_sz,
146146
"new circular_buffer (grow)");
147147
transfer(new_buffer);
@@ -154,9 +154,9 @@ circular_buffer::grow() {
154154
void
155155
circular_buffer::shrink() {
156156
size_t new_buffer_sz = _buffer_sz / 2;
157-
I(sched, initial_size() <= new_buffer_sz);
158-
DLOG(sched, mem, "circular_buffer is shrinking to %d bytes",
159-
new_buffer_sz);
157+
// I(sched, initial_size() <= new_buffer_sz);
158+
// DLOG(sched, mem, "circular_buffer is shrinking to %d bytes",
159+
// new_buffer_sz);
160160
void *new_buffer = kernel->malloc(new_buffer_sz,
161161
"new circular_buffer (shrink)");
162162
transfer(new_buffer);

src/rt/rust.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,10 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
140140

141141
update_log_settings(crate_map, getenv("RUST_LOG"));
142142
enable_claims(getenv("CHECK_CLAIMS"));
143+
int num_threads = get_num_threads();
143144

144145
rust_srv *srv = new rust_srv();
145-
rust_kernel *kernel = new rust_kernel(srv);
146+
rust_kernel *kernel = new rust_kernel(srv, num_threads);
146147
kernel->start();
147148
rust_task *root_task = kernel->create_task(NULL, "main");
148149
rust_scheduler *sched = root_task->sched;
@@ -158,11 +159,9 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
158159

159160
root_task->start(main_fn, (uintptr_t)args->args);
160161

161-
int num_threads = get_num_threads();
162-
163162
DLOG(sched, dom, "Using %d worker threads.", num_threads);
164163

165-
int ret = kernel->start_task_threads(num_threads);
164+
int ret = kernel->start_task_threads();
166165
delete args;
167166
delete kernel;
168167
delete srv;

src/rt/rust_chan.cpp

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,17 @@ rust_chan::rust_chan(rust_kernel *kernel, maybe_proxy<rust_port> *port,
1313
if (port) {
1414
associate(port);
1515
}
16-
DLOG(kernel->sched, comm, "new rust_chan(task=0x%" PRIxPTR
17-
", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
18-
(uintptr_t) task, (uintptr_t) port, (uintptr_t) this);
16+
// DLOG(task->sched, comm, "new rust_chan(task=0x%" PRIxPTR
17+
// ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
18+
// (uintptr_t) task, (uintptr_t) port, (uintptr_t) this);
1919
}
2020

2121
rust_chan::~rust_chan() {
22-
DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")",
23-
(uintptr_t) this);
22+
// DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")",
23+
// (uintptr_t) this);
2424

25-
A(kernel->sched, is_associated() == false,
26-
"Channel must be disassociated before being freed.");
25+
// A(kernel->sched, is_associated() == false,
26+
// "Channel must be disassociated before being freed.");
2727
}
2828

2929
/**
@@ -33,9 +33,9 @@ void rust_chan::associate(maybe_proxy<rust_port> *port) {
3333
this->port = port;
3434
if (port->is_proxy() == false) {
3535
scoped_lock with(port->referent()->lock);
36-
DLOG(kernel->sched, task,
37-
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
38-
this, port);
36+
// DLOG(kernel->sched, task,
37+
// "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
38+
// this, port);
3939
++this->ref_count;
4040
this->task = port->referent()->task;
4141
this->task->ref();
@@ -51,14 +51,14 @@ bool rust_chan::is_associated() {
5151
* Unlink this channel from its associated port.
5252
*/
5353
void rust_chan::disassociate() {
54-
A(kernel->sched, is_associated(),
55-
"Channel must be associated with a port.");
54+
// A(kernel->sched, is_associated(),
55+
// "Channel must be associated with a port.");
5656

5757
if (port->is_proxy() == false) {
5858
scoped_lock with(port->referent()->lock);
59-
DLOG(kernel->sched, task,
60-
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
61-
this, port->referent());
59+
// DLOG(kernel->sched, task,
60+
// "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
61+
// this, port->referent());
6262
--this->ref_count;
6363
--this->task->ref_count;
6464
this->task = NULL;
@@ -73,8 +73,8 @@ void rust_chan::disassociate() {
7373
* Attempt to send data to the associated port.
7474
*/
7575
void rust_chan::send(void *sptr) {
76-
rust_scheduler *sched = kernel->sched;
77-
I(sched, !port->is_proxy());
76+
// rust_scheduler *sched = kernel->sched;
77+
// I(sched, !port->is_proxy());
7878

7979
rust_port *target_port = port->referent();
8080
// TODO: We can probably avoid this lock by using atomic operations in
@@ -84,21 +84,21 @@ void rust_chan::send(void *sptr) {
8484
buffer.enqueue(sptr);
8585

8686
if (!is_associated()) {
87-
W(sched, is_associated(),
88-
"rust_chan::transmit with no associated port.");
87+
// W(sched, is_associated(),
88+
// "rust_chan::transmit with no associated port.");
8989
return;
9090
}
9191

92-
A(sched, !buffer.is_empty(),
93-
"rust_chan::transmit with nothing to send.");
92+
// A(sched, !buffer.is_empty(),
93+
// "rust_chan::transmit with nothing to send.");
9494

9595
if (port->is_proxy()) {
9696
data_message::send(buffer.peek(), buffer.unit_sz, "send data",
9797
task->get_handle(), port->as_proxy()->handle());
9898
buffer.dequeue(NULL);
9999
} else {
100100
if (target_port->task->blocked_on(target_port)) {
101-
DLOG(sched, comm, "dequeued in rendezvous_ptr");
101+
// DLOG(sched, comm, "dequeued in rendezvous_ptr");
102102
buffer.dequeue(target_port->task->rendezvous_ptr);
103103
target_port->task->rendezvous_ptr = 0;
104104
target_port->task->wakeup(target_port);
@@ -120,7 +120,7 @@ rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) {
120120
rust_handle<rust_port> *handle =
121121
task->sched->kernel->get_port_handle(port->as_referent());
122122
maybe_proxy<rust_port> *proxy = new rust_proxy<rust_port> (handle);
123-
DLOG(kernel->sched, mem, "new proxy: " PTR, proxy);
123+
DLOG(task->sched, mem, "new proxy: " PTR, proxy);
124124
port = proxy;
125125
target_task = target->as_proxy()->handle()->referent();
126126
}
@@ -133,8 +133,8 @@ rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) {
133133
* appear to be live, causing modify-after-free errors.
134134
*/
135135
void rust_chan::destroy() {
136-
A(kernel->sched, ref_count == 0,
137-
"Channel's ref count should be zero.");
136+
// A(kernel->sched, ref_count == 0,
137+
// "Channel's ref count should be zero.");
138138

139139
if (is_associated()) {
140140
if (port->is_proxy()) {

0 commit comments

Comments
 (0)