Skip to content

Commit d646715

Browse files
committed
Use new type DispatchEvent for new/updated/deleted dispatches
Also add some tests for it. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 4c7e08f commit d646715

File tree

4 files changed

+169
-42
lines changed

4 files changed

+169
-42
lines changed

README.md

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,30 @@ It provides two channels for clients:
1515
## Example Usage
1616

1717
```python
18-
async def run():
19-
# dispatch helper sends out dispatches when they are due
20-
dispatch_arrived = dispatch_helper.new_dispatches().new_receiver()
21-
dispatch_ready = dispatch_helper.ready_dispatches().new_receiver()
22-
23-
async for selected in select(dispatch_ready, dispatch_arrived):
24-
if selected_from(selected, dispatch_arrived):
25-
dispatch = selected.value
26-
match dispatch.type:
27-
case DISPATCH_TYPE_BATTERY_CHARGE:
28-
battery_pool = microgrid.battery_pool(dispatch.battery_set, task_id)
29-
battery_pool.set_charge(dispatch.power)
30-
...
31-
if selected_from(selected, dispatch_ready):
32-
dispatch = selected.value
33-
log.info("New dispatch arrived %s", dispatch)
18+
async def run():
19+
# dispatch helper sends out dispatches when they are due
20+
dispatch_arrived = dispatch_helper.updated_dispatches().new_receiver()
21+
dispatch_ready = dispatch_helper.ready_dispatches().new_receiver()
22+
23+
async for selected in select(dispatch_ready, dispatch_arrived):
24+
if selected_from(selected, dispatch_ready):
25+
dispatch = selected.value
26+
match dispatch.type:
27+
case DISPATCH_TYPE_BATTERY_CHARGE:
28+
battery_pool = microgrid.battery_pool(dispatch.battery_set, task_id)
29+
battery_pool.set_charge(dispatch.power)
30+
...
31+
if selected_from(selected, dispatch_arrived):
32+
match selected.value:
33+
case Created(dispatch):
34+
log.info("New dispatch arrived %s", dispatch)
35+
...
36+
case Updated(dispatch):
37+
log.info("Dispatch updated %s", dispatch)
38+
...
39+
case Deleted(dispatch):
40+
log.info("Dispatch deleted %s", dispatch)
41+
...
3442
```
3543

3644
## Supported Platforms

src/frequenz/dispatch/__init__.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from frequenz.channels import Broadcast, Receiver
88
from frequenz.client.dispatch.types import Dispatch
99

10-
from frequenz.dispatch.actor import DispatchActor
10+
from frequenz.dispatch.actor import DispatchActor, DispatchEvent
1111

1212
__all__ = ["Dispatcher"]
1313

