Skip to content

Commit 280d3df

Browse files
committed
Add initial structure
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent ac3551c commit 280d3df

File tree

5 files changed

+321
-34
lines changed

5 files changed

+321
-34
lines changed

src/frequenz/dispatch/__init__.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,70 @@
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

44
"""A highlevel interface for the dispatch API."""
5+
6+
import grpc.aio
7+
from frequenz.channels import Broadcast, Receiver
8+
from frequenz.client.dispatch.types import Dispatch
9+
10+
from frequenz.dispatch.actor import DispatchActor
11+
12+
__all__ = ["Dispatcher"]
13+
14+
15+
class Dispatcher:
16+
"""A highlevel interface for the dispatch API.
17+
18+
This class provides a highlevel interface to the dispatch API. It
19+
allows to receive new dispatches and ready dispatches.
20+
21+
Example:
22+
```python
23+
from frequenz.dispatch import Dispatcher
24+
25+
async def run():
26+
dispatcher = Dispatcher(API_CONNECTION_INFO)
27+
dispatcher.start() # this will start the actor
28+
dispatch_arrived = dispatcher.new_dispatches().new_receiver()
29+
dispatch_ready = dispatcher.ready_dispatches().new_receiver()
30+
```
31+
"""
32+
33+
def __init__(
34+
self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str
35+
):
36+
"""Initialize the dispatcher.
37+
38+
Args:
39+
microgrid_id: The microgrid id.
40+
grpc_channel: The gRPC channel.
41+
svc_addr: The service address.
42+
"""
43+
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
44+
self._new_channel = Broadcast[Dispatch]("new_dispatches")
45+
self._actor = DispatchActor(
46+
microgrid_id,
47+
grpc_channel,
48+
svc_addr,
49+
self._new_channel.new_sender(),
50+
self._ready_channel.new_sender(),
51+
)
52+
53+
async def start(self) -> None:
54+
"""Start the actor."""
55+
self._actor.start()
56+
57+
def new_dispatches(self) -> Receiver[Dispatch]:
58+
"""Return new dispatches receiver.
59+
60+
Returns:
61+
A new receiver for new dispatches.
62+
"""
63+
return self._new_channel.new_receiver()
64+
65+
def ready_dispatches(self) -> Receiver[Dispatch]:
66+
"""Return ready dispatches receiver.
67+
68+
Returns:
69+
A new receiver for ready dispatches.
70+
"""
71+
return self._ready_channel.new_receiver()

