148
148
#[ cfg( all( test, not( target_os = "emscripten" ) ) ) ]
149
149
mod tests;
150
150
151
+ mod parker;
152
+
151
153
use crate :: any:: Any ;
152
154
use crate :: cell:: UnsafeCell ;
153
155
use crate :: ffi:: { CStr , CString } ;
@@ -158,15 +160,14 @@ use crate::num::NonZeroU64;
158
160
use crate :: panic;
159
161
use crate :: panicking;
160
162
use crate :: str;
161
- use crate :: sync:: atomic:: AtomicUsize ;
162
- use crate :: sync:: atomic:: Ordering :: SeqCst ;
163
- use crate :: sync:: { Arc , Condvar , Mutex } ;
163
+ use crate :: sync:: Arc ;
164
164
use crate :: sys:: thread as imp;
165
165
use crate :: sys_common:: mutex;
166
166
use crate :: sys_common:: thread;
167
167
use crate :: sys_common:: thread_info;
168
168
use crate :: sys_common:: { AsInner , IntoInner } ;
169
169
use crate :: time:: Duration ;
170
+ use parker:: Parker ;
170
171
171
172
////////////////////////////////////////////////////////////////////////////////
172
173
// Thread-local storage
@@ -655,6 +656,8 @@ pub fn current() -> Thread {
655
656
///
656
657
/// [`channel`]: crate::sync::mpsc
657
658
/// [`join`]: JoinHandle::join
659
+ /// [`Condvar`]: crate::sync::Condvar
660
+ /// [`Mutex`]: crate::sync::Mutex
658
661
#[ stable( feature = "rust1" , since = "1.0.0" ) ]
659
662
pub fn yield_now ( ) {
660
663
imp:: Thread :: yield_now ( )
@@ -700,6 +703,8 @@ pub fn yield_now() {
700
703
/// panic!()
701
704
/// }
702
705
/// ```
706
+ ///
707
+ /// [Mutex]: crate::sync::Mutex
703
708
#[ inline]
704
709
#[ stable( feature = "rust1" , since = "1.0.0" ) ]
705
710
pub fn panicking ( ) -> bool {
@@ -767,11 +772,6 @@ pub fn sleep(dur: Duration) {
767
772
imp:: Thread :: sleep ( dur)
768
773
}
769
774
770
- // constants for park/unpark
771
- const EMPTY : usize = 0 ;
772
- const PARKED : usize = 1 ;
773
- const NOTIFIED : usize = 2 ;
774
-
775
775
/// Blocks unless or until the current thread's token is made available.
776
776
///
777
777
/// A call to `park` does not guarantee that the thread will remain parked
@@ -858,45 +858,11 @@ const NOTIFIED: usize = 2;
858
858
///
859
859
/// [`unpark`]: Thread::unpark
860
860
/// [`thread::park_timeout`]: park_timeout
861
- //
862
- // The implementation currently uses the trivial strategy of a Mutex+Condvar
863
- // with wakeup flag, which does not actually allow spurious wakeups. In the
864
- // future, this will be implemented in a more efficient way, perhaps along the lines of
865
- // http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp
866
- // or futuxes, and in either case may allow spurious wakeups.
867
861
#[ stable( feature = "rust1" , since = "1.0.0" ) ]
868
862
pub fn park ( ) {
869
- let thread = current ( ) ;
870
-
871
- // If we were previously notified then we consume this notification and
872
- // return quickly.
873
- if thread. inner . state . compare_exchange ( NOTIFIED , EMPTY , SeqCst , SeqCst ) . is_ok ( ) {
874
- return ;
875
- }
876
-
877
- // Otherwise we need to coordinate going to sleep
878
- let mut m = thread. inner . lock . lock ( ) . unwrap ( ) ;
879
- match thread. inner . state . compare_exchange ( EMPTY , PARKED , SeqCst , SeqCst ) {
880
- Ok ( _) => { }
881
- Err ( NOTIFIED ) => {
882
- // We must read here, even though we know it will be `NOTIFIED`.
883
- // This is because `unpark` may have been called again since we read
884
- // `NOTIFIED` in the `compare_exchange` above. We must perform an
885
- // acquire operation that synchronizes with that `unpark` to observe
886
- // any writes it made before the call to unpark. To do that we must
887
- // read from the write it made to `state`.
888
- let old = thread. inner . state . swap ( EMPTY , SeqCst ) ;
889
- assert_eq ! ( old, NOTIFIED , "park state changed unexpectedly" ) ;
890
- return ;
891
- } // should consume this notification, so prohibit spurious wakeups in next park.
892
- Err ( _) => panic ! ( "inconsistent park state" ) ,
893
- }
894
- loop {
895
- m = thread. inner . cvar . wait ( m) . unwrap ( ) ;
896
- match thread. inner . state . compare_exchange ( NOTIFIED , EMPTY , SeqCst , SeqCst ) {
897
- Ok ( _) => return , // got a notification
898
- Err ( _) => { } // spurious wakeup, go back to sleep
899
- }
863
+ // SAFETY: park_timeout is called on the parker owned by this thread.
864
+ unsafe {
865
+ current ( ) . inner . parker . park ( ) ;
900
866
}
901
867
}
902
868
@@ -958,35 +924,9 @@ pub fn park_timeout_ms(ms: u32) {
958
924
/// ```
959
925
#[ stable( feature = "park_timeout" , since = "1.4.0" ) ]
960
926
pub fn park_timeout ( dur : Duration ) {
961
- let thread = current ( ) ;
962
-
963
- // Like `park` above we have a fast path for an already-notified thread, and
964
- // afterwards we start coordinating for a sleep.
965
- // return quickly.
966
- if thread. inner . state . compare_exchange ( NOTIFIED , EMPTY , SeqCst , SeqCst ) . is_ok ( ) {
967
- return ;
968
- }
969
- let m = thread. inner . lock . lock ( ) . unwrap ( ) ;
970
- match thread. inner . state . compare_exchange ( EMPTY , PARKED , SeqCst , SeqCst ) {
971
- Ok ( _) => { }
972
- Err ( NOTIFIED ) => {
973
- // We must read again here, see `park`.
974
- let old = thread. inner . state . swap ( EMPTY , SeqCst ) ;
975
- assert_eq ! ( old, NOTIFIED , "park state changed unexpectedly" ) ;
976
- return ;
977
- } // should consume this notification, so prohibit spurious wakeups in next park.
978
- Err ( _) => panic ! ( "inconsistent park_timeout state" ) ,
979
- }
980
-
981
- // Wait with a timeout, and if we spuriously wake up or otherwise wake up
982
- // from a notification we just want to unconditionally set the state back to
983
- // empty, either consuming a notification or un-flagging ourselves as
984
- // parked.
985
- let ( _m, _result) = thread. inner . cvar . wait_timeout ( m, dur) . unwrap ( ) ;
986
- match thread. inner . state . swap ( EMPTY , SeqCst ) {
987
- NOTIFIED => { } // got a notification, hurray!
988
- PARKED => { } // no notification, alas
989
- n => panic ! ( "inconsistent park_timeout state: {}" , n) ,
927
+ // SAFETY: park_timeout is called on the parker owned by this thread.
928
+ unsafe {
929
+ current ( ) . inner . parker . park_timeout ( dur) ;
990
930
}
991
931
}
992
932
@@ -1065,11 +1005,7 @@ impl ThreadId {
1065
1005
struct Inner {
1066
1006
name : Option < CString > , // Guaranteed to be UTF-8
1067
1007
id : ThreadId ,
1068
-
1069
- // state for thread park/unpark
1070
- state : AtomicUsize ,
1071
- lock : Mutex < ( ) > ,
1072
- cvar : Condvar ,
1008
+ parker : Parker ,
1073
1009
}
1074
1010
1075
1011
#[ derive( Clone ) ]
@@ -1103,13 +1039,7 @@ impl Thread {
1103
1039
let cname =
1104
1040
name. map ( |n| CString :: new ( n) . expect ( "thread name may not contain interior null bytes" ) ) ;
1105
1041
Thread {
1106
- inner : Arc :: new ( Inner {
1107
- name : cname,
1108
- id : ThreadId :: new ( ) ,
1109
- state : AtomicUsize :: new ( EMPTY ) ,
1110
- lock : Mutex :: new ( ( ) ) ,
1111
- cvar : Condvar :: new ( ) ,
1112
- } ) ,
1042
+ inner : Arc :: new ( Inner { name : cname, id : ThreadId :: new ( ) , parker : Parker :: new ( ) } ) ,
1113
1043
}
1114
1044
}
1115
1045
@@ -1145,32 +1075,7 @@ impl Thread {
1145
1075
/// ```
1146
1076
#[ stable( feature = "rust1" , since = "1.0.0" ) ]
1147
1077
pub fn unpark ( & self ) {
1148
- // To ensure the unparked thread will observe any writes we made
1149
- // before this call, we must perform a release operation that `park`
1150
- // can synchronize with. To do that we must write `NOTIFIED` even if
1151
- // `state` is already `NOTIFIED`. That is why this must be a swap
1152
- // rather than a compare-and-swap that returns if it reads `NOTIFIED`
1153
- // on failure.
1154
- match self . inner . state . swap ( NOTIFIED , SeqCst ) {
1155
- EMPTY => return , // no one was waiting
1156
- NOTIFIED => return , // already unparked
1157
- PARKED => { } // gotta go wake someone up
1158
- _ => panic ! ( "inconsistent state in unpark" ) ,
1159
- }
1160
-
1161
- // There is a period between when the parked thread sets `state` to
1162
- // `PARKED` (or last checked `state` in the case of a spurious wake
1163
- // up) and when it actually waits on `cvar`. If we were to notify
1164
- // during this period it would be ignored and then when the parked
1165
- // thread went to sleep it would never wake up. Fortunately, it has
1166
- // `lock` locked at this stage so we can acquire `lock` to wait until
1167
- // it is ready to receive the notification.
1168
- //
1169
- // Releasing `lock` before the call to `notify_one` means that when the
1170
- // parked thread wakes it doesn't get woken only to have to wait for us
1171
- // to release `lock`.
1172
- drop ( self . inner . lock . lock ( ) . unwrap ( ) ) ;
1173
- self . inner . cvar . notify_one ( )
1078
+ self . inner . parker . unpark ( ) ;
1174
1079
}
1175
1080
1176
1081
/// Gets the thread's unique identifier.
0 commit comments