29
29
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30
30
//---------------------------------------------------------------------------
31
31
32
+ using System ;
32
33
using System . Threading ;
33
34
using System . Threading . Tasks ;
35
+ using System . Threading . Tasks . Sources ;
34
36
35
37
namespace RabbitMQ . Client . client . impl
36
38
{
37
39
/// <summary>
38
40
/// Inspired by http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx
39
41
/// </summary>
40
- sealed class AsyncManualResetEvent
42
+ sealed class AsyncManualResetEvent : IValueTaskSource
41
43
{
42
- public AsyncManualResetEvent ( bool initialState )
44
+ private ManualResetValueTaskSourceCore < bool > _valueTaskSource ;
45
+ private volatile bool _isSet ;
46
+
47
+ public AsyncManualResetEvent ( bool initialState = false )
43
48
{
49
+ _isSet = initialState ;
50
+ _valueTaskSource . Reset ( ) ;
44
51
if ( initialState )
45
52
{
46
- _taskCompletionSource . SetResult ( true ) ;
53
+ _valueTaskSource . SetResult ( true ) ;
47
54
}
48
55
}
49
56
50
- public bool IsSet => _taskCompletionSource . Task . IsCompleted ;
57
+ public bool IsSet => _isSet ;
51
58
52
- public async Task WaitAsync ( CancellationToken cancellationToken )
59
+ public async ValueTask WaitAsync ( CancellationToken cancellationToken )
53
60
{
61
+ if ( _isSet )
62
+ {
63
+ return ;
64
+ }
65
+
66
+ cancellationToken . ThrowIfCancellationRequested ( ) ;
67
+
54
68
CancellationTokenRegistration tokenRegistration =
55
69
#if NET6_0_OR_GREATER
56
70
cancellationToken . UnsafeRegister (
57
- state => ( ( TaskCompletionSource < bool > ) state ! ) . TrySetCanceled ( ) , _taskCompletionSource ) ;
71
+ static state =>
72
+ {
73
+ var ( source , token ) = ( ( ManualResetValueTaskSourceCore < bool > , CancellationToken ) ) state ! ;
74
+ source . SetException ( new OperationCanceledException ( token ) ) ;
75
+ } , ( _valueTaskSource , cancellationToken ) ) ;
58
76
#else
59
77
cancellationToken . Register (
60
- state => ( ( TaskCompletionSource < bool > ) state ! ) . TrySetCanceled ( ) ,
61
- state : _taskCompletionSource , useSynchronizationContext : false ) ;
78
+ static state =>
79
+ {
80
+ var ( source , token ) = ( ( ManualResetValueTaskSourceCore < bool > , CancellationToken ) ) state ! ;
81
+ source . SetException ( new OperationCanceledException ( token ) ) ;
82
+ } ,
83
+ state : ( _valueTaskSource , cancellationToken ) , useSynchronizationContext : false ) ;
62
84
#endif
63
85
try
64
86
{
65
- await _taskCompletionSource . Task . ConfigureAwait ( false ) ;
87
+ await new ValueTask ( this , _valueTaskSource . Version )
88
+ . ConfigureAwait ( false ) ;
66
89
}
67
90
finally
68
91
{
@@ -77,32 +100,30 @@ await tokenRegistration.DisposeAsync()
77
100
78
101
public void Set ( )
79
102
{
80
- _taskCompletionSource . TrySetResult ( true ) ;
103
+ if ( _isSet )
104
+ {
105
+ return ;
106
+ }
107
+
108
+ _isSet = true ;
109
+ _valueTaskSource . SetResult ( true ) ;
81
110
}
82
111
83
112
public void Reset ( )
84
113
{
85
- var sw = new SpinWait ( ) ;
86
-
87
- do
114
+ if ( ! _isSet )
88
115
{
89
- var currentTaskCompletionSource = _taskCompletionSource ;
90
- if ( ! currentTaskCompletionSource . Task . IsCompleted )
91
- {
92
- return ;
93
- }
94
-
95
- var nextTaskCompletionSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
96
- if ( Interlocked . CompareExchange ( ref _taskCompletionSource , nextTaskCompletionSource , currentTaskCompletionSource ) == currentTaskCompletionSource )
97
- {
98
- return ;
99
- }
100
-
101
- sw . SpinOnce ( ) ;
116
+ return ;
102
117
}
103
- while ( true ) ;
118
+
119
+ _isSet = false ;
120
+ _valueTaskSource . Reset ( ) ;
104
121
}
105
122
106
- volatile TaskCompletionSource < bool > _taskCompletionSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
123
+ void IValueTaskSource . GetResult ( short token ) => _valueTaskSource . GetResult ( token ) ;
124
+
125
+ ValueTaskSourceStatus IValueTaskSource . GetStatus ( short token ) => _valueTaskSource . GetStatus ( token ) ;
126
+
127
+ void IValueTaskSource . OnCompleted ( Action < object ? > continuation , object ? state , short token , ValueTaskSourceOnCompletedFlags flags ) => _valueTaskSource . OnCompleted ( continuation , state , token , flags ) ;
107
128
}
108
129
}
0 commit comments