Skip to content

Commit f541548

Browse files
committed
Add initial structure
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent ac3551c commit f541548

File tree

5 files changed

+140
-39
lines changed

5 files changed

+140
-39
lines changed

src/frequenz/dispatch/__init__.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,70 @@
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

44
"""A highlevel interface for the dispatch API."""
5+
6+
import grpc
7+
from frequenz.channels import Broadcast, Receiver
8+
from frequenz.client.dispatch.types import Dispatch
9+
10+
from frequenz.dispatch.actor import DispatchActor
11+
12+
__all__ = ["Dispatcher"]
13+
14+
15+
class Dispatcher:
16+
"""A highlevel interface for the dispatch API.
17+
18+
This class provides a highlevel interface to the dispatch API. It
19+
allows to receive new dispatches and ready dispatches.
20+
21+
Example:
22+
```python
23+
from frequenz.dispatch import Dispatcher
24+
25+
async def run():
26+
dispatcher = Dispatcher(API_CONNECTION_INFO)
27+
dispatcher.start() # this will start the actor
28+
dispatch_arrived = dispatcher.new_dispatches().new_receiver()
29+
dispatch_ready = dispatcher.ready_dispatches().new_receiver()
30+
```
31+
"""
32+
33+
def __init__(
34+
self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str
35+
):
36+
"""Initialize the dispatcher.
37+
38+
Args:
39+
microgrid_id: The microgrid id.
40+
grpc_channel: The gRPC channel.
41+
svc_addr: The service address.
42+
"""
43+
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
44+
self._new_channel = Broadcast[Dispatch]("new_dispatches")
45+
self._actor = DispatchActor(
46+
microgrid_id,
47+
grpc_channel,
48+
svc_addr,
49+
self._new_channel.new_sender(),
50+
self._ready_channel.new_sender(),
51+
)
52+
53+
async def start(self) -> None:
54+
"""Start the actor."""
55+
self._actor.start()
56+
57+
def new_dispatches(self) -> Receiver[Dispatch]:
58+
"""Return new dispatches receiver.
59+
60+
Returns:
61+
A new receiver for new dispatches.
62+
"""
63+
return self._new_channel.new_receiver()
64+
65+
def ready_dispatches(self) -> Receiver[Dispatch]:
66+
"""Return ready dispatches receiver.
67+
68+
Returns:
69+
A new receiver for ready dispatches.
70+
"""
71+
return self._ready_channel.new_receiver()

src/frequenz/dispatch/actor.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""The dispatch actor."""
5+
6+
import asyncio
7+
import logging
8+
9+
import grpc
10+
from frequenz.channels import Sender
11+
from frequenz.client.dispatch import Client
12+
from frequenz.client.dispatch.types import Dispatch
13+
from frequenz.sdk.actor import Actor
14+
15+
16+
class DispatchActor(Actor):
17+
"""Dispatch actor.
18+
19+
This actor is responsible for handling dispatches for a microgrid.
20+
21+
This means staying in sync with the API and scheduling
22+
dispatches as necessary.
23+
"""
24+
25+
# pylint: disable=too-many-arguments
26+
def __init__(
27+
self,
28+
microgrid_id: int,
29+
grpc_channel: grpc.aio.Channel,
30+
svc_addr: str,
31+
new_dispatch_sender: Sender[Dispatch],
32+
ready_dispatch_sender: Sender[Dispatch],
33+
) -> None:
34+
"""Initialize the actor.
35+
36+
Args:
37+
microgrid_id: The microgrid ID to handle dispatches for.
38+
grpc_channel: The gRPC channel to use for communication with the API.
39+
svc_addr: Address of the service to connect to.
40+
new_dispatch_sender: A sender for new dispatches.
41+
ready_dispatch_sender: A sender for ready dispatches.
42+
"""
43+
super().__init__(name="dispatch")
44+
45+
self._client = Client(grpc_channel, svc_addr)
46+
self._dispatches: dict[int, Dispatch] = {}
47+
self._microgrid_id = microgrid_id
48+
self._new_dispatch_sender = new_dispatch_sender
49+
self._ready_dispatch_sender = ready_dispatch_sender
50+
51+
async def _run(self) -> None:
52+
"""Run the actor."""
53+
while True:
54+
await self._fetch()
55+
await asyncio.sleep(5)
56+
57+
async def _fetch(self) -> None:
58+
"""Fetch all relevant dispatches."""
59+
old_dispatches = self._dispatches
60+
self._dispatches = {}
61+
62+
try:
63+
logging.info("Fetching dispatches")
64+
async for dispatch in self._client.list(microgrid_id=1):
65+
self._dispatches[dispatch.id] = dispatch
66+
67+
if dispatch.id not in old_dispatches:
68+
logging.info("New dispatch: %s", dispatch)
69+
await self._new_dispatch_sender.send(dispatch)
70+
71+
except grpc.aio.AioRpcError as error:
72+
logging.error("Error fetching dispatches: %s", error)
73+
self._dispatches = old_dispatches

src/frequenz/dispatch/actor/__init__.py

Lines changed: 0 additions & 25 deletions
This file was deleted.

src/frequenz/dispatch/actor/py.typed

Whitespace-only changes.

tests/test_frequenz_dispatch.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,3 @@
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

44
"""Tests for the frequenz.dispatch.actor package."""
5-
import pytest
6-
7-
from frequenz.dispatch.actor import delete_me
8-
9-
10-
def test_frequenz_dispatch_succeeds() -> None: # TODO(cookiecutter): Remove
11-
"""Test that the delete_me function succeeds."""
12-
assert delete_me() is True
13-
14-
15-
def test_frequenz_dispatch_fails() -> None: # TODO(cookiecutter): Remove
16-
"""Test that the delete_me function fails."""
17-
with pytest.raises(RuntimeError, match="This function should be removed!"):
18-
delete_me(blow_up=True)

0 commit comments

Comments
 (0)