Skip to content

Commit 6dcf1fe

Browse files
committed
Add initial structure
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 1a2afde commit 6dcf1fe

File tree

7 files changed

+412
-36
lines changed

7 files changed

+412
-36
lines changed

pyproject.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,11 @@ packages = ["frequenz.dispatch"]
170170
strict = true
171171

172172
[[tool.mypy.overrides]]
173-
module = ["mkdocs_macros.*"]
173+
module = [
174+
"mkdocs_macros.*",
175+
"async_solipsism",
176+
"async_solipsism.*",
177+
]
174178
ignore_missing_imports = true
175179

176180
[tool.setuptools_scm]

src/frequenz/dispatch/__init__.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,70 @@
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

44
"""A highlevel interface for the dispatch API."""
5+
6+
import grpc.aio
7+
from frequenz.channels import Broadcast, Receiver
8+
from frequenz.client.dispatch.types import Dispatch
9+
10+
from frequenz.dispatch.actor import DispatchActor
11+
12+
__all__ = ["Dispatcher"]
13+
14+
15+
class Dispatcher:
16+
"""A highlevel interface for the dispatch API.
17+
18+
This class provides a highlevel interface to the dispatch API. It
19+
allows to receive new dispatches and ready dispatches.
20+
21+
Example:
22+
```python
23+
from frequenz.dispatch import Dispatcher
24+
25+
async def run():
26+
dispatcher = Dispatcher(API_CONNECTION_INFO)
27+
dispatcher.start() # this will start the actor
28+
dispatch_arrived = dispatcher.new_dispatches().new_receiver()
29+
dispatch_ready = dispatcher.ready_dispatches().new_receiver()
30+
```
31+
"""
32+
33+
def __init__(
34+
self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str
35+
):
36+
"""Initialize the dispatcher.
37+
38+
Args:
39+
microgrid_id: The microgrid id.
40+
grpc_channel: The gRPC channel.
41+
svc_addr: The service address.
42+
"""
43+
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
44+
self._new_channel = Broadcast[Dispatch]("new_dispatches")
45+
self._actor = DispatchActor(
46+
microgrid_id,
47+
grpc_channel,
48+
svc_addr,
49+
self._new_channel.new_sender(),
50+
self._ready_channel.new_sender(),
51+
)
52+
53+
async def start(self) -> None:
54+
"""Start the actor."""
55+
self._actor.start()
56+
57+
def new_dispatches(self) -> Receiver[Dispatch]:
58+
"""Return new dispatches receiver.
59+
60+
Returns:
61+
A new receiver for new dispatches.
62+
"""
63+
return self._new_channel.new_receiver()
64+
65+
def ready_dispatches(self) -> Receiver[Dispatch]:
66+
"""Return ready dispatches receiver.
67+
68+
Returns:
69+
A new receiver for ready dispatches.
70+
"""
71+
return self._ready_channel.new_receiver()

