Skip to content

Commit 6b53d5f

Browse files
gh-112202: Ensure that condition.notify() succeeds even when racing with Task.cancel() (#112201)
Also did a general cleanup of asyncio locks.py comments and docstrings.
1 parent 96bce03 commit 6b53d5f

File tree

4 files changed

+165
-52
lines changed

4 files changed

+165
-52
lines changed

Doc/library/asyncio-sync.rst

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,8 @@ Condition
216216

217217
.. method:: notify(n=1)
218218

219-
Wake up at most *n* tasks (1 by default) waiting on this
220-
condition. The method is no-op if no tasks are waiting.
219+
Wake up *n* tasks (1 by default) waiting on this
220+
condition. If fewer than *n* tasks are waiting they are all awakened.
221221

222222
The lock must be acquired before this method is called and
223223
released shortly after. If called with an *unlocked* lock
@@ -257,12 +257,18 @@ Condition
257257
Once awakened, the Condition re-acquires its lock and this method
258258
returns ``True``.
259259

260+
Note that a task *may* return from this call spuriously,
261+
which is why the caller should always re-check the state
262+
and be prepared to :meth:`wait` again. For this reason, you may
263+
prefer to use :meth:`wait_for` instead.
264+
260265
.. coroutinemethod:: wait_for(predicate)
261266

262267
Wait until a predicate becomes *true*.
263268

264269
The predicate must be a callable which result will be
265-
interpreted as a boolean value. The final value is the
270+
interpreted as a boolean value. The method will repeatedly
271+
:meth:`wait` until the predicate evaluates to *true*. The final value is the
266272
return value.
267273

268274

Lib/asyncio/locks.py

Lines changed: 63 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,23 @@ class Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
2424
"""Primitive lock objects.
2525
2626
A primitive lock is a synchronization primitive that is not owned
27-
by a particular coroutine when locked. A primitive lock is in one
27+
by a particular task when locked. A primitive lock is in one
2828
of two states, 'locked' or 'unlocked'.
2929
3030
It is created in the unlocked state. It has two basic methods,
3131
acquire() and release(). When the state is unlocked, acquire()
3232
changes the state to locked and returns immediately. When the
3333
state is locked, acquire() blocks until a call to release() in
34-
another coroutine changes it to unlocked, then the acquire() call
34+
another task changes it to unlocked, then the acquire() call
3535
resets it to locked and returns. The release() method should only
3636
be called in the locked state; it changes the state to unlocked
3737
and returns immediately. If an attempt is made to release an
3838
unlocked lock, a RuntimeError will be raised.
3939
40-
When more than one coroutine is blocked in acquire() waiting for
41-
the state to turn to unlocked, only one coroutine proceeds when a
42-
release() call resets the state to unlocked; first coroutine which
43-
is blocked in acquire() is being processed.
44-
45-
acquire() is a coroutine and should be called with 'await'.
40+
When more than one task is blocked in acquire() waiting for
41+
the state to turn to unlocked, only one task proceeds when a
42+
release() call resets the state to unlocked; successive release()
43+
calls will unblock tasks in FIFO order.
4644
4745
Locks also support the asynchronous context management protocol.
4846
'async with lock' statement should be used.
@@ -130,7 +128,7 @@ def release(self):
130128
"""Release a lock.
131129
132130
When the lock is locked, reset it to unlocked, and return.
133-
If any other coroutines are blocked waiting for the lock to become
131+
If any other tasks are blocked waiting for the lock to become
134132
unlocked, allow exactly one of them to proceed.
135133
136134
When invoked on an unlocked lock, a RuntimeError is raised.
@@ -182,8 +180,8 @@ def is_set(self):
182180
return self._value
183181

184182
def set(self):
185-
"""Set the internal flag to true. All coroutines waiting for it to
186-
become true are awakened. Coroutine that call wait() once the flag is
183+
"""Set the internal flag to true. All tasks waiting for it to
184+
become true are awakened. Tasks that call wait() once the flag is
187185
true will not block at all.
188186
"""
189187
if not self._value:
@@ -194,7 +192,7 @@ def set(self):
194192
fut.set_result(True)
195193

196194
def clear(self):
197-
"""Reset the internal flag to false. Subsequently, coroutines calling
195+
"""Reset the internal flag to false. Subsequently, tasks calling
198196
wait() will block until set() is called to set the internal flag
199197
to true again."""
200198
self._value = False
@@ -203,7 +201,7 @@ async def wait(self):
203201
"""Block until the internal flag is true.
204202
205203
If the internal flag is true on entry, return True
206-
immediately. Otherwise, block until another coroutine calls
204+
immediately. Otherwise, block until another task calls
207205
set() to set the flag to true, then return True.
208206
"""
209207
if self._value:
@@ -222,8 +220,8 @@ class Condition(_ContextManagerMixin, mixins._LoopBoundMixin):
222220
"""Asynchronous equivalent to threading.Condition.
223221
224222
This class implements condition variable objects. A condition variable
225-
allows one or more coroutines to wait until they are notified by another
226-
coroutine.
223+
allows one or more tasks to wait until they are notified by another
224+
task.
227225
228226
A new Lock object is created and used as the underlying lock.
229227
"""
@@ -250,50 +248,64 @@ def __repr__(self):
250248
async def wait(self):
251249
"""Wait until notified.
252250
253-
If the calling coroutine has not acquired the lock when this
251+
If the calling task has not acquired the lock when this
254252
method is called, a RuntimeError is raised.
255253
256254
This method releases the underlying lock, and then blocks
257255
until it is awakened by a notify() or notify_all() call for
258-
the same condition variable in another coroutine. Once
256+
the same condition variable in another task. Once
259257
awakened, it re-acquires the lock and returns True.
258+
259+
This method may return spuriously,
260+
which is why the caller should always
261+
re-check the state and be prepared to wait() again.
260262
"""
261263
if not self.locked():
262264
raise RuntimeError('cannot wait on un-acquired lock')
263265

266+
fut = self._get_loop().create_future()
264267
self.release()
265268
try:
266-
fut = self._get_loop().create_future()
267-
self._waiters.append(fut)
268269
try:
269-
await fut
270-
return True
271-
finally:
272-
self._waiters.remove(fut)
273-
274-
finally:
275-
# Must re-acquire lock even if wait is cancelled.
276-
# We only catch CancelledError here, since we don't want any
277-
# other (fatal) errors with the future to cause us to spin.
278-
err = None
279-
while True:
280-
try:
281-
await self.acquire()
282-
break
283-
except exceptions.CancelledError as e:
284-
err = e
285-
286-
if err:
270+
self._waiters.append(fut)
287271
try:
288-
raise err # Re-raise most recent exception instance.
272+
await fut
273+
return True
289274
finally:
290-
err = None # Break reference cycles.
275+
self._waiters.remove(fut)
276+
277+
finally:
278+
# Must re-acquire lock even if wait is cancelled.
279+
# We only catch CancelledError here, since we don't want any
280+
# other (fatal) errors with the future to cause us to spin.
281+
err = None
282+
while True:
283+
try:
284+
await self.acquire()
285+
break
286+
except exceptions.CancelledError as e:
287+
err = e
288+
289+
if err is not None:
290+
try:
291+
raise err # Re-raise most recent exception instance.
292+
finally:
293+
err = None # Break reference cycles.
294+
except BaseException:
295+
# Any error raised out of here _may_ have occurred after this Task
296+
# believed to have been successfully notified.
297+
# Make sure to notify another Task instead. This may result
298+
# in a "spurious wakeup", which is allowed as part of the
299+
# Condition Variable protocol.
300+
self._notify(1)
301+
raise
291302

292303
async def wait_for(self, predicate):
293304
"""Wait until a predicate becomes true.
294305
295-
The predicate should be a callable which result will be
296-
interpreted as a boolean value. The final predicate value is
306+
The predicate should be a callable whose result will be
307+
interpreted as a boolean value. The method will repeatedly
308+
wait() until it evaluates to true. The final predicate value is
297309
the return value.
298310
"""
299311
result = predicate()
@@ -303,20 +315,22 @@ async def wait_for(self, predicate):
303315
return result
304316

305317
def notify(self, n=1):
306-
"""By default, wake up one coroutine waiting on this condition, if any.
307-
If the calling coroutine has not acquired the lock when this method
318+
"""By default, wake up one task waiting on this condition, if any.
319+
If the calling task has not acquired the lock when this method
308320
is called, a RuntimeError is raised.
309321
310-
This method wakes up at most n of the coroutines waiting for the
311-
condition variable; it is a no-op if no coroutines are waiting.
322+
This method wakes up n of the tasks waiting for the condition
323+
variable; if fewer than n are waiting, they are all awoken.
312324
313-
Note: an awakened coroutine does not actually return from its
325+
Note: an awakened task does not actually return from its
314326
wait() call until it can reacquire the lock. Since notify() does
315327
not release the lock, its caller should.
316328
"""
317329
if not self.locked():
318330
raise RuntimeError('cannot notify on un-acquired lock')
331+
self._notify(n)
319332

333+
def _notify(self, n):
320334
idx = 0
321335
for fut in self._waiters:
322336
if idx >= n:
@@ -374,7 +388,7 @@ async def acquire(self):
374388
375389
If the internal counter is larger than zero on entry,
376390
decrement it by one and return True immediately. If it is
377-
zero on entry, block, waiting until some other coroutine has
391+
zero on entry, block, waiting until some other task has
378392
called release() to make it larger than 0, and then return
379393
True.
380394
"""
@@ -414,8 +428,8 @@ async def acquire(self):
414428
def release(self):
415429
"""Release a semaphore, incrementing the internal counter by one.
416430
417-
When it was zero on entry and another coroutine is waiting for it to
418-
become larger than zero again, wake up that coroutine.
431+
When it was zero on entry and another task is waiting for it to
432+
become larger than zero again, wake up that task.
419433
"""
420434
self._value += 1
421435
self._wake_up_next()

Lib/test/test_asyncio/test_locks.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,98 @@ async def func():
816816
# originally raised.
817817
self.assertIs(err.exception, raised)
818818

819+
async def test_cancelled_wakeup(self):
820+
# Test that a task cancelled at the "same" time as it is woken
821+
# up as part of a Condition.notify() does not result in a lost wakeup.
822+
# This test simulates a cancel while the target task is awaiting initial
823+
# wakeup on the wakeup queue.
824+
condition = asyncio.Condition()
825+
state = 0
826+
async def consumer():
827+
nonlocal state
828+
async with condition:
829+
while True:
830+
await condition.wait_for(lambda: state != 0)
831+
if state < 0:
832+
return
833+
state -= 1
834+
835+
# create two consumers
836+
c = [asyncio.create_task(consumer()) for _ in range(2)]
837+
# wait for them to settle
838+
await asyncio.sleep(0)
839+
async with condition:
840+
# produce one item and wake up one
841+
state += 1
842+
condition.notify(1)
843+
844+
# Cancel it while it is awaiting to be run.
845+
# This cancellation could come from the outside
846+
c[0].cancel()
847+
848+
# now wait for the item to be consumed
849+
# if it doesn't means that our "notify" didn"t take hold.
850+
# because it raced with a cancel()
851+
try:
852+
async with asyncio.timeout(0.01):
853+
await condition.wait_for(lambda: state == 0)
854+
except TimeoutError:
855+
pass
856+
self.assertEqual(state, 0)
857+
858+
# clean up
859+
state = -1
860+
condition.notify_all()
861+
await c[1]
862+
863+
async def test_cancelled_wakeup_relock(self):
864+
# Test that a task cancelled at the "same" time as it is woken
865+
# up as part of a Condition.notify() does not result in a lost wakeup.
866+
# This test simulates a cancel while the target task is acquiring the lock
867+
# again.
868+
condition = asyncio.Condition()
869+
state = 0
870+
async def consumer():
871+
nonlocal state
872+
async with condition:
873+
while True:
874+
await condition.wait_for(lambda: state != 0)
875+
if state < 0:
876+
return
877+
state -= 1
878+
879+
# create two consumers
880+
c = [asyncio.create_task(consumer()) for _ in range(2)]
881+
# wait for them to settle
882+
await asyncio.sleep(0)
883+
async with condition:
884+
# produce one item and wake up one
885+
state += 1
886+
condition.notify(1)
887+
888+
# now we sleep for a bit. This allows the target task to wake up and
889+
# settle on re-aquiring the lock
890+
await asyncio.sleep(0)
891+
892+
# Cancel it while awaiting the lock
893+
# This cancel could come the outside.
894+
c[0].cancel()
895+
896+
# now wait for the item to be consumed
897+
# if it doesn't means that our "notify" didn"t take hold.
898+
# because it raced with a cancel()
899+
try:
900+
async with asyncio.timeout(0.01):
901+
await condition.wait_for(lambda: state == 0)
902+
except TimeoutError:
903+
pass
904+
self.assertEqual(state, 0)
905+
906+
# clean up
907+
state = -1
908+
condition.notify_all()
909+
await c[1]
910+
819911
class SemaphoreTests(unittest.IsolatedAsyncioTestCase):
820912

821913
def test_initial_value_zero(self):
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Ensure that a :func:`asyncio.Condition.notify` call does not get lost if the awakened ``Task`` is simultaneously cancelled or encounters any other error.

0 commit comments

Comments
 (0)