Skip to content

Commit 0030ac8

Browse files
committed
rename background_with_channel -> as_safe_channel, rename _raise, don't unwrap user exception groups, add test for multiple receivers, clean up docs
1 parent da8a415 commit 0030ac8

File tree

6 files changed

+94
-64
lines changed

6 files changed

+94
-64
lines changed

docs/source/reference-core.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1609,10 +1609,10 @@ the numbers 0 through 9 with a 1-second delay before each one:
16091609
16101610
Trio supports async generators, but there's several caveats and it's very
16111611
hard to handle them properly. Therefore Trio bundles a helper,
1612-
`trio.background_with_channel` that does it for you.
1612+
`trio.as_safe_channel` that does it for you.
16131613

16141614

1615-
.. autofunction:: trio.background_with_channel
1615+
.. autofunction:: trio.as_safe_channel
16161616

16171617
The details behind the problems are described in the following sections.
16181618

newsfragments/3197.feature.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
Add :func:`@trio.background_with_channel <trio.background_with_channel>`, a wrapper that can be used to make async generators safe. This will be the suggested fix for `ASYNC900 <https://flake8-async.readthedocs.io/en/latest/rules.html#async900>`_.
1+
Add :func:`@trio.as_safe_channel <trio.as_safe_channel>`, a wrapper that can be used to make async generators safe. This will be the suggested fix for `ASYNC900 <https://flake8-async.readthedocs.io/en/latest/rules.html#async900>`_.

src/trio/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
MemoryChannelStatistics as MemoryChannelStatistics,
2828
MemoryReceiveChannel as MemoryReceiveChannel,
2929
MemorySendChannel as MemorySendChannel,
30-
background_with_channel as background_with_channel,
30+
as_safe_channel as as_safe_channel,
3131
open_memory_channel as open_memory_channel,
3232
)
3333
from ._core import (

src/trio/_channel.py

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
NoPublicConstructor,
2323
final,
2424
generic_function,
25-
raise_single_exception_from_group,
25+
raise_saving_context,
2626
)
2727

2828
if sys.version_info < (3, 11):
@@ -468,18 +468,15 @@ class RecvChanWrapper(ReceiveChannel[T]):
468468
def __init__(
469469
self, recv_chan: MemoryReceiveChannel[T], send_semaphore: trio.Semaphore
470470
) -> None:
471-
self.recv_chan = recv_chan
472-
self.send_semaphore = send_semaphore
473-
474-
# TODO: should this allow clones? We'd signal that by inheriting from
475-
# MemoryReceiveChannel.
471+
self._recv_chan = recv_chan
472+
self._send_semaphore = send_semaphore
476473

477474
async def receive(self) -> T:
478-
self.send_semaphore.release()
479-
return await self.recv_chan.receive()
475+
self._send_semaphore.release()
476+
return await self._recv_chan.receive()
480477

481478
async def aclose(self) -> None:
482-
await self.recv_chan.aclose()
479+
await self._recv_chan.aclose()
483480

484481
def __enter__(self) -> Self:
485482
return self
@@ -490,19 +487,14 @@ def __exit__(
490487
exc_value: BaseException | None,
491488
traceback: TracebackType | None,
492489
) -> None:
493-
self.recv_chan.close()
490+
self._recv_chan.close()
494491

495492

