@@ -14,14 +14,6 @@ extern mod rustrt {
     fn rust_task_weaken(ch: rust_port_id);
     fn rust_task_unweaken(ch: rust_port_id);

-    #[rust_stack]
-    fn rust_atomic_increment(p: &mut libc::intptr_t)
-        -> libc::intptr_t;
-
-    #[rust_stack]
-    fn rust_atomic_decrement(p: &mut libc::intptr_t)
-        -> libc::intptr_t;
-
     #[rust_stack]
     fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
                                  oldval: libc::uintptr_t,
@@ -33,6 +25,15 @@ extern mod rustrt {
     fn rust_unlock_little_lock(lock: rust_little_lock);
 }

+#[abi = "rust-intrinsic"]
+extern mod rusti {
+
+    #[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
+    fn atomic_cxchg(dst: &mut int, old: int, src: int) -> int;
+    fn atomic_xadd(dst: &mut int, src: int) -> int;
+    fn atomic_xsub(dst: &mut int, src: int) -> int;
+}
+
 #[allow(non_camel_case_types)] // runtime type
 type rust_port_id = uint;

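All three intrinsics return the value the destination held *before* the operation, and the compare-exchange only writes when the current value matches `old`. That return-of-the-old-value convention is why the later hunks add `+ 1` / `- 1` to recover the new count. A minimal sketch of the same semantics, written against today's stable `std::sync::atomic` API purely for illustration (the diff itself relies on the `rusti` intrinsics, not this API):

// Sketch only: models the intrinsics' return-value semantics with
// std::sync::atomic, not the rusti declarations above.
use std::sync::atomic::{AtomicIsize, Ordering::SeqCst};

fn main() {
    let count = AtomicIsize::new(1);

    // atomic_xadd(dst, 1) analogue: returns the value *before* the add.
    let old = count.fetch_add(1, SeqCst);
    assert_eq!(old, 1);
    assert_eq!(count.load(SeqCst), 2);

    // atomic_xsub(dst, 1) analogue: also returns the previous value.
    let old = count.fetch_sub(1, SeqCst);
    assert_eq!(old, 2);

    // atomic_cxchg(dst, old, src) analogue: swaps only if the current
    // value equals `old`, and reports the previous value either way.
    let slot = AtomicIsize::new(0);
    assert_eq!(slot.compare_exchange(0, 42, SeqCst, SeqCst), Ok(0));   // swapped
    assert_eq!(slot.compare_exchange(0, 7, SeqCst, SeqCst), Err(42));  // lost the race
}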
@@ -43,6 +44,7 @@ type GlobalPtr = *libc::uintptr_t;
  * or, if no channel exists creates and installs a new channel and sets up a
  * new task to receive from it.
  */
+#[cfg(stage0)]
 pub unsafe fn chan_from_global_ptr<T: Send>(
     global: GlobalPtr,
     task_fn: fn() -> task::TaskBuilder,
@@ -103,6 +105,68 @@ pub unsafe fn chan_from_global_ptr<T: Send>(
     }
 }

+#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
+pub unsafe fn chan_from_global_ptr<T: Send>(
+    global: GlobalPtr,
+    task_fn: fn() -> task::TaskBuilder,
+    f: fn~(comm::Port<T>)
+) -> comm::Chan<T> {
+
+    enum Msg {
+        Proceed,
+        Abort
+    }
+
+    log(debug, ~"ENTERING chan_from_global_ptr, before is_prob_zero check");
+    let is_probably_zero = *global == 0u;
+    log(debug, ~"after is_prob_zero check");
+    if is_probably_zero {
+        log(debug, ~"is probably zero...");
+        // There's no global channel. We must make it
+
+        let (setup_po, setup_ch) = do task_fn().spawn_conversation
+            |move f, setup_po, setup_ch| {
+            let po = comm::Port::<T>();
+            let ch = comm::Chan(&po);
+            comm::send(setup_ch, ch);
+
+            // Wait to hear if we are the official instance of
+            // this global task
+            match comm::recv::<Msg>(setup_po) {
+              Proceed => f(move po),
+              Abort => ()
+            }
+        };
+
+        log(debug, ~"before setup recv..");
+        // This is the proposed global channel
+        let ch = comm::recv(setup_po);
+        // 0 is our sentinel value. It is not a valid channel
+        assert *ch != 0;
+
+        // Install the channel
+        log(debug, ~"BEFORE COMPARE AND SWAP");
+        rusti::atomic_cxchg(
+            cast::reinterpret_cast(&global),
+            0, cast::reinterpret_cast(&ch));
+        let swapped = *global != 0;
+        log(debug, fmt!("AFTER .. swapped? %?", swapped));
+
+        if swapped {
+            // Success!
+            comm::send(setup_ch, Proceed);
+            ch
+        } else {
+            // Somebody else got in before we did
+            comm::send(setup_ch, Abort);
+            cast::reinterpret_cast(&*global)
+        }
+    } else {
+        log(debug, ~"global != 0");
+        cast::reinterpret_cast(&*global)
+    }
+}
+
 #[test]
 pub fn test_from_global_chan1() {

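The new `chan_from_global_ptr` above tries to publish its freshly created channel into the global slot with a single compare-and-swap against the 0 sentinel; the loser tells its spawned task to `Abort` and adopts whatever the winner installed. A minimal sketch of that install-once idea (not a transcription of the function above), again using today's `std::sync::atomic` rather than `rusti::atomic_cxchg`; the name `install_once` is illustrative only:

// Sketch only: "install exactly once, losers adopt the winner's value".
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

static GLOBAL: AtomicUsize = AtomicUsize::new(0); // 0 is the "empty" sentinel

/// Try to publish `candidate`; return whatever ends up installed.
fn install_once(candidate: usize) -> usize {
    match GLOBAL.compare_exchange(0, candidate, SeqCst, SeqCst) {
        Ok(_) => candidate,       // we won the race: our value is now global
        Err(existing) => existing // somebody else got in before we did
    }
}

fn main() {
    assert_eq!(install_once(41), 41); // first caller installs its value
    assert_eq!(install_once(99), 41); // later callers see the winner's value
}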
@@ -305,7 +369,7 @@ struct ArcDestruct<T> {
         }
         do task::unkillable {
             let data: ~ArcData<T> = cast::reinterpret_cast(&self.data);
-            let new_count = rustrt::rust_atomic_decrement(&mut data.count);
+            let new_count = rusti::atomic_xsub(&mut data.count, 1) - 1;
             assert new_count >= 0;
             if new_count == 0 {
                 // Were we really last, or should we hand off to an unwrapper?
@@ -341,6 +405,7 @@ fn ArcDestruct<T>(data: *libc::c_void) -> ArcDestruct<T> {
     }
 }

+#[cfg(stage0)]
 pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
     -> T {
     struct DeathThroes<T> {
@@ -373,8 +438,76 @@ pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
             // Got in. Step 0: Tell destructor not to run. We are now it.
             rc.data = ptr::null();
             // Step 1 - drop our own reference.
-            let new_count = rustrt::rust_atomic_decrement(&mut ptr.count);
-            // assert new_count >= 0;
+            let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
+            //assert new_count >= 0;
+            if new_count == 0 {
+                // We were the last owner. Can unwrap immediately.
+                // Also we have to free the server endpoints.
+                let _server: UnwrapProto = cast::transmute(move serverp);
+                option::swap_unwrap(&mut ptr.data)
+                // drop glue takes over.
+            } else {
+                // The *next* person who sees the refcount hit 0 will wake us.
+                let end_result =
+                    DeathThroes { ptr: Some(move ptr),
+                                  response: Some(move c2) };
+                let mut p1 = Some(move p1); // argh
+                do task::rekillable {
+                    pipes::recv_one(option::swap_unwrap(&mut p1));
+                }
+                // Got here. Back in the 'unkillable' without getting killed.
+                // Recover ownership of ptr, then take the data out.
+                let ptr = option::swap_unwrap(&mut end_result.ptr);
+                option::swap_unwrap(&mut ptr.data)
+                // drop glue takes over.
+            }
+        } else {
+            // Somebody else was trying to unwrap. Avoid guaranteed deadlock.
+            cast::forget(move ptr);
+            // Also we have to free the (rejected) server endpoints.
+            let _server: UnwrapProto = cast::transmute(move serverp);
+            fail ~"Another task is already unwrapping this ARC!";
+        }
+    }
+}
+
+#[cfg(stage1)] #[cfg(stage2)] #[cfg(stage3)]
+pub unsafe fn unwrap_shared_mutable_state<T: Send>(rc: SharedMutableState<T>)
+    -> T {
+    struct DeathThroes<T> {
+        mut ptr: Option<~ArcData<T>>,
+        mut response: Option<pipes::ChanOne<bool>>,
+        drop unsafe {
+            let response = option::swap_unwrap(&mut self.response);
+            // In case we get killed early, we need to tell the person who
+            // tried to wake us whether they should hand-off the data to us.
+            if task::failing() {
+                pipes::send_one(move response, false);
+                // Either this swap_unwrap or the one below (at "Got here")
+                // ought to run.
+                cast::forget(option::swap_unwrap(&mut self.ptr));
+            } else {
+                assert self.ptr.is_none();
+                pipes::send_one(move response, true);
+            }
+        }
+    }
+
+    do task::unkillable {
+        let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
+        let (c1, p1) = pipes::oneshot(); // ()
+        let (c2, p2) = pipes::oneshot(); // bool
+        let server: UnwrapProto = ~mut Some((move c1, move p2));
+        let serverp: libc::uintptr_t = cast::transmute(move server);
+        // Try to put our server end in the unwrapper slot.
+        rusti::atomic_cxchg(cast::reinterpret_cast(&ptr.unwrapper),
+                            0, serverp as int);
+        if ptr.unwrapper != 0 {
+            // Got in. Step 0: Tell destructor not to run. We are now it.
+            rc.data = ptr::null();
+            // Step 1 - drop our own reference.
+            let new_count = rusti::atomic_xsub(&mut ptr.count, 1) - 1;
+            //assert new_count >= 0;
             if new_count == 0 {
                 // We were the last owner. Can unwrap immediately.
                 // Also we have to free the server endpoints.
@@ -452,7 +585,7 @@ pub unsafe fn clone_shared_mutable_state<T: Send>(rc: &SharedMutableState<T>)
     -> SharedMutableState<T> {
     unsafe {
         let ptr: ~ArcData<T> = cast::reinterpret_cast(&(*rc).data);
-        let new_count = rustrt::rust_atomic_increment(&mut ptr.count);
+        let new_count = rusti::atomic_xadd(&mut ptr.count, 1) + 1;
         assert new_count >= 2;
         cast::forget(move ptr);
     }
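The refcount hunks all follow the same translation: the old runtime calls returned the *new* count, while `atomic_xadd`/`atomic_xsub` return the count *before* the operation, so every call site adds or subtracts one to keep `new_count` meaning what it used to (hence `assert new_count >= 2` after a clone and the last-owner check `new_count == 0` after a drop). A minimal refcount sketch under the same convention, using `std::sync::atomic` rather than the intrinsics and hypothetical helper names, not the real `ArcData` code:

// Sketch only: clone/drop bookkeeping mirroring the `+ 1` / `- 1` adjustments.
use std::sync::atomic::{AtomicIsize, Ordering::SeqCst};

fn clone_rc(count: &AtomicIsize) -> isize {
    // fetch_add returns the old count; +1 gives the count after the clone.
    let new_count = count.fetch_add(1, SeqCst) + 1;
    assert!(new_count >= 2); // we held one reference, so now there are at least two
    new_count
}

fn drop_rc(count: &AtomicIsize) -> bool {
    // fetch_sub returns the old count; -1 gives the count after the drop.
    let new_count = count.fetch_sub(1, SeqCst) - 1;
    assert!(new_count >= 0);
    new_count == 0 // true iff we were the last owner
}

fn main() {
    let count = AtomicIsize::new(1); // one initial owner
    clone_rc(&count);                // now 2
    assert!(!drop_rc(&count));       // back to 1, not last
    assert!(drop_rc(&count));        // 0: last owner frees the data
}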