@@ -41,26 +41,26 @@ def __init__(
4141
svc_addr: The service address.
4242
"""
4343
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
44-
self._new_channel = Broadcast[Dispatch]("new_dispatches")
44+
self._updated_channel = Broadcast[DispatchEvent]("new_dispatches")
4545
self._actor = DispatchActor(
4646
microgrid_id,
4747
grpc_channel,
4848
svc_addr,
49-
self._new_channel.new_sender(),
49+
self._updated_channel.new_sender(),
5050
self._ready_channel.new_sender(),
5151
)
5252

5353
async def start(self) -> None:
5454
"""Start the actor."""
5555
self._actor.start()
5656

57-
def new_dispatches(self) -> Receiver[Dispatch]:
58-
"""Return new dispatches receiver.
57+
def updated_dispatches(self) -> Receiver[DispatchEvent]:
58+
"""Return new, updated or deleted dispatches receiver.
5959
6060
Returns:
6161
A new receiver for new dispatches.
6262
"""
63-
return self._new_channel.new_receiver()
63+
return self._updated_channel.new_receiver()
6464

6565
def ready_dispatches(self) -> Receiver[Dispatch]:
6666
"""Return ready dispatches receiver.

src/frequenz/dispatch/actor.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import asyncio
77
import logging
8+
from dataclasses import dataclass
89
from datetime import datetime, timedelta, timezone
910
from typing import cast
1011

@@ -51,6 +52,36 @@
5152
"""To map from our Weekday enum to the dateutil library enum."""
5253

5354

55+
@dataclass(frozen=True)
56+
class _DispatchEventBase:
57+
dispatch: Dispatch
58+
"""The dispatch that this event is about.
59+
60+
Objects of this base class are sent over the channel when a dispatch is
61+
created, updated or deleted.
62+
"""
63+
64+
65+
class Created(_DispatchEventBase):
66+
"""Wraps a dispatch that was created."""
67+
68+
69+
class Updated(_DispatchEventBase):
70+
"""Wraps a dispatch that was updated."""
71+
72+
73+
class Deleted(_DispatchEventBase):
74+
"""Wraps a dispatch that was deleted."""
75+
76+
77+
DispatchEvent = Created | Updated | Deleted
78+
"""Type that is sent over the channel for dispatch updates.
79+
80+
This type is used to send dispatches that were created, updated or deleted
81+
over the channel.
82+
"""
83+
84+
5485
class DispatchActor(Actor):
5586
"""Dispatch actor.
5687
@@ -66,7 +97,7 @@ def __init__(
6697
microgrid_id: int,
6798
grpc_channel: grpc.aio.Channel,
6899
svc_addr: str,
69-
updated_dispatch_sender: Sender[Dispatch],
100+
updated_dispatch_sender: Sender[DispatchEvent],
70101
ready_dispatch_sender: Sender[Dispatch],
71102
) -> None:
72103
"""Initialize the actor.
@@ -112,11 +143,11 @@ async def _fetch(self) -> None:
112143
if not old_dispatch:
113144
self._update_dispatch_schedule(dispatch, None)
114145
_logger.info("New dispatch: %s", dispatch)
115-
await self._updated_dispatch_sender.send(dispatch)
146+
await self._updated_dispatch_sender.send(Created(dispatch=dispatch))
116147
elif dispatch.update_time != old_dispatch.update_time:
117148
self._update_dispatch_schedule(dispatch, old_dispatch)
118149
_logger.info("Updated dispatch: %s", dispatch)
119-
await self._updated_dispatch_sender.send(dispatch)
150+
await self._updated_dispatch_sender.send(Updated(dispatch=dispatch))
120151

121152
except grpc.aio.AioRpcError as error:
122153
_logger.error("Error fetching dispatches: %s", error)
@@ -125,6 +156,7 @@ async def _fetch(self) -> None:
125156

126157
for dispatch in old_dispatches.values():
127158
_logger.info("Deleted dispatch: %s", dispatch)
159+
await self._updated_dispatch_sender.send(Deleted(dispatch=dispatch))
128160
if task := self._scheduled.pop(dispatch.id, None):
129161
task.cancel()
130162

tests/test_frequenz_dispatch.py

Lines changed: 104 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
"""Tests for the frequenz.dispatch.actor package."""
55

66
import asyncio
7-
from dataclasses import dataclass
7+
from dataclasses import dataclass, replace
88
from datetime import datetime, timedelta, timezone
99
from random import randint
10-
from typing import AsyncIterator, Iterator
10+
from typing import AsyncIterator, Iterator, TypeVar
1111
from unittest.mock import MagicMock
1212

1313
import async_solipsism
@@ -16,10 +16,16 @@
1616
from frequenz.channels._broadcast import Sender
1717
from frequenz.client.dispatch.test.client import FakeClient, to_create_params
1818
from frequenz.client.dispatch.test.generator import DispatchGenerator
19-
from frequenz.client.dispatch.types import Dispatch
19+
from frequenz.client.dispatch.types import Dispatch, Frequency
2020
from pytest import fixture
2121

22-
from frequenz.dispatch.actor import DispatchActor
22+
from frequenz.dispatch.actor import (
23+
Created,
24+
Deleted,
25+
DispatchActor,
26+
DispatchEvent,
27+
Updated,
28+
)
2329

2430

2531
# This method replaces the event loop for all tests in the file.
@@ -45,13 +51,13 @@ def _now() -> datetime:
4551
return datetime.now(tz=timezone.utc)
4652

4753

48-
@dataclass
54+
@dataclass(frozen=True)
4955
class ActorTestEnv:
5056
"""Test environment for the actor."""
5157

5258
actor: DispatchActor
5359
"""The actor under test."""
54-
updated_dispatches: Receiver[Dispatch]
60+
updated_dispatches: Receiver[DispatchEvent]
5561
"""The receiver for updated dispatches."""
5662
ready_dispatches: Receiver[Dispatch]
5763
"""The receiver for ready dispatches."""
@@ -64,8 +70,9 @@ class ActorTestEnv:
6470
@fixture
6571
async def actor_env() -> AsyncIterator[ActorTestEnv]:
6672
"""Return an actor test environment."""
73+
T = TypeVar("T")
6774

68-
class YieldingSender(Sender[Dispatch]):
75+
class YieldingSender(Sender[T]):
6976
"""A sender that yields after sending.
7077
7178
For testing we want to manipulate the time after a call to send.
@@ -74,15 +81,12 @@ class YieldingSender(Sender[Dispatch]):
7481
opportunity to manipulate the time.
7582
"""
7683

77-
def __init__(self, channel: Broadcast[Dispatch]) -> None:
78-
super().__init__(channel)
79-
80-
async def send(self, msg: Dispatch) -> None:
84+
async def send(self, msg: T) -> None:
8185
"""Send the value and yield."""
8286
await super().send(msg)
8387
await asyncio.sleep(1)
8488

85-
updated_dispatches = Broadcast[Dispatch]("updated_dispatches")
89+
updated_dispatches = Broadcast[DispatchEvent]("updated_dispatches")
8690
ready_dispatches = Broadcast[Dispatch]("ready_dispatches")
8791
microgrid_id = randint(1, 100)
8892

@@ -122,13 +126,96 @@ async def test_new_dispatch_created(
122126
) -> None:
123127
"""Test that a new dispatch is created."""
124128
sample = generator.generate_dispatch(actor_env.microgrid_id)
129+
130+
await _test_new_dispatch_created(actor_env, sample)
131+
132+
133+
async def _test_new_dispatch_created(
134+
actor_env: ActorTestEnv,
135+
sample: Dispatch,
136+
) -> Dispatch:
137+
"""Test that a new dispatch is created.
138+
139+
Args:
140+
actor_env: The actor environment
141+
sample: The sample dispatch to create
142+
143+
Returns:
144+
The sample dispatch that was created
145+
"""
125146
await actor_env.client.create(**to_create_params(sample))
126147

127-
dispatch = await actor_env.updated_dispatches.receive()
128-
sample.update_time = dispatch.update_time
129-
sample.create_time = dispatch.create_time
130-
sample.id = dispatch.id
131-
assert dispatch == sample
148+
dispatch_event = await actor_env.updated_dispatches.receive()
149+
150+
match dispatch_event:
151+
case Deleted(dispatch) | Updated(dispatch):
152+
assert False, "Expected a created event"
153+
case Created(dispatch):
154+
sample.update_time = dispatch.update_time
155+
sample.create_time = dispatch.create_time
156+
sample.id = dispatch.id
157+
assert dispatch == sample
158+
159+
return sample
160+
161+
162+
async def test_existing_dispatch_updated(
163+
actor_env: ActorTestEnv,
164+
generator: DispatchGenerator,
165+
fake_time: time_machine.Coordinates,
166+
) -> None:
167+
"""Test that an existing dispatch is updated."""
168+
sample = generator.generate_dispatch(actor_env.microgrid_id)
169+
sample.active = False
170+
sample.recurrence.frequency = Frequency.DAILY
171+
172+
fake_time.shift(timedelta(seconds=1))
173+
174+
await _test_new_dispatch_created(actor_env, sample)
175+
fake_time.shift(timedelta(seconds=1))
176+
177+
await actor_env.client.update(
178+
sample.id,
179+
new_fields={
180+
"active": True,
181+
"recurrence.frequency": Frequency.UNSPECIFIED,
182+
},
183+
)
184+
fake_time.shift(timedelta(seconds=1))
185+
186+
dispatch_event = await actor_env.updated_dispatches.receive()
187+
match dispatch_event:
188+
case Created(dispatch) | Deleted(dispatch):
189+
assert False, "Expected an updated event"
190+
case Updated(dispatch):
191+
sample.update_time = dispatch.update_time
192+
sample.active = True
193+
sample.recurrence = replace(
194+
sample.recurrence, frequency=Frequency.UNSPECIFIED
195+
)
196+
assert dispatch == sample
197+
198+
199+
async def test_existing_dispatch_deleted(
200+
actor_env: ActorTestEnv,
201+
generator: DispatchGenerator,
202+
fake_time: time_machine.Coordinates,
203+
) -> None:
204+
"""Test that an existing dispatch is deleted."""
205+
sample = generator.generate_dispatch(actor_env.microgrid_id)
206+
207+
await _test_new_dispatch_created(actor_env, sample)
208+
209+
await actor_env.client.delete(sample.id)
210+
fake_time.shift(timedelta(seconds=1))
211+
212+
dispatch_event = await actor_env.updated_dispatches.receive()
213+
match dispatch_event:
214+
case Created(dispatch) | Updated(dispatch):
215+
assert False, "Expected a deleted event"
216+
case Deleted(dispatch):
217+
sample.create_time = dispatch.create_time
218+
assert dispatch == sample
132219

133220

134221
async def test_dispatch_schedule(

0 commit comments

Comments
 (0)