Skip to content

Commit 80205fc

Browse files
author
Eric Holk
committed
---
yaml --- r: 3705 b: refs/heads/master c: 8acadb1 h: refs/heads/master i: 3703: 42affa0 v: v3
1 parent 81f760a commit 80205fc

File tree

11 files changed

+91
-36
lines changed

11 files changed

+91
-36
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
---
2-
refs/heads/master: dcd2563a3a7662d03ab33b67c92652e6e24c5af1
2+
refs/heads/master: 8acadb17c2d679291aa94e94af8cc96513cab830

trunk/src/rt/memory_region.cpp

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,18 @@ memory_region::memory_region(memory_region *parent) :
1919
// Nop.
2020
}
2121

22+
void memory_region::add_alloc() {
23+
//_live_allocations++;
24+
sync::increment(_live_allocations);
25+
}
26+
27+
void memory_region::dec_alloc() {
28+
//_live_allocations--;
29+
sync::decrement(_live_allocations);
30+
}
31+
2232
void memory_region::free(void *mem) {
23-
// printf("free: ptr 0x%" PRIxPTR"\n", (uintptr_t) mem);
33+
// printf("free: ptr 0x%" PRIxPTR" region=%p\n", (uintptr_t) mem, this);
2434
if (!mem) { return; }
2535
if (_synchronized) { _lock.lock(); }
2636
#ifdef TRACK_ALLOCATIONS
@@ -33,7 +43,7 @@ void memory_region::free(void *mem) {
3343
if (_live_allocations < 1) {
3444
_srv->fatal("live_allocs < 1", __FILE__, __LINE__, "");
3545
}
36-
_live_allocations--;
46+
dec_alloc();
3747
_srv->free(mem);
3848
if (_synchronized) { _lock.unlock(); }
3949
}
@@ -42,7 +52,7 @@ void *
4252
memory_region::realloc(void *mem, size_t size) {
4353
if (_synchronized) { _lock.lock(); }
4454
if (!mem) {
45-
_live_allocations++;
55+
add_alloc();
4656
}
4757
void *newMem = _srv->realloc(mem, size);
4858
#ifdef TRACK_ALLOCATIONS
@@ -59,20 +69,21 @@ memory_region::realloc(void *mem, size_t size) {
5969
void *
6070
memory_region::malloc(size_t size) {
6171
if (_synchronized) { _lock.lock(); }
62-
_live_allocations++;
72+
add_alloc();
6373
void *mem = _srv->malloc(size);
6474
#ifdef TRACK_ALLOCATIONS
6575
_allocation_list.append(mem);
6676
#endif
67-
// printf("malloc: ptr 0x%" PRIxPTR "\n", (uintptr_t) mem);
77+
// printf("malloc: ptr 0x%" PRIxPTR " region=%p\n",
78+
// (uintptr_t) mem, this);
6879
if (_synchronized) { _lock.unlock(); }
6980
return mem;
7081
}
7182

7283
void *
7384
memory_region::calloc(size_t size) {
7485
if (_synchronized) { _lock.lock(); }
75-
_live_allocations++;
86+
add_alloc();
7687
void *mem = _srv->malloc(size);
7788
memset(mem, 0, size);
7889
#ifdef TRACK_ALLOCATIONS

trunk/src/rt/memory_region.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ class memory_region {
2222
const bool _detailed_leaks;
2323
const bool _synchronized;
2424
lock_and_signal _lock;
25+
26+
void add_alloc();
27+
void dec_alloc();
2528
public:
2629
memory_region(rust_srv *srv, bool synchronized);
2730
memory_region(memory_region *parent);

trunk/src/rt/rust_chan.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
*/
77
rust_chan::rust_chan(rust_task *task,
88
maybe_proxy<rust_port> *port,
9-
size_t unit_sz) :
10-
ref_count(1),
11-
task(task),
12-
port(port),
13-
buffer(task, unit_sz) {
9+
size_t unit_sz)
10+
: ref_count(1),
11+
kernel(task->kernel),
12+
task(task),
13+
port(port),
14+
buffer(task, unit_sz) {
1415
++task->ref_count;
1516
if (port) {
1617
associate(port);
@@ -87,6 +88,7 @@ void rust_chan::send(void *sptr) {
8788
buffer.dequeue(NULL);
8889
} else {
8990
rust_port *target_port = port->referent();
91+
scoped_lock right(target_port->lock);
9092
if (target_port->task->blocked_on(target_port)) {
9193
DLOG(sched, comm, "dequeued in rendezvous_ptr");
9294
buffer.dequeue(target_port->task->rendezvous_ptr);
@@ -114,7 +116,7 @@ rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) {
114116
port = proxy;
115117
target_task = target->as_proxy()->handle()->referent();
116118
}
117-
return new (target_task) rust_chan(target_task, port, unit_sz);
119+
return new (target_task->kernel) rust_chan(target_task, port, unit_sz);
118120
}
119121

120122
/**

trunk/src/rt/rust_chan.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
#ifndef RUST_CHAN_H
22
#define RUST_CHAN_H
33

4-
class rust_chan : public task_owned<rust_chan>,
4+
class rust_chan : public kernel_owned<rust_chan>,
55
public rust_cond {
66
public:
77
RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy())
88
rust_chan(rust_task *task, maybe_proxy<rust_port> *port, size_t unit_sz);
99

1010
~rust_chan();
1111

12+
rust_kernel *kernel;
1213
rust_task *task;
1314
maybe_proxy<rust_port> *port;
1415
size_t idx;

trunk/src/rt/rust_port.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
#include "rust_internal.h"
22
#include "rust_port.h"
33

4-
rust_port::rust_port(rust_task *task, size_t unit_sz) :
5-
maybe_proxy<rust_port>(this), task(task),
6-
unit_sz(unit_sz), writers(task), chans(task) {
4+
rust_port::rust_port(rust_task *task, size_t unit_sz)
5+
: maybe_proxy<rust_port>(this), kernel(task->kernel), task(task),
6+
unit_sz(unit_sz), writers(task), chans(task) {
77

88
LOG(task, comm,
99
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
1010
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
1111

1212
// Allocate a remote channel, for remote channel data.
13-
remote_channel = new (task) rust_chan(task, this, unit_sz);
13+
remote_channel = new (task->kernel) rust_chan(task, this, unit_sz);
1414
}
1515

1616
rust_port::~rust_port() {

trunk/src/rt/rust_port.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
#define RUST_PORT_H
33

44
class rust_port : public maybe_proxy<rust_port>,
5-
public task_owned<rust_port> {
5+
public kernel_owned<rust_port> {
66

77
public:
8+
rust_kernel *kernel;
89
rust_task *task;
910
size_t unit_sz;
1011
ptr_vec<rust_token> writers;
@@ -13,6 +14,8 @@ class rust_port : public maybe_proxy<rust_port>,
1314
// Data sent to this port from remote tasks is buffered in this channel.
1415
rust_chan *remote_channel;
1516

17+
lock_and_signal lock;
18+
1619
rust_port(rust_task *task, size_t unit_sz);
1720
~rust_port();
1821
void log_state();

trunk/src/rt/rust_task.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ rust_task::transition(rust_task_list *src, rust_task_list *dst) {
401401

402402
void
403403
rust_task::block(rust_cond *on, const char* name) {
404+
scoped_lock with(lock);
404405
LOG(this, task, "Blocking on 0x%" PRIxPTR ", cond: 0x%" PRIxPTR,
405406
(uintptr_t) on, (uintptr_t) cond);
406407
A(sched, cond == NULL, "Cannot block an already blocked task.");
@@ -413,6 +414,7 @@ rust_task::block(rust_cond *on, const char* name) {
413414

414415
void
415416
rust_task::wakeup(rust_cond *from) {
417+
scoped_lock with(lock);
416418
A(sched, cond != NULL, "Cannot wake up unblocked task.");
417419
LOG(this, task, "Blocked on 0x%" PRIxPTR " woken up on 0x%" PRIxPTR,
418420
(uintptr_t) cond, (uintptr_t) from);
@@ -430,6 +432,7 @@ rust_task::wakeup(rust_cond *from) {
430432

431433
void
432434
rust_task::die() {
435+
scoped_lock with(lock);
433436
transition(&sched->running_tasks, &sched->dead_tasks);
434437
}
435438

trunk/src/rt/rust_task.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ rust_task : public maybe_proxy<rust_task>,
8989

9090
wakeup_callback *_on_wakeup;
9191

92+
lock_and_signal lock;
93+
9294
// Only a pointer to 'name' is kept, so it must live as long as this task.
9395
rust_task(rust_scheduler *sched,
9496
rust_task_list *state,

trunk/src/rt/rust_upcall.cpp

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ upcall_new_port(rust_task *task, size_t unit_sz) {
9292
LOG_UPCALL_ENTRY(task);
9393
LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
9494
(uintptr_t) task, task->name, unit_sz);
95-
return new (task) rust_port(task, unit_sz);
95+
// take a reference on behalf of the port
96+
task->ref();
97+
return new (task->kernel) rust_port(task, unit_sz);
9698
}
9799

98100
extern "C" CDECL void
@@ -101,6 +103,9 @@ upcall_del_port(rust_task *task, rust_port *port) {
101103
LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port);
102104
I(task->sched, !port->ref_count);
103105
delete port;
106+
107+
// FIXME: We shouldn't ever directly manipulate the ref count.
108+
--task->ref_count;
104109
}
105110

106111
/**
@@ -114,7 +119,7 @@ upcall_new_chan(rust_task *task, rust_port *port) {
114119
"task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")",
115120
(uintptr_t) task, task->name, port);
116121
I(sched, port);
117-
return new (task) rust_chan(task, port, port->unit_sz);
122+
return new (task->kernel) rust_chan(task, port, port->unit_sz);
118123
}
119124

120125
/**
@@ -138,6 +143,8 @@ extern "C" CDECL
138143
void upcall_del_chan(rust_task *task, rust_chan *chan) {
139144
LOG_UPCALL_ENTRY(task);
140145

146+
I(task->sched, chan->task == task);
147+
141148
LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
142149
chan->destroy();
143150
}
@@ -183,25 +190,27 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
183190

184191
extern "C" CDECL void
185192
upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
186-
LOG_UPCALL_ENTRY(task);
193+
{
194+
scoped_lock with(port->lock);
195+
LOG_UPCALL_ENTRY(task);
187196

188-
LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
189-
", size: 0x%" PRIxPTR ", chan_no: %d",
190-
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
191-
port->chans.length());
197+
LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
198+
", size: 0x%" PRIxPTR ", chan_no: %d",
199+
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
200+
port->chans.length());
192201

193-
if (port->receive(dptr)) {
194-
return;
195-
}
202+
if (port->receive(dptr)) {
203+
return;
204+
}
196205

197-
// No data was buffered on any incoming channel, so block this task on the
198-
// port. Remember the rendezvous location so that any sender task can
199-
// write to it before waking up this task.
206+
// No data was buffered on any incoming channel, so block this task on
207+
// the port. Remember the rendezvous location so that any sender task
208+
// can write to it before waking up this task.
200209

201-
LOG(task, comm, "<=== waiting for rendezvous data ===");
202-
task->rendezvous_ptr = dptr;
203-
task->block(port, "waiting for rendezvous data");
204-
210+
LOG(task, comm, "<=== waiting for rendezvous data ===");
211+
task->rendezvous_ptr = dptr;
212+
task->block(port, "waiting for rendezvous data");
213+
}
205214
task->yield(3);
206215
}
207216

trunk/src/rt/sync/sync.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// -*- c++-mode -*-
12
#ifndef SYNC_H
23
#define SYNC_H
34

@@ -10,6 +11,26 @@ class sync {
1011
T oldValue, T newValue) {
1112
return __sync_bool_compare_and_swap(address, oldValue, newValue);
1213
}
14+
15+
template <class T>
16+
static T increment(T *address) {
17+
return __sync_add_and_fetch(address, 1);
18+
}
19+
20+
template <class T>
21+
static T decrement(T *address) {
22+
return __sync_sub_and_fetch(address, 1);
23+
}
24+
25+
template <class T>
26+
static T increment(T &address) {
27+
return __sync_add_and_fetch(&address, 1);
28+
}
29+
30+
template <class T>
31+
static T decrement(T &address) {
32+
return __sync_sub_and_fetch(&address, 1);
33+
}
1334
};
1435

1536
/**

0 commit comments

Comments
 (0)