496-
def background_with_channel(
493+
def as_safe_channel(
497494
fn: Callable[P, AsyncGenerator[T, None]],
498495
) -> Callable[P, AbstractAsyncContextManager[ReceiveChannel[T]]]:
499496
"""Decorate an async generator function to make it cancellation-safe.
500497
501-
This is mostly a drop-in replacement, except for the fact that it will
502-
wrap errors in exception groups due to the internal nursery. Although when
503-
using it without a buffer it should be exceedingly rare to get multiple
504-
exceptions.
505-
506498
The ``yield`` keyword offers a very convenient way to write iterators...
507499
which makes it really unfortunate that async generators are so difficult
508500
to call correctly. Yielding from the inside of a cancel scope or a nursery
@@ -516,7 +508,7 @@ def background_with_channel(
516508
offering only the safe interface, and you can still write your iterables
517509
with the convenience of ``yield``. For example::
518510
519-
@background_with_channel
511+
@as_safe_channel
520512
async def my_async_iterable(arg, *, kwarg=True):
521513
while ...:
522514
item = await ...
@@ -529,9 +521,6 @@ async def my_async_iterable(arg, *, kwarg=True):
529521
While the combined async-with-async-for can be inconvenient at first,
530522
the context manager is indispensable for both correctness and for prompt
531523
cleanup of resources.
532-
533-
If you specify ``max_buffer_size>0`` the async generator will run concurrently
534-
with your iterator, until the buffer is full.
535524
"""
536525
# Perhaps a future PEP will adopt `async with for` syntax, like
537526
# https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for
@@ -559,12 +548,15 @@ async def context_manager(
559548
# abandoned generator if it's still alive.
560549
nursery.cancel_scope.cancel()
561550
except BaseExceptionGroup as eg:
562-
try:
563-
raise_single_exception_from_group(eg)
564-
except AssertionError:
565-
raise RuntimeError(
566-
"Encountered exception during cleanup of generator object, as well as exception in the contextmanager body."
567-
) from eg
551+
first, *rest = eg.exceptions
552+
if rest:
553+
# In case user has except* we make it possible for them to handle the
554+
# exceptions.
555+
raise BaseExceptionGroup(
556+
"Encountered exception during cleanup of generator object, as well as exception in the contextmanager body - unable to unwrap.",
557+
[eg],
558+
) from None
559+
raise_saving_context(first)
568560

569561
async def _move_elems_to_channel(
570562
agen: AsyncGenerator[T, None],

src/trio/_tests/test_channel.py

Lines changed: 68 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
from __future__ import annotations
22

3+
import sys
34
from typing import TYPE_CHECKING, Union
45

56
import pytest
67

78
import trio
8-
from trio import EndOfChannel, background_with_channel, open_memory_channel
9+
from trio import EndOfChannel, as_safe_channel, open_memory_channel
910

1011
from ..testing import Matcher, RaisesGroup, assert_checkpoints, wait_all_tasks_blocked
1112

13+
if sys.version_info < (3, 11):
14+
from exceptiongroup import ExceptionGroup
15+
1216
if TYPE_CHECKING:
1317
from collections.abc import AsyncGenerator
1418

@@ -416,8 +420,8 @@ async def do_send(s: trio.MemorySendChannel[int], v: int) -> None:
416420
r.receive_nowait()
417421

418422

419-
async def test_background_with_channel_exhaust() -> None:
420-
@background_with_channel
423+
async def test_as_safe_channel_exhaust() -> None:
424+
@as_safe_channel
421425
async def agen() -> AsyncGenerator[int]:
422426
yield 1
423427

@@ -426,8 +430,8 @@ async def agen() -> AsyncGenerator[int]:
426430
assert x == 1
427431

428432

429-
async def test_background_with_channel_broken_resource() -> None:
430-
@background_with_channel
433+
async def test_as_safe_channel_broken_resource() -> None:
434+
@as_safe_channel
431435
async def agen() -> AsyncGenerator[int]:
432436
yield 1
433437
yield 2
@@ -445,10 +449,10 @@ async def agen() -> AsyncGenerator[int]:
445449
# but we don't get an error on exit of the cm
446450

447451

448-
async def test_background_with_channel_cancelled() -> None:
452+
async def test_as_safe_channel_cancelled() -> None:
449453
with trio.CancelScope() as cs:
450454

451-
@background_with_channel
455+
@as_safe_channel
452456
async def agen() -> AsyncGenerator[None]: # pragma: no cover
453457
raise AssertionError(
454458
"cancel before consumption means generator should not be iterated"
@@ -459,12 +463,12 @@ async def agen() -> AsyncGenerator[None]: # pragma: no cover
459463
cs.cancel()
460464

461465

462-
async def test_background_with_channel_recv_closed(
466+
async def test_as_safe_channel_recv_closed(
463467
autojump_clock: trio.testing.MockClock,
464468
) -> None:
465469
event = trio.Event()
466470

467-
@background_with_channel
471+
@as_safe_channel
468472
async def agen() -> AsyncGenerator[int]:
469473
await event.wait()
470474
yield 1
@@ -476,10 +480,10 @@ async def agen() -> AsyncGenerator[int]:
476480
await trio.sleep(1)
477481

478482

479-
async def test_background_with_channel_no_race() -> None:
483+
async def test_as_safe_channel_no_race() -> None:
480484
# this previously led to a race condition due to
481485
# https://github.com/python-trio/trio/issues/1559
482-
@background_with_channel
486+
@as_safe_channel
483487
async def agen() -> AsyncGenerator[int]:
484488
yield 1
485489
raise ValueError("oae")
@@ -490,10 +494,10 @@ async def agen() -> AsyncGenerator[int]:
490494
assert x == 1
491495

492496

493-
async def test_background_with_channel_buffer_size_too_small(
497+
async def test_as_safe_channel_buffer_size_too_small(
494498
autojump_clock: trio.testing.MockClock,
495499
) -> None:
496-
@background_with_channel
500+
@as_safe_channel
497501
async def agen() -> AsyncGenerator[int]:
498502
yield 1
499503
raise AssertionError(
@@ -507,8 +511,8 @@ async def agen() -> AsyncGenerator[int]:
507511
await trio.sleep_forever()
508512

509513

510-
async def test_background_with_channel_no_interleave() -> None:
511-
@background_with_channel
514+
async def test_as_safe_channel_no_interleave() -> None:
515+
@as_safe_channel
512516
async def agen() -> AsyncGenerator[int]:
513517
yield 1
514518
raise AssertionError # pragma: no cover
@@ -518,10 +522,10 @@ async def agen() -> AsyncGenerator[int]:
518522
await trio.lowlevel.checkpoint()
519523

520524

521-
async def test_background_with_channel_genexit_finally() -> None:
525+
async def test_as_safe_channel_genexit_finally() -> None:
522526
events: list[str] = []
523527

524-
@background_with_channel
528+
@as_safe_channel
525529
async def agen(stuff: list[str]) -> AsyncGenerator[int]:
526530
try:
527531
yield 1
@@ -532,24 +536,23 @@ async def agen(stuff: list[str]) -> AsyncGenerator[int]:
532536
stuff.append("finally")
533537
raise ValueError("agen")
534538

535-
with pytest.raises(
536-
RuntimeError,
537-
match=r"^Encountered exception during cleanup of generator object, as well as exception in the contextmanager body.$",
538-
) as excinfo:
539+
with RaisesGroup(
540+
RaisesGroup(
541+
Matcher(ValueError, match="^agen$"),
542+
Matcher(TypeError, match="^iterator$"),
543+
),
544+
match=r"^Encountered exception during cleanup of generator object, as well as exception in the contextmanager body - unable to unwrap.$",
545+
):
539546
async with agen(events) as recv_chan:
540547
async for i in recv_chan: # pragma: no branch
541548
assert i == 1
542549
raise TypeError("iterator")
543550

544551
assert events == ["GeneratorExit()", "finally"]
545-
RaisesGroup(
546-
Matcher(ValueError, match="^agen$"),
547-
Matcher(TypeError, match="^iterator$"),
548-
).matches(excinfo.value.__cause__)
549552

550553

551-
async def test_background_with_channel_nested_loop() -> None:
552-
@background_with_channel
554+
async def test_as_safe_channel_nested_loop() -> None:
555+
@as_safe_channel
553556
async def agen() -> AsyncGenerator[int]:
554557
for i in range(2):
555558
yield i
@@ -565,15 +568,49 @@ async def agen() -> AsyncGenerator[int]:
565568
ii += 1
566569

567570

568-
async def test_doesnt_leak_cancellation() -> None:
569-
@background_with_channel
570-
async def agenfn() -> AsyncGenerator[None]:
571+
async def test_as_safe_channel_doesnt_leak_cancellation() -> None:
572+
@as_safe_channel
573+
async def agen() -> AsyncGenerator[None]:
571574
with trio.CancelScope() as cscope:
572575
cscope.cancel()
573576
yield
574577

575578
with pytest.raises(AssertionError):
576-
async with agenfn() as recv_chan:
579+
async with agen() as recv_chan:
577580
async for _ in recv_chan:
578581
pass
579582
raise AssertionError("should be reachable")
583+
584+
585+
async def test_as_safe_channel_dont_unwrap_user_exceptiongroup() -> None:
586+
@as_safe_channel
587+
async def agen() -> AsyncGenerator[None]:
588+
yield
589+
590+
with RaisesGroup(Matcher(ValueError, match="bar"), match="foo"):
591+
async with agen() as _:
592+
raise ExceptionGroup("foo", [ValueError("bar")])
593+
594+
595+
async def test_as_safe_channel_multiple_receiver() -> None:
596+
event = trio.Event()
597+
598+
@as_safe_channel
599+
async def agen() -> AsyncGenerator[int]:
600+
await event.wait()
601+
for i in range(2):
602+
yield i
603+
604+
async def handle_value(
605+
recv_chan: trio.abc.ReceiveChannel[int],
606+
value: int,
607+
task_status: trio.TaskStatus,
608+
) -> None:
609+
task_status.started()
610+
assert await recv_chan.receive() == value
611+
612+
async with agen() as recv_chan:
613+
async with trio.open_nursery() as nursery:
614+
await nursery.start(handle_value, recv_chan, 0)
615+
await nursery.start(handle_value, recv_chan, 1)
616+
event.set()

src/trio/_util.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,9 +359,10 @@ def wraps( # type: ignore[explicit-any]
359359
from functools import wraps # noqa: F401 # this is re-exported
360360

361361

362-
def _raise(exc: BaseException) -> NoReturn:
362+
def raise_saving_context(exc: BaseException) -> NoReturn:
363363
"""This helper allows re-raising an exception without __context__ being set."""
364364
# cause does not need special handling, we simply avoid using `raise .. from ..`
365+
# __suppress_context__ also does not need handling, it's only set if modifying cause
365366
__tracebackhide__ = True
366367
context = exc.__context__
367368
try:
@@ -412,6 +413,6 @@ def _parse_excg(e: BaseException) -> None:
412413
"Attempted to unwrap exceptiongroup with multiple non-cancelled exceptions. This is often caused by a bug in the caller."
413414
) from eg
414415
if len(noncancelled_exceptions) == 1:
415-
_raise(noncancelled_exceptions[0])
416+
raise_saving_context(noncancelled_exceptions[0])
416417
assert cancelled_exceptions, "internal error"
417-
_raise(cancelled_exceptions[0])
418+
raise_saving_context(cancelled_exceptions[0])

0 commit comments

Comments
 (0)