src/frequenz/dispatch/actor.py

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""The dispatch actor."""
5+
6+
import asyncio
7+
import logging
8+
from datetime import datetime, timedelta, timezone
9+
10+
import grpc.aio
11+
from frequenz.channels import Sender
12+
from frequenz.client.dispatch import Client
13+
from frequenz.client.dispatch.types import Dispatch, Frequency
14+
from frequenz.sdk.actor import Actor
15+
16+
_FREQUENCY_MAP = {
17+
Frequency.MINUTELY: timedelta(minutes=1),
18+
Frequency.HOURLY: timedelta(hours=1),
19+
Frequency.DAILY: timedelta(days=1),
20+
Frequency.WEEKLY: timedelta(weeks=1),
21+
}
22+
23+
_MAX_AHEAD_SCHEDULE = timedelta(hours=5)
24+
25+
26+
class DispatchActor(Actor):
27+
"""Dispatch actor.
28+
29+
This actor is responsible for handling dispatches for a microgrid.
30+
31+
This means staying in sync with the API and scheduling
32+
dispatches as necessary.
33+
"""
34+
35+
# pylint: disable=too-many-arguments
36+
def __init__(
37+
self,
38+
microgrid_id: int,
39+
grpc_channel: grpc.aio.Channel,
40+
svc_addr: str,
41+
updated_dispatch_sender: Sender[Dispatch],
42+
ready_dispatch_sender: Sender[Dispatch],
43+
) -> None:
44+
"""Initialize the actor.
45+
46+
Args:
47+
microgrid_id: The microgrid ID to handle dispatches for.
48+
grpc_channel: The gRPC channel to use for communication with the API.
49+
svc_addr: Address of the service to connect to.
50+
updated_dispatch_sender: A sender for new or updated dispatches.
51+
ready_dispatch_sender: A sender for ready dispatches.
52+
"""
53+
super().__init__(name="dispatch")
54+
55+
self._client = Client(grpc_channel, svc_addr)
56+
self._dispatches: dict[int, Dispatch] = {}
57+
self._scheduled: dict[int, asyncio.Task[None]] = {}
58+
self._microgrid_id = microgrid_id
59+
self._updated_dispatch_sender = updated_dispatch_sender
60+
self._ready_dispatch_sender = ready_dispatch_sender
61+
62+
async def _run(self) -> None:
63+
"""Run the actor."""
64+
try:
65+
while True:
66+
# for _ in range(30):
67+
await self._fetch()
68+
await asyncio.sleep(timedelta(hours=5).total_seconds())
69+
except asyncio.CancelledError:
70+
for task in self._scheduled.values():
71+
task.cancel()
72+
raise
73+
74+
async def _fetch(self) -> None:
75+
"""Fetch all relevant dispatches."""
76+
old_dispatches = self._dispatches
77+
self._dispatches = {}
78+
79+
try:
80+
logging.info("Fetching dispatches for microgrid %s", self._microgrid_id)
81+
async for dispatch in self._client.list(microgrid_id=self._microgrid_id):
82+
self._dispatches[dispatch.id] = dispatch
83+
84+
old_dispatch = old_dispatches.pop(dispatch.id, None)
85+
if not old_dispatch:
86+
self._update_dispatch_schedule(dispatch, None)
87+
logging.info("New dispatch: %s", dispatch)
88+
await self._updated_dispatch_sender.send(dispatch)
89+
elif dispatch.update_time != old_dispatch.update_time:
90+
self._update_dispatch_schedule(dispatch, old_dispatch)
91+
logging.info("Updated dispatch: %s", dispatch)
92+
await self._updated_dispatch_sender.send(dispatch)
93+
94+
except grpc.aio.AioRpcError as error:
95+
logging.error("Error fetching dispatches: %s", error)
96+
self._dispatches = old_dispatches
97+
return
98+
99+
for dispatch in old_dispatches.values():
100+
logging.info("Deleted dispatch: %s", dispatch)
101+
if task := self._scheduled.pop(dispatch.id, None):
102+
task.cancel()
103+
104+
def _update_dispatch_schedule(
105+
self, dispatch: Dispatch, old_dispatch: Dispatch | None
106+
) -> None:
107+
"""Update the schedule for a dispatch.
108+
109+
Schedules, reschedules or cancels the dispatch based on the start_time
110+
and active status.
111+
112+
Args:
113+
dispatch: The dispatch to update the schedule for.
114+
old_dispatch: The old dispatch, if available.
115+
"""
116+
if (
117+
old_dispatch
118+
and old_dispatch.active
119+
and old_dispatch.start_time != dispatch.start_time
120+
):
121+
if task := self._scheduled.pop(dispatch.id, None):
122+
task.cancel()
123+
124+
if dispatch.active and dispatch.id not in self._scheduled:
125+
self._scheduled[dispatch.id] = asyncio.create_task(
126+
self._schedule_task(dispatch)
127+
)
128+
129+
async def _schedule_task(self, dispatch: Dispatch) -> None:
130+
"""Wait for a dispatch to become ready.
131+
132+
Waits for the dispatches next run and then notifies that it is ready.
133+
134+
Args:
135+
dispatch: The dispatch to schedule.
136+
"""
137+
while True:
138+
next_time = self.calculate_next_run(
139+
dispatch, datetime.now().replace(tzinfo=timezone.utc)
140+
)
141+
if next_time is None:
142+
logging.info("Dispatch finished: %s", dispatch)
143+
self._scheduled.pop(dispatch.id)
144+
return
145+
146+
if (
147+
next_time - datetime.now().replace(tzinfo=timezone.utc)
148+
> _MAX_AHEAD_SCHEDULE
149+
):
150+
await asyncio.sleep(_MAX_AHEAD_SCHEDULE.total_seconds())
151+
continue
152+
153+
logging.info("Dispatch %s scheduled for %s", dispatch.id, next_time)
154+
logging.info(
155+
"Sleeping for %s",
156+
next_time - datetime.now().replace(tzinfo=timezone.utc),
157+
)
158+
await asyncio.sleep(
159+
(
160+
next_time - datetime.now().replace(tzinfo=timezone.utc)
161+
).total_seconds()
162+
)
163+
logging.info("Dispatch ready: %s", dispatch)
164+
await self._ready_dispatch_sender.send(dispatch)
165+
166+
@classmethod
167+
def calculate_next_run(cls, dispatch: Dispatch, _from: datetime) -> datetime | None:
168+
"""Calculate the next run of a dispatch.
169+
170+
Args:
171+
dispatch: The dispatch to calculate the next run for.
172+
_from: The time to calculate the next run from.
173+
174+
Returns:
175+
The next run of the dispatch or None if the dispatch is finished.
176+
"""
177+
next_run = dispatch.start_time
178+
179+
while next_run < _from:
180+
match dispatch.recurrence.frequency:
181+
case (
182+
Frequency.MINUTELY
183+
| Frequency.HOURLY
184+
| Frequency.DAILY
185+
| Frequency.WEEKLY
186+
):
187+
next_run = (
188+
next_run
189+
+ _FREQUENCY_MAP[dispatch.recurrence.frequency]
190+
* dispatch.recurrence.interval
191+
)
192+
case Frequency.MONTHLY:
193+
new_month = next_run.month + dispatch.recurrence.interval
194+
new_year = next_run.year + new_month // 12
195+
next_run = next_run.replace(year=new_year, month=new_month % 12)
196+
case Frequency.UNSPECIFIED:
197+
return None
198+
199+
return next_run

src/frequenz/dispatch/actor/__init__.py

Lines changed: 0 additions & 25 deletions
This file was deleted.

src/frequenz/dispatch/actor/py.typed

Whitespace-only changes.

tests/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the frequenz.dispatch package."""

0 commit comments

Comments
 (0)