Skip to content

Commit 4b5d908

Browse files
Fix issue with actor restarting instead of stopping (#1223)
When an actor raised an exception during cancellation, it restarted rather than stopping as expected. Now it stops and just logs the unhandled exception instead of restarting.
2 parents 15bcff5 + 02e30d7 commit 4b5d908

File tree

3 files changed

+101
-1
lines changed

3 files changed

+101
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414

1515
## Bug Fixes
1616

17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
17+
- Fixed issue where actors would restart instead of stopping when exceptions occurred during cancellation. Actors now properly stop and surface the unhandled exception.

src/frequenz/sdk/actor/_actor.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,24 @@ class Actor(BackgroundService, abc.ABC):
4444
This is mostly used for testing purposes and shouldn't be set in production.
4545
"""
4646

47+
def __init__(self, *, name: str | None = None) -> None:
48+
"""Create actor instance.
49+
50+
Args:
51+
name: The name of this background service.
52+
"""
53+
super().__init__(name=name)
54+
self._is_cancelled = False
55+
4756
def start(self) -> None:
4857
"""Start this actor.
4958
5059
If this actor is already running, this method does nothing.
5160
"""
5261
if self.is_running:
5362
return
63+
64+
self._is_cancelled = False
5465
self._tasks.clear()
5566
self._tasks.add(asyncio.create_task(self._run_loop()))
5667

@@ -94,6 +105,17 @@ async def _run_loop(self) -> None:
94105
_logger.info("Actor %s: Cancelled.", self)
95106
raise
96107
except Exception: # pylint: disable=broad-except
108+
if self._is_cancelled:
109+
# If actor was cancelled, but any tasks have failed with an exception
110+
# other than asyncio.CancelledError, those exceptions are combined
111+
# in an ExceptionGroup or BaseExceptionGroup.
112+
# We have to handle that case separately to stop actor instead
113+
# of restarting it.
114+
_logger.exception(
115+
"Actor %s: Raised an unhandled exception during stop.", self
116+
)
117+
break
118+
97119
_logger.exception("Actor %s: Raised an unhandled exception.", self)
98120
limit_str = "∞" if self._restart_limit is None else self._restart_limit
99121
limit_str = f"({n_restarts}/{limit_str})"
@@ -113,3 +135,14 @@ async def _run_loop(self) -> None:
113135
break
114136

115137
_logger.info("Actor %s: Stopped.", self)
138+
139+
def cancel(self, msg: str | None = None) -> None:
140+
"""Cancel actor.
141+
142+
Cancelled actor can't be started again.
143+
144+
Args:
145+
msg: The message to be passed to the tasks being cancelled.
146+
"""
147+
self._is_cancelled = True
148+
return super().cancel(msg)

tests/actor/test_actor.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,31 @@ async def _run(self) -> None:
5353
print(f"{self} done")
5454

5555

56+
class RaiseExceptionOnCancelActor(BaseTestActor):
57+
"""Actor that raises exception during stop."""
58+
59+
def __init__(
60+
self, *, name: str | None = None, recv: Receiver[int], sender: Sender[int]
61+
) -> None:
62+
"""Create an instance."""
63+
super().__init__(name=name)
64+
self._recv = recv
65+
self._sender = sender
66+
67+
async def _run(self) -> None:
68+
"""Start the actor and raise exception after receiving CancelledError."""
69+
self.inc_restart_count()
70+
if BaseTestActor.restart_count == 1:
71+
# This actor should not restart
72+
# If it does we just return to avoid infinite await on `stop`
73+
return
74+
try:
75+
async for msg in self._recv:
76+
await self._sender.send(msg)
77+
except asyncio.CancelledError as exc:
78+
raise RuntimeError("Actor should stop.") from exc
79+
80+
5681
class RaiseExceptionActor(BaseTestActor):
5782
"""A faulty actor that raises an Exception as soon as it receives a message."""
5883

@@ -333,3 +358,45 @@ async def cancel_actor() -> None:
333358
(*RUN_INFO, "Actor EchoActor[EchoActor]: Cancelled while running."),
334359
(*RUN_INFO, "All 1 actor(s) finished."),
335360
]
361+
362+
363+
async def test_actor_stop_if_error_was_raised_during_cancel(
364+
actor_auto_restart_once: None, # pylint: disable=unused-argument
365+
caplog: pytest.LogCaptureFixture,
366+
) -> None:
367+
"""If actor raises exception during cancellation it should stop.
368+
369+
And throw unhandled exception to the user.
370+
"""
371+
caplog.set_level("DEBUG", logger="frequenz.sdk.actor._actor")
372+
caplog.set_level("DEBUG", logger="frequenz.sdk.actor._run_utils")
373+
374+
input_chan: Broadcast[int] = Broadcast(name="TestChannel")
375+
376+
echo_chan: Broadcast[int] = Broadcast(name="echo output")
377+
echo_rx = echo_chan.new_receiver()
378+
379+
actor = RaiseExceptionOnCancelActor(
380+
name="test",
381+
recv=input_chan.new_receiver(),
382+
sender=echo_chan.new_sender(),
383+
)
384+
385+
# Start actor and make sure it is running
386+
actor.start()
387+
await input_chan.new_sender().send(5)
388+
msg = await echo_rx.receive()
389+
assert msg == 5
390+
assert actor.is_running is True
391+
392+
await actor.stop()
393+
assert actor.restart_count == 0
394+
395+
assert caplog.record_tuples == [
396+
(*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Started."),
397+
(
398+
*ACTOR_ERROR,
399+
"Actor RaiseExceptionOnCancelActor[test]: Raised an unhandled exception during stop.",
400+
),
401+
(*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Stopped."),
402+
]

0 commit comments

Comments
 (0)