42
42
using System . Collections . Generic ;
43
43
using System . Diagnostics ;
44
44
using System . IO ;
45
+ using System . Runtime . CompilerServices ;
45
46
using System . Text ;
46
47
using System . Threading ;
47
48
@@ -69,12 +70,15 @@ abstract class ModelBase : IFullModel, IRecoverable
69
70
70
71
private readonly object _shutdownLock = new object ( ) ;
71
72
private readonly object _rpcLock = new object ( ) ;
73
+ private readonly object _confirmLock = new object ( ) ;
72
74
73
- private readonly SynchronizedList < ulong > _unconfirmedSet = new SynchronizedList < ulong > ( ) ;
75
+ private ulong _maxDeliveryId ;
76
+ private ulong _deliveredItems = 0 ;
74
77
75
78
private EventHandler < ShutdownEventArgs > _modelShutdown ;
76
79
77
80
private bool _onlyAcksReceived = true ;
81
+ private ulong _nextPublishSeqNo ;
78
82
79
83
public IConsumerDispatcher ConsumerDispatcher { get ; private set ; }
80
84
@@ -99,7 +103,7 @@ public ModelBase(ISession session, ConsumerWorkService workService)
99
103
protected void Initialise ( ISession session )
100
104
{
101
105
CloseReason = null ;
102
- NextPublishSeqNo = 0 ;
106
+ _nextPublishSeqNo = 0 ;
103
107
Session = session ;
104
108
Session . CommandReceived = HandleCommand ;
105
109
Session . SessionShutdown += OnSessionShutdown ;
@@ -176,7 +180,9 @@ public bool IsOpen
176
180
get { return CloseReason == null ; }
177
181
}
178
182
179
- public ulong NextPublishSeqNo { get ; private set ; }
183
+
184
+
185
+ public ulong NextPublishSeqNo { get => _nextPublishSeqNo ; }
180
186
181
187
public ISession Session { get ; private set ; }
182
188
@@ -490,8 +496,10 @@ public virtual void OnModelShutdown(ShutdownEventArgs reason)
490
496
}
491
497
}
492
498
}
493
- lock ( _unconfirmedSet . SyncRoot )
494
- Monitor . Pulse ( _unconfirmedSet . SyncRoot ) ;
499
+ lock ( _confirmLock )
500
+ {
501
+ Monitor . Pulse ( _confirmLock ) ;
502
+ }
495
503
496
504
_flowControlBlock . Set ( ) ;
497
505
}
@@ -1069,20 +1077,12 @@ public abstract void BasicNack(ulong deliveryTag,
1069
1077
1070
1078
internal void AllocatatePublishSeqNos ( int count )
1071
1079
{
1072
- int c = 0 ;
1073
- lock ( _unconfirmedSet . SyncRoot )
1080
+
1081
+ lock ( _confirmLock )
1074
1082
{
1075
- while ( c < count )
1083
+ if ( _nextPublishSeqNo > 0 )
1076
1084
{
1077
- if ( NextPublishSeqNo > 0 )
1078
- {
1079
- if ( ! _unconfirmedSet . Contains ( NextPublishSeqNo ) )
1080
- {
1081
- _unconfirmedSet . Add ( NextPublishSeqNo ) ;
1082
- }
1083
- NextPublishSeqNo ++ ;
1084
- }
1085
- c ++ ;
1085
+ _nextPublishSeqNo = InterlockedEx . Add ( ref _nextPublishSeqNo , ( ulong ) count ) ;
1086
1086
}
1087
1087
}
1088
1088
}
@@ -1102,17 +1102,14 @@ public void BasicPublish(string exchange,
1102
1102
{
1103
1103
basicProperties = CreateBasicProperties ( ) ;
1104
1104
}
1105
- if ( NextPublishSeqNo > 0 )
1105
+ if ( _nextPublishSeqNo > 0 )
1106
1106
{
1107
- lock ( _unconfirmedSet . SyncRoot )
1107
+ lock ( _confirmLock )
1108
1108
{
1109
- if ( ! _unconfirmedSet . Contains ( NextPublishSeqNo ) )
1110
- {
1111
- _unconfirmedSet . Add ( NextPublishSeqNo ) ;
1112
- }
1113
- NextPublishSeqNo ++ ;
1109
+ _nextPublishSeqNo = InterlockedEx . Increment ( ref _nextPublishSeqNo ) ;
1114
1110
}
1115
1111
}
1112
+
1116
1113
_Private_BasicPublish ( exchange ,
1117
1114
routingKey ,
1118
1115
mandatory ,
@@ -1170,8 +1167,9 @@ public void ConfirmSelect()
1170
1167
{
1171
1168
if ( NextPublishSeqNo == 0UL )
1172
1169
{
1173
- NextPublishSeqNo = 1 ;
1170
+ _nextPublishSeqNo = 1 ;
1174
1171
}
1172
+
1175
1173
_Private_ConfirmSelect ( false ) ;
1176
1174
}
1177
1175
@@ -1331,7 +1329,7 @@ public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
1331
1329
}
1332
1330
bool isWaitInfinite = timeout . TotalMilliseconds == Timeout . Infinite ;
1333
1331
Stopwatch stopwatch = Stopwatch . StartNew ( ) ;
1334
- lock ( _unconfirmedSet . SyncRoot )
1332
+ lock ( _confirmLock )
1335
1333
{
1336
1334
while ( true )
1337
1335
{
@@ -1340,7 +1338,7 @@ public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
1340
1338
throw new AlreadyClosedException ( CloseReason ) ;
1341
1339
}
1342
1340
1343
- if ( _unconfirmedSet . Count == 0 )
1341
+ if ( _deliveredItems == _nextPublishSeqNo - 1 )
1344
1342
{
1345
1343
bool aux = _onlyAcksReceived ;
1346
1344
_onlyAcksReceived = true ;
@@ -1349,13 +1347,12 @@ public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
1349
1347
}
1350
1348
if ( isWaitInfinite )
1351
1349
{
1352
- Monitor . Wait ( _unconfirmedSet . SyncRoot ) ;
1350
+ Monitor . Wait ( _confirmLock ) ;
1353
1351
}
1354
1352
else
1355
1353
{
1356
1354
TimeSpan elapsed = stopwatch . Elapsed ;
1357
- if ( elapsed > timeout || ! Monitor . Wait (
1358
- _unconfirmedSet . SyncRoot , timeout - elapsed ) )
1355
+ if ( elapsed > timeout || ! Monitor . Wait ( _confirmLock , timeout - elapsed ) )
1359
1356
{
1360
1357
timedOut = true ;
1361
1358
return _onlyAcksReceived ;
@@ -1411,32 +1408,27 @@ internal void SendCommands(IList<Command> commands)
1411
1408
1412
1409
protected virtual void handleAckNack ( ulong deliveryTag , bool multiple , bool isNack )
1413
1410
{
1414
- lock ( _unconfirmedSet . SyncRoot )
1411
+ lock ( _confirmLock )
1415
1412
{
1416
- if ( multiple )
1417
- {
1418
- for ( ulong i = _unconfirmedSet [ 0 ] ; i <= deliveryTag ; i ++ )
1419
- {
1420
- // removes potential duplicates
1421
- while ( _unconfirmedSet . Remove ( i ) )
1422
- {
1423
- }
1424
- }
1425
- }
1426
- else
1413
+ _deliveredItems = InterlockedEx . Increment ( ref _deliveredItems ) ;
1414
+
1415
+ if ( multiple && _maxDeliveryId < deliveryTag )
1427
1416
{
1428
- while ( _unconfirmedSet . Remove ( deliveryTag ) )
1429
- {
1430
- }
1417
+ _maxDeliveryId = deliveryTag ;
1431
1418
}
1419
+
1420
+ _deliveredItems = Math . Max ( _maxDeliveryId , _deliveredItems ) ;
1432
1421
_onlyAcksReceived = _onlyAcksReceived && ! isNack ;
1433
- if ( _unconfirmedSet . Count == 0 )
1422
+ if ( _deliveredItems == _nextPublishSeqNo - 1 )
1434
1423
{
1435
- Monitor . Pulse ( _unconfirmedSet . SyncRoot ) ;
1424
+ Monitor . Pulse ( _confirmLock ) ;
1436
1425
}
1437
1426
}
1427
+
1438
1428
}
1439
1429
1430
+
1431
+
1440
1432
private QueueDeclareOk QueueDeclare ( string queue , bool passive , bool durable , bool exclusive ,
1441
1433
bool autoDelete , IDictionary < string , object > arguments )
1442
1434
{
@@ -1478,5 +1470,26 @@ public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation
1478
1470
{
1479
1471
public QueueDeclareOk m_result ;
1480
1472
}
1473
+
1474
+ public static class InterlockedEx
1475
+ {
1476
+ public static ulong Increment ( ref ulong location )
1477
+ {
1478
+ long incrementedSigned = Interlocked . Increment ( ref Unsafe . As < ulong , long > ( ref location ) ) ;
1479
+ return Unsafe . As < long , ulong > ( ref incrementedSigned ) ;
1480
+ }
1481
+
1482
+ public static ulong Decrement ( ref ulong location )
1483
+ {
1484
+ long decrementedSigned = Interlocked . Decrement ( ref Unsafe . As < ulong , long > ( ref location ) ) ;
1485
+ return Unsafe . As < long , ulong > ( ref decrementedSigned ) ;
1486
+ }
1487
+
1488
+ public static ulong Add ( ref ulong location , ulong value )
1489
+ {
1490
+ long addSigned = Interlocked . Add ( ref Unsafe . As < ulong , long > ( ref location ) , Unsafe . As < ulong , long > ( ref value ) ) ;
1491
+ return Unsafe . As < long , ulong > ( ref addSigned ) ;
1492
+ }
1493
+ }
1481
1494
}
1482
1495
}
0 commit comments