Skip to content

Commit 4b62a78

Browse files
committed
Make Dispatcher return receiver fetchers
This makes it more clear that new receivers are created each time the method is called. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 8077a82 commit 4b62a78

File tree

3 files changed

+36
-14
lines changed

3 files changed

+36
-14
lines changed

src/frequenz/dispatch/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33

44
"""A highlevel interface for the dispatch API."""
55

6-
from frequenz.dispatch._dispatcher import Dispatcher
6+
from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher
77
from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated
88

99
__all__ = [
1010
"Created",
1111
"Deleted",
1212
"DispatchEvent",
1313
"Dispatcher",
14+
"ReceiverFetcher",
1415
"Updated",
1516
]

src/frequenz/dispatch/_dispatcher.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,37 @@
33

44
"""A highlevel interface for the dispatch API."""
55

6+
import abc
7+
from typing import Protocol, TypeVar
8+
69
import grpc.aio
710
from frequenz.channels import Broadcast, Receiver
811
from frequenz.client.dispatch.types import Dispatch
912

1013
from frequenz.dispatch._event import DispatchEvent
1114
from frequenz.dispatch.actor import DispatchActor
1215

16+
ReceivedT = TypeVar("ReceivedT")
17+
"""The type being received."""
18+
19+
20+
class ReceiverFetcher(Protocol[ReceivedT]):
21+
"""An interface that just exposes a `new_receiver` method."""
22+
23+
@abc.abstractmethod
24+
def new_receiver(
25+
self, name: str | None = None, maxsize: int = 50
26+
) -> Receiver[ReceivedT]:
27+
"""Get a receiver from the channel.
28+
29+
Args:
30+
name: A name to identify the receiver in the logs.
31+
maxsize: The maximum size of the receiver.
32+
33+
Returns:
34+
A receiver instance.
35+
"""
36+
1337

1438
class Dispatcher:
1539
"""A highlevel interface for the dispatch API.
@@ -36,8 +60,8 @@ async def run():
3660
service_address = "localhost:50051"
3761
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
3862
dispatcher.start() # this will start the actor
39-
dispatch_arrived = dispatcher.updated_dispatches()
40-
dispatch_ready = dispatcher.ready_dispatches()
63+
dispatch_arrived = dispatcher.updated_dispatches.new_receiver()
64+
dispatch_ready = dispatcher.ready_dispatches.new_receiver()
4165
```
4266
"""
4367

@@ -65,18 +89,20 @@ async def start(self) -> None:
6589
"""Start the actor."""
6690
self._actor.start()
6791

68-
def updated_dispatches(self) -> Receiver[DispatchEvent]:
92+
@property
93+
def updated_dispatches(self) -> ReceiverFetcher[DispatchEvent]:
6994
"""Return new, updated or deleted dispatches receiver.
7095
7196
Returns:
7297
A new receiver for new dispatches.
7398
"""
74-
return self._updated_channel.new_receiver()
99+
return self._updated_channel
75100

76-
def ready_dispatches(self) -> Receiver[Dispatch]:
101+
@property
102+
def ready_dispatches(self) -> ReceiverFetcher[Dispatch]:
77103
"""Return ready dispatches receiver.
78104
79105
Returns:
80106
A new receiver for ready dispatches.
81107
"""
82-
return self._ready_channel.new_receiver()
108+
return self._ready_channel

tests/test_frequenz_dispatch.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,8 @@
1919
from frequenz.client.dispatch.types import Dispatch, Frequency
2020
from pytest import fixture
2121

22-
from frequenz.dispatch.actor import (
23-
Created,
24-
Deleted,
25-
DispatchActor,
26-
DispatchEvent,
27-
Updated,
28-
)
22+
from frequenz.dispatch import Created, Deleted, DispatchEvent, Updated
23+
from frequenz.dispatch.actor import DispatchActor
2924

3025

3126
# This method replaces the event loop for all tests in the file.

0 commit comments

Comments
 (0)