@@ -28,6 +28,11 @@ type signal_end = pipes::chan_one<()>;
28
28
struct waitqueue { head : pipes:: port < signal_end > ;
29
29
tail: pipes:: chan<signal_end>; }
30
30
31
+ fn new_waitqueue ( ) -> waitqueue {
32
+ let ( block_tail, block_head) = pipes:: stream ( ) ;
33
+ waitqueue { head : block_head, tail : block_tail }
34
+ }
35
+
31
36
// Signals one live task from the queue.
32
37
#[ doc( hidden) ]
33
38
fn signal_waitqueue ( q : & waitqueue ) -> bool {
@@ -70,18 +75,15 @@ enum sem<Q: send> = Exclusive<sem_inner<Q>>;
70
75
71
76
#[ doc( hidden) ]
72
77
fn new_sem<Q : send > ( count: int, +q: Q ) -> sem < Q > {
73
- let ( wait_tail , wait_head ) = pipes:: stream ( ) ;
74
78
sem( exclusive ( sem_inner {
75
- mut count : count,
76
- waiters : waitqueue { head : wait_head, tail : wait_tail } ,
77
- blocked : q } ) )
79
+ mut count : count, waiters : new_waitqueue( ) , blocked : q } ) )
78
80
}
79
81
#[ doc( hidden) ]
80
- fn new_sem_and_signal ( count : int , num_condvars : uint ) -> sem < ~[ waitqueue ] > {
81
- let mut queues = ~[ ] ;
82
+ fn new_sem_and_signal ( count : int , num_condvars : uint )
83
+ -> sem < ~[ mut waitqueue ] > {
84
+ let mut queues = ~[ mut] ;
82
85
for num_condvars . times {
83
- let ( block_tail, block_head) = pipes:: stream( ) ;
84
- vec:: push( queues, waitqueue { head : block_head, tail : block_tail } ) ;
86
+ vec : : push( queues, new_waitqueue( ) ) ;
85
87
}
86
88
new_sem( count, queues)
87
89
}
@@ -136,7 +138,7 @@ impl &sem<()> {
136
138
}
137
139
}
138
140
#[ doc( hidden) ]
139
- impl & sem < ~[ waitqueue ] > {
141
+ impl & sem < ~[ mut waitqueue ] > {
140
142
fn access < U > ( blk : fn ( ) -> U ) -> U {
141
143
let mut release = none;
142
144
unsafe {
@@ -158,13 +160,13 @@ struct sem_release {
158
160
}
159
161
#[ doc( hidden) ]
160
162
struct sem_and_signal_release {
161
- sem : & sem < ~[ waitqueue ] > ;
162
- new ( sem : & sem < ~[ waitqueue ] > ) { self . sem = sem; }
163
+ sem : & sem < ~[ mut waitqueue ] > ;
164
+ new ( sem : & sem < ~[ mut waitqueue ] > ) { self . sem = sem; }
163
165
drop { self. sem . release ( ) ; }
164
166
}
165
167
166
168
/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
167
- struct condvar { priv sem: & sem<~[ waitqueue] >; drop { } }
169
+ struct condvar { priv sem: & sem < ~[ mut waitqueue ] > ; drop { } }
168
170
169
171
impl & condvar {
170
172
/**
@@ -232,8 +234,8 @@ impl &condvar {
232
234
// mutex during unwinding. As long as the wrapper (mutex, etc) is
233
235
// bounded in when it gets released, this shouldn't hang forever.
234
236
struct sem_and_signal_reacquire {
235
- sem : & sem < ~[ waitqueue ] > ;
236
- new ( sem : & sem < ~[ waitqueue ] > ) { self . sem = sem; }
237
+ sem : & sem < ~[ mut waitqueue ] > ;
238
+ new ( sem : & sem < ~[ mut waitqueue ] > ) { self . sem = sem; }
237
239
drop unsafe {
238
240
// Needs to succeed, instead of itself dying.
239
241
do task:: unkillable {
@@ -268,19 +270,23 @@ impl &condvar {
268
270
/// As broadcast, but with a specified condvar_id. See wait_on.
269
271
fn broadcast_on ( condvar_id : uint ) -> uint {
270
272
let mut out_of_bounds = none;
271
- let mut result = 0 ;
273
+ let mut queue = none ;
272
274
unsafe {
273
275
do ( * * self . sem ) . with |state| {
274
276
if condvar_id < vec:: len ( state. blocked ) {
275
- // FIXME(#3145) fix :broadcast_heavy
276
- result = broadcast_waitqueue ( & state. blocked [ condvar_id] )
277
+ // To avoid :broadcast_heavy, we make a new waitqueue,
278
+ // swap it out with the old one, and broadcast on the
279
+ // old one outside of the little-lock.
280
+ queue = some ( util:: replace ( & mut state. blocked [ condvar_id] ,
281
+ new_waitqueue ( ) ) ) ;
277
282
} else {
278
283
out_of_bounds = some ( vec:: len ( state. blocked ) ) ;
279
284
}
280
285
}
281
286
}
282
287
do check_cvar_bounds ( out_of_bounds, condvar_id, "cond.signal_on()" ) {
283
- result
288
+ let queue = option:: swap_unwrap ( & mut queue) ;
289
+ broadcast_waitqueue ( & queue)
284
290
}
285
291
}
286
292
}
@@ -303,7 +309,7 @@ fn check_cvar_bounds<U>(out_of_bounds: option<uint>, id: uint, act: &str,
303
309
}
304
310
305
311
#[ doc( hidden) ]
306
- impl & sem < ~[ waitqueue ] > {
312
+ impl & sem < ~[ mut waitqueue ] > {
307
313
// The only other place that condvars get built is rwlock_write_mode.
308
314
fn access_cond < U > ( blk : fn ( c : & condvar ) -> U ) -> U {
309
315
do self . access { blk ( & condvar { sem : self } ) }
@@ -354,7 +360,7 @@ impl &semaphore {
354
360
* A task which fails while holding a mutex will unlock the mutex as it
355
361
* unwinds.
356
362
*/
357
- struct mutex { priv sem: sem < ~[ waitqueue ] > ; }
363
+ struct mutex { priv sem: sem < ~[ mut waitqueue ] > ; }
358
364
359
365
/// Create a new mutex, with one associated condvar.
360
366
fn mutex ( ) -> mutex { mutex_with_condvars ( 1 ) }
@@ -402,7 +408,7 @@ struct rwlock_inner {
402
408
*/
403
409
struct rwlock {
404
410
/* priv */ order_lock : semaphore ;
405
- /* priv */ access_lock: sem < ~[ waitqueue ] > ;
411
+ /* priv */ access_lock: sem<~[ mut waitqueue] >;
406
412
/* priv */ state: Exclusive <rwlock_inner>;
407
413
}
408
414
0 commit comments