src/frequenz/dispatch/actor.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""The dispatch actor."""
5+
6+
import asyncio
7+
import logging
8+
from datetime import datetime, timedelta, timezone
9+
10+
import grpc.aio
11+
from frequenz.channels import Sender
12+
from frequenz.client.dispatch import Client
13+
from frequenz.client.dispatch.types import Dispatch, Frequency
14+
from frequenz.sdk.actor import Actor
15+
16+
_FREQUENCY_MAP = {
17+
Frequency.MINUTELY: timedelta(minutes=1),
18+
Frequency.HOURLY: timedelta(hours=1),
19+
Frequency.DAILY: timedelta(days=1),
20+
Frequency.WEEKLY: timedelta(weeks=1),
21+
}
22+
23+
24+
class DispatchActor(Actor):
25+
"""Dispatch actor.
26+
27+
This actor is responsible for handling dispatches for a microgrid.
28+
29+
This means staying in sync with the API and scheduling
30+
dispatches as necessary.
31+
"""
32+
33+
# pylint: disable=too-many-arguments
34+
def __init__(
35+
self,
36+
microgrid_id: int,
37+
grpc_channel: grpc.aio.Channel,
38+
svc_addr: str,
39+
updated_dispatch_sender: Sender[Dispatch],
40+
ready_dispatch_sender: Sender[Dispatch],
41+
) -> None:
42+
"""Initialize the actor.
43+
44+
Args:
45+
microgrid_id: The microgrid ID to handle dispatches for.
46+
grpc_channel: The gRPC channel to use for communication with the API.
47+
svc_addr: Address of the service to connect to.
48+
updated_dispatch_sender: A sender for new or updated dispatches.
49+
ready_dispatch_sender: A sender for ready dispatches.
50+
"""
51+
super().__init__(name="dispatch")
52+
53+
self._client = Client(grpc_channel, svc_addr)
54+
self._dispatches: dict[int, Dispatch] = {}
55+
self._scheduled: dict[int, asyncio.Task[None]] = {}
56+
self._microgrid_id = microgrid_id
57+
self._updated_dispatch_sender = updated_dispatch_sender
58+
self._ready_dispatch_sender = ready_dispatch_sender
59+
60+
async def _run(self) -> None:
61+
"""Run the actor."""
62+
while True:
63+
await self._fetch()
64+
await asyncio.sleep(timedelta(minutes=5).total_seconds())
65+
66+
async def _fetch(self) -> None:
67+
"""Fetch all relevant dispatches."""
68+
old_dispatches = self._dispatches
69+
self._dispatches = {}
70+
# breakpoint()
71+
try:
72+
logging.info("Fetching dispatches")
73+
async for dispatch in self._client.list(microgrid_id=self._microgrid_id):
74+
self._dispatches[dispatch.id] = dispatch
75+
76+
old_dispatch = old_dispatches.pop(dispatch.id, None)
77+
if not old_dispatch:
78+
self._update_dispatch_schedule(dispatch, None)
79+
logging.info("New dispatch: %s", dispatch)
80+
await self._updated_dispatch_sender.send(dispatch)
81+
elif dispatch.update_time != old_dispatch.update_time:
82+
self._update_dispatch_schedule(dispatch, old_dispatch)
83+
logging.info("Updated dispatch: %s", dispatch)
84+
await self._updated_dispatch_sender.send(dispatch)
85+
86+
except grpc.aio.AioRpcError as error:
87+
logging.error("Error fetching dispatches: %s", error)
88+
self._dispatches = old_dispatches
89+
return
90+
91+
for dispatch in old_dispatches.values():
92+
logging.info("Deleted dispatch: %s", dispatch)
93+
if task := self._scheduled.pop(dispatch.id, None):
94+
task.cancel()
95+
96+
def _update_dispatch_schedule(
97+
self, dispatch: Dispatch, old_dispatch: Dispatch | None
98+
) -> None:
99+
"""Update the schedule for a dispatch.
100+
101+
Schedules, reschedules or cancels the dispatch based on the start_time
102+
and active status.
103+
104+
Args:
105+
dispatch: The dispatch to update the schedule for.
106+
old_dispatch: The old dispatch, if available.
107+
"""
108+
if (
109+
old_dispatch
110+
and old_dispatch.active
111+
and old_dispatch.start_time != dispatch.start_time
112+
):
113+
if task := self._scheduled.pop(dispatch.id, None):
114+
task.cancel()
115+
116+
if dispatch.active and dispatch.id not in self._scheduled:
117+
self._scheduled[dispatch.id] = asyncio.create_task(
118+
self._schedule_task(dispatch)
119+
)
120+
121+
async def _schedule_task(self, dispatch: Dispatch) -> None:
122+
"""Wait for a dispatch to become ready.
123+
124+
Waits for the dispatches next run and then notifies that it is ready.
125+
126+
Args:
127+
dispatch: The dispatch to schedule.
128+
"""
129+
while True:
130+
next_time = self._calculate_next_run(
131+
dispatch, datetime.now().replace(tzinfo=timezone.utc)
132+
)
133+
if next_time is None:
134+
logging.info("Dispatch finished: %s", dispatch)
135+
self._scheduled.pop(dispatch.id)
136+
return
137+
138+
await asyncio.sleep(
139+
(
140+
next_time - datetime.now().replace(tzinfo=timezone.utc)
141+
).total_seconds()
142+
)
143+
144+
await self._ready_dispatch_sender.send(dispatch)
145+
logging.info("Dispatch ready: %s", dispatch)
146+
147+
def _calculate_next_run(
148+
self, dispatch: Dispatch, _from: datetime
149+
) -> datetime | None:
150+
"""Calculate the next run of a dispatch.
151+
152+
Args:
153+
dispatch: The dispatch to calculate the next run for.
154+
_from: The time to calculate the next run from.
155+
156+
Returns:
157+
The next run of the dispatch or None if the dispatch is finished.
158+
"""
159+
next_run = dispatch.start_time
160+
161+
while next_run < _from:
162+
match dispatch.recurrence.frequency:
163+
case (
164+
Frequency.MINUTELY
165+
| Frequency.HOURLY
166+
| Frequency.DAILY
167+
| Frequency.WEEKLY
168+
):
169+
next_run = (
170+
next_run
171+
+ _FREQUENCY_MAP[dispatch.recurrence.frequency]
172+
* dispatch.recurrence.interval
173+
)
174+
case Frequency.MONTHLY:
175+
new_month = next_run.month + dispatch.recurrence.interval
176+
new_year = next_run.year + new_month // 12
177+
next_run = next_run.replace(year=new_year, month=new_month % 12)
178+
case Frequency.UNSPECIFIED:
179+
return None
180+
181+
return next_run

src/frequenz/dispatch/actor/__init__.py

Lines changed: 0 additions & 25 deletions
This file was deleted.

src/frequenz/dispatch/actor/py.typed

Whitespace-only changes.

tests/test_frequenz_dispatch.py

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,81 @@
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

44
"""Tests for the frequenz.dispatch.actor package."""
5-
import pytest
65

7-
from frequenz.dispatch.actor import delete_me
86

7+
from dataclasses import dataclass
8+
from typing import Iterator
99

10-
def test_frequenz_dispatch_succeeds() -> None: # TODO(cookiecutter): Remove
11-
"""Test that the delete_me function succeeds."""
12-
assert delete_me() is True
10+
import async_solipsism
11+
import grpc.aio
12+
from frequenz.channels import Broadcast, Receiver
13+
from frequenz.client.dispatch.test.client import FakeClient, to_create_params
14+
from frequenz.client.dispatch.test.generator import DispatchGenerator
15+
from frequenz.client.dispatch.types import Dispatch
16+
from pytest import fixture
1317

18+
from frequenz.dispatch.actor import DispatchActor
1419

15-
def test_frequenz_dispatch_fails() -> None: # TODO(cookiecutter): Remove
16-
"""Test that the delete_me function fails."""
17-
with pytest.raises(RuntimeError, match="This function should be removed!"):
18-
delete_me(blow_up=True)
20+
21+
# This method replaces the event loop for all tests in the file.
22+
@fixture()
23+
def event_loop() -> Iterator[async_solipsism.EventLoop]:
24+
"""Replace the loop with one that doesn't interact with the outside world."""
25+
loop = async_solipsism.EventLoop()
26+
yield loop
27+
loop.close()
28+
29+
30+
@dataclass
31+
class ActorTestEnv:
32+
"""Test environment for the actor."""
33+
34+
actor: DispatchActor
35+
updated_dispatches: Receiver[Dispatch]
36+
ready_dispatches: Receiver[Dispatch]
37+
client: FakeClient
38+
39+
40+
@fixture
41+
def actor_env() -> ActorTestEnv:
42+
"""Return an actor test environment."""
43+
updated_dispatches = Broadcast[Dispatch]("updated_dispatches")
44+
ready_dispatches = Broadcast[Dispatch]("ready_dispatches")
45+
46+
actor = DispatchActor(
47+
microgrid_id=1,
48+
grpc_channel=grpc.aio.insecure_channel("mock"),
49+
svc_addr="localhost",
50+
updated_dispatch_sender=updated_dispatches.new_sender(),
51+
ready_dispatch_sender=ready_dispatches.new_sender(),
52+
)
53+
54+
client = FakeClient()
55+
actor._client = client # pylint: disable=protected-access
56+
57+
actor.start()
58+
59+
return ActorTestEnv(
60+
actor,
61+
updated_dispatches.new_receiver(),
62+
ready_dispatches.new_receiver(),
63+
client,
64+
)
65+
66+
67+
@fixture
68+
def generator() -> DispatchGenerator:
69+
"""Return a dispatch generator."""
70+
return DispatchGenerator()
71+
72+
73+
async def test_new_dispatch_created(
74+
actor_env: ActorTestEnv, generator: DispatchGenerator
75+
) -> None:
76+
"""Test that a new dispatch is created."""
77+
breakpoint()
78+
sample = generator.generate_dispatch()
79+
await actor_env.client.create(**to_create_params(sample))
80+
81+
dispatch = await actor_env.updated_dispatches.receive()
82+
assert dispatch == sample

0 commit comments

Comments
 (0)