@@ -107,9 +107,20 @@ fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
107
107
* Shared state & exclusive ARC
108
108
****************************************************************************/
109
109
110
+ struct UnwrapProtoInner {
111
+ contents : Option < ( comm:: ChanOne < ( ) > , comm:: PortOne < bool > ) > ,
112
+ }
113
+
114
+ // An unwrapper uses this protocol to communicate with the "other" task that
115
+ // drops the last refcount on an arc. Unfortunately this can't be a proper
116
+ // pipe protocol because the unwrapper has to access both stages at once.
117
+ type UnwrapProto = ~UnwrapProtoInner ;
118
+
110
119
struct ArcData < T > {
111
120
mut count : libc:: intptr_t ,
112
- // FIXME(#3224) should be able to make this non-option to save memory
121
+ mut unwrapper : int , // either a UnwrapProto or 0
122
+ // FIXME(#3224) should be able to make this non-option to save memory, and
123
+ // in unwrap() use "let ~ArcData { data: result, _ } = thing" to unwrap it
113
124
mut data : Option < T > ,
114
125
}
115
126
@@ -120,13 +131,37 @@ struct ArcDestruct<T> {
120
131
impl < T > Drop for ArcDestruct < T > {
121
132
fn finalize ( & self ) {
122
133
unsafe {
134
+ if self . data . is_null ( ) {
135
+ return ; // Happens when destructing an unwrapper's handle.
136
+ }
123
137
do task:: unkillable {
124
138
let data : ~ArcData <T > = cast:: reinterpret_cast ( & self . data ) ;
125
139
let new_count =
126
140
intrinsics:: atomic_xsub ( & mut data. count , 1 ) - 1 ;
127
141
assert new_count >= 0 ;
128
142
if new_count == 0 {
129
- // drop glue takes over.
143
+ // Were we really last, or should we hand off to an
144
+ // unwrapper? It's safe to not xchg because the unwrapper
145
+ // will set the unwrap lock *before* dropping his/her
146
+ // reference. In effect, being here means we're the only
147
+ // *awake* task with the data.
148
+ if data. unwrapper != 0 {
149
+ let mut p: UnwrapProto =
150
+ cast:: reinterpret_cast ( & data. unwrapper ) ;
151
+ let ( message, response) =
152
+ option:: swap_unwrap ( & mut p. contents ) ;
153
+ // Send 'ready' and wait for a response.
154
+ comm:: send_one ( message, ( ) ) ;
155
+ // Unkillable wait. Message guaranteed to come.
156
+ if comm:: recv_one ( response) {
157
+ // Other task got the data.
158
+ cast:: forget ( data) ;
159
+ } else {
160
+ // Other task was killed. drop glue takes over.
161
+ }
162
+ } else {
163
+ // drop glue takes over.
164
+ }
130
165
} else {
131
166
cast:: forget ( data) ;
132
167
}
@@ -141,6 +176,79 @@ fn ArcDestruct<T>(data: *libc::c_void) -> ArcDestruct<T> {
141
176
}
142
177
}
143
178
179
+ pub unsafe fn unwrap_shared_mutable_state < T : Owned > ( rc : SharedMutableState < T > )
180
+ -> T {
181
+ struct DeathThroes < T > {
182
+ mut ptr : Option < ~ArcData < T > > ,
183
+ mut response : Option < comm:: ChanOne < bool > > ,
184
+ }
185
+
186
+ impl < T > Drop for DeathThroes < T > {
187
+ fn finalize ( & self ) {
188
+ unsafe {
189
+ let response = option:: swap_unwrap ( & mut self . response ) ;
190
+ // In case we get killed early, we need to tell the person who
191
+ // tried to wake us whether they should hand-off the data to
192
+ // us.
193
+ if task:: failing ( ) {
194
+ comm:: send_one ( response, false ) ;
195
+ // Either this swap_unwrap or the one below (at "Got
196
+ // here") ought to run.
197
+ cast:: forget ( option:: swap_unwrap ( & mut self . ptr ) ) ;
198
+ } else {
199
+ assert self. ptr . is_none ( ) ;
200
+ comm:: send_one ( response, true ) ;
201
+ }
202
+ }
203
+ }
204
+ }
205
+
206
+ do task:: unkillable {
207
+ let ptr : ~ArcData <T > = cast:: reinterpret_cast ( & rc. data ) ;
208
+ let ( p1, c1) = comm:: oneshot ( ) ; // ()
209
+ let ( p2, c2) = comm:: oneshot ( ) ; // bool
210
+ let mut server: UnwrapProto = ~UnwrapProtoInner {
211
+ contents : Some ( ( c1, p2) )
212
+ } ;
213
+ let serverp: int = cast:: transmute ( server) ;
214
+ // Try to put our server end in the unwrapper slot.
215
+ if compare_and_swap ( & mut ptr. unwrapper , 0 , serverp) {
216
+ // Got in. Step 0: Tell destructor not to run. We are now it.
217
+ rc. data = ptr:: null ( ) ;
218
+ // Step 1 - drop our own reference.
219
+ let new_count = intrinsics:: atomic_xsub ( & mut ptr. count , 1 ) - 1 ;
220
+ //assert new_count >= 0;
221
+ if new_count == 0 {
222
+ // We were the last owner. Can unwrap immediately.
223
+ // Also we have to free the server endpoints.
224
+ let _server: UnwrapProto = cast:: transmute ( serverp) ;
225
+ option:: swap_unwrap ( & mut ptr. data )
226
+ // drop glue takes over.
227
+ } else {
228
+ // The *next* person who sees the refcount hit 0 will wake us.
229
+ let end_result =
230
+ DeathThroes { ptr : Some ( ptr) ,
231
+ response : Some ( c2) } ;
232
+ let mut p1 = Some ( p1) ; // argh
233
+ do task:: rekillable {
234
+ comm : : recv_one ( option:: swap_unwrap ( & mut p1) ) ;
235
+ }
236
+ // Got here. Back in the 'unkillable' without getting killed.
237
+ // Recover ownership of ptr, then take the data out.
238
+ let ptr = option:: swap_unwrap ( & mut end_result. ptr ) ;
239
+ option:: swap_unwrap ( & mut ptr. data )
240
+ // drop glue takes over.
241
+ }
242
+ } else {
243
+ // Somebody else was trying to unwrap. Avoid guaranteed deadlock.
244
+ cast : : forget ( ptr) ;
245
+ // Also we have to free the (rejected) server endpoints.
246
+ let _server: UnwrapProto = cast:: transmute ( serverp) ;
247
+ fail ! ( ~"Another task is already unwrapping this ARC !");
248
+ }
249
+ }
250
+ }
251
+
144
252
/**
145
253
* COMPLETELY UNSAFE. Used as a primitive for the safe versions in std::arc.
146
254
*
@@ -151,7 +259,7 @@ pub type SharedMutableState<T> = ArcDestruct<T>;
151
259
152
260
pub unsafe fn shared_mutable_state<T:Owned>(data: T) ->
153
261
SharedMutableState<T> {
154
- let data = ~ArcData { count : 1 , data : Some ( data) } ;
262
+ let data = ~ArcData { count: 1, unwrapper: 0, data: Some(data) };
155
263
unsafe {
156
264
let ptr = cast::transmute(data);
157
265
ArcDestruct(ptr)
@@ -305,14 +413,22 @@ pub impl<T:Owned> Exclusive<T> {
305
413
}
306
414
}
307
415
416
+ // FIXME(#3724) make this a by-move method on the exclusive
417
+ pub fn unwrap_exclusive<T:Owned>(arc: Exclusive<T>) -> T {
418
+ let Exclusive { x: x } = arc;
419
+ let inner = unsafe { unwrap_shared_mutable_state(x) };
420
+ let ExData { data: data, _ } = inner;
421
+ data
422
+ }
423
+
308
424
#[cfg(test)]
309
425
pub mod tests {
310
426
use core::option::{None, Some};
311
427
312
428
use cell::Cell;
313
429
use comm;
314
430
use option;
315
- use private:: exclusive;
431
+ use private::{ exclusive, unwrap_exclusive} ;
316
432
use result;
317
433
use task;
318
434
use uint;
@@ -363,4 +479,70 @@ pub mod tests {
363
479
assert *one == 1;
364
480
}
365
481
}
482
+
483
+ #[test]
484
+ pub fn exclusive_unwrap_basic() {
485
+ let x = exclusive(~~" hello");
486
+ assert unwrap_exclusive(x) == ~~" hello";
487
+ }
488
+
489
+ #[test]
490
+ pub fn exclusive_unwrap_contended() {
491
+ let x = exclusive(~~" hello");
492
+ let x2 = Cell(x.clone());
493
+ do task::spawn {
494
+ let x2 = x2.take();
495
+ do x2.with |_hello| { }
496
+ task::yield();
497
+ }
498
+ assert unwrap_exclusive(x) == ~~" hello";
499
+
500
+ // Now try the same thing, but with the child task blocking.
501
+ let x = exclusive(~~" hello");
502
+ let x2 = Cell(x.clone());
503
+ let mut res = None;
504
+ do task::task().future_result(|+r| res = Some(r)).spawn {
505
+ let x2 = x2.take();
506
+ assert unwrap_exclusive(x2) == ~~" hello";
507
+ }
508
+ // Have to get rid of our reference before blocking.
509
+ { let _x = x; } // FIXME(#3161) util::ignore doesn't work here
510
+ let res = option::swap_unwrap(&mut res);
511
+ res.recv();
512
+ }
513
+
514
+ #[test] #[should_fail] #[ignore(cfg(windows))]
515
+ pub fn exclusive_unwrap_conflict() {
516
+ let x = exclusive(~~" hello");
517
+ let x2 = Cell(x.clone());
518
+ let mut res = None;
519
+ do task::task().future_result(|+r| res = Some(r)).spawn {
520
+ let x2 = x2.take();
521
+ assert unwrap_exclusive(x2) == ~~" hello";
522
+ }
523
+ assert unwrap_exclusive(x) == ~~" hello";
524
+ let res = option::swap_unwrap(&mut res);
525
+ // See #4689 for why this can't be just " res. recv( ) ".
526
+ assert res.recv() == task::Success;
527
+ }
528
+
529
+ #[test] #[ignore(cfg(windows))]
530
+ pub fn exclusive_unwrap_deadlock() {
531
+ // This is not guaranteed to get to the deadlock before being killed,
532
+ // but it will show up sometimes, and if the deadlock were not there,
533
+ // the test would nondeterministically fail.
534
+ let result = do task::try {
535
+ // a task that has two references to the same exclusive will
536
+ // deadlock when it unwraps. nothing to be done about that.
537
+ let x = exclusive(~~" hello" ) ;
538
+ let x2 = x. clone ( ) ;
539
+ do task:: spawn {
540
+ for 10 . times { task : : yield ( ) ; } // try to let the unwrapper go
541
+ fail ! ( ) ; // punt it awake from its deadlock
542
+ }
543
+ let _z = unwrap_exclusive ( x) ;
544
+ do x2. with |_hello| { }
545
+ } ;
546
+ assert result. is_err ( ) ;
547
+ }
366
548
}
0 commit comments