17
17
/// High level implementation details can be found in the comment of the parent
18
18
/// module.
19
19
20
+ use cmp;
20
21
use comm:: Port ;
21
22
use int;
22
23
use iter:: Iterator ;
@@ -32,6 +33,9 @@ use sync::atomics;
32
33
use vec:: OwnedVector ;
33
34
34
35
static DISCONNECTED : int = int:: MIN ;
36
+ #[ cfg( test) ]
37
+ static MAX_STEALS : int = 5 ;
38
+ #[ cfg( not( test) ) ]
35
39
static MAX_STEALS : int = 1 << 20 ;
36
40
37
41
pub struct Packet < T > {
@@ -198,19 +202,28 @@ impl<T: Send> Packet<T> {
198
202
pub fn try_recv ( & mut self ) -> Result < T , Failure < T > > {
199
203
match self . queue . pop ( ) {
200
204
// If we stole some data, record to that effect (this will be
201
- // factored into cnt later on). Note that we don't allow steals to
202
- // grow without bound in order to prevent eventual overflow of
203
- // either steals or cnt as an overflow would have catastrophic
204
- // results. Also note that we don't unconditionally set steals to 0
205
- // because it can be true that steals > cnt.
205
+ // factored into cnt later on).
206
+ //
207
+ // Note that we don't allow steals to grow without bound in order to
208
+ // prevent eventual overflow of either steals or cnt as an overflow
209
+ // would have catastrophic results. Sometimes, steals > cnt, but
210
+ // other times cnt > steals, so we don't know the relation between
211
+ // steals and cnt. This code path is executed only rarely, so we do
212
+ // a pretty slow operation, of swapping 0 into cnt, taking steals
213
+ // down as much as possible (without going negative), and then
214
+ // adding back in whatever we couldn't factor into steals.
206
215
Some ( data) => {
207
216
self . steals += 1 ;
208
217
if self . steals > MAX_STEALS {
209
218
match self . cnt . swap ( 0 , atomics:: SeqCst ) {
210
219
DISCONNECTED => {
211
220
self . cnt . store ( DISCONNECTED , atomics:: SeqCst ) ;
212
221
}
213
- n => { self . steals -= n; }
222
+ n => {
223
+ let m = cmp:: min ( n, self . steals ) ;
224
+ self . steals -= m;
225
+ self . cnt . fetch_add ( n - m, atomics:: SeqCst ) ;
226
+ }
214
227
}
215
228
assert ! ( self . steals >= 0 ) ;
216
229
}
0 commit comments