Skip to content

Commit 1954f92

Browse files
authored
Add timeouts to all api calls (#169)
2 parents fa059ed + 74690e6 commit 1954f92

File tree

3 files changed

+53
-8
lines changed

3 files changed

+53
-8
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
## New Features
1212

1313
* `dispatch-cli` supports now the parameter `--type` and `--running` to filter the list of running services by type and status, respectively.
14+
* Every call now has a default timeout of 60 seconds, streams terminate after five minutes. This can be influenced by the two new parameters for`DispatchApiClient.__init__()`:
15+
* `default_timeout: timedelta` (default: 60 seconds)
16+
* `stream_timeout: timedelta` (default: 5 minutes)
1417

1518
## Bug Fixes
1619

src/frequenz/client/dispatch/_client.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,24 @@
5353
class DispatchApiClient(BaseApiClient[dispatch_pb2_grpc.MicrogridDispatchServiceStub]):
5454
"""Dispatch API client."""
5555

56+
# pylint: disable-next=too-many-arguments
5657
def __init__(
5758
self,
5859
*,
5960
server_url: str,
6061
key: str,
6162
connect: bool = True,
63+
call_timeout: timedelta = timedelta(seconds=60),
64+
stream_timeout: timedelta = timedelta(minutes=5),
6265
) -> None:
6366
"""Initialize the client.
6467
6568
Args:
6669
server_url: The URL of the server to connect to.
6770
key: API key to use for authentication.
6871
connect: Whether to connect to the service immediately.
72+
call_timeout: Timeout for gRPC calls, default is 60 seconds.
73+
stream_timeout: Timeout for gRPC streams, default is 5 minutes.
6974
"""
7075
super().__init__(
7176
server_url,
@@ -82,6 +87,19 @@ def __init__(
8287
] = {}
8388
"""A dictionary of streamers, keyed by microgrid_id."""
8489

90+
self._call_timeout_seconds = call_timeout.total_seconds()
91+
self._stream_timeout_seconds = stream_timeout.total_seconds()
92+
93+
@property
94+
def call_timeout(self) -> timedelta:
95+
"""Get the call timeout."""
96+
return timedelta(seconds=self._call_timeout_seconds)
97+
98+
@property
99+
def stream_timeout(self) -> timedelta:
100+
"""Get the stream timeout."""
101+
return timedelta(seconds=self._stream_timeout_seconds)
102+
85103
@property
86104
def stub(self) -> dispatch_pb2_grpc.MicrogridDispatchServiceAsyncStub:
87105
"""The stub for the service."""
@@ -177,7 +195,9 @@ def to_interval(
177195
while True:
178196
response = await cast(
179197
Awaitable[ListMicrogridDispatchesResponse],
180-
self.stub.ListMicrogridDispatches(request, metadata=self._metadata),
198+
self.stub.ListMicrogridDispatches(
199+
request, metadata=self._metadata, timeout=self._call_timeout_seconds
200+
),
181201
)
182202

183203
yield (Dispatch.from_protobuf(dispatch) for dispatch in response.dispatches)
@@ -234,7 +254,9 @@ def _get_stream(
234254
stream_method=lambda: cast(
235255
AsyncIterator[StreamMicrogridDispatchesResponse],
236256
self.stub.StreamMicrogridDispatches(
237-
request, metadata=self._metadata
257+
request,
258+
metadata=self._metadata,
259+
timeout=self._stream_timeout_seconds,
238260
),
239261
),
240262
transform=DispatchEvent.from_protobuf,
@@ -303,7 +325,9 @@ async def create( # pylint: disable=too-many-positional-arguments
303325
response = await cast(
304326
Awaitable[CreateMicrogridDispatchResponse],
305327
self.stub.CreateMicrogridDispatch(
306-
request.to_protobuf(), metadata=self._metadata
328+
request.to_protobuf(),
329+
metadata=self._metadata,
330+
timeout=self._call_timeout_seconds,
307331
),
308332
)
309333

@@ -394,7 +418,9 @@ async def update(
394418

395419
response = await cast(
396420
Awaitable[UpdateMicrogridDispatchResponse],
397-
self.stub.UpdateMicrogridDispatch(msg, metadata=self._metadata),
421+
self.stub.UpdateMicrogridDispatch(
422+
msg, metadata=self._metadata, timeout=self._call_timeout_seconds
423+
),
398424
)
399425

400426
return Dispatch.from_protobuf(response.dispatch)
@@ -414,7 +440,9 @@ async def get(self, *, microgrid_id: int, dispatch_id: int) -> Dispatch:
414440
)
415441
response = await cast(
416442
Awaitable[GetMicrogridDispatchResponse],
417-
self.stub.GetMicrogridDispatch(request, metadata=self._metadata),
443+
self.stub.GetMicrogridDispatch(
444+
request, metadata=self._metadata, timeout=self._call_timeout_seconds
445+
),
418446
)
419447
return Dispatch.from_protobuf(response.dispatch)
420448

@@ -430,5 +458,7 @@ async def delete(self, *, microgrid_id: int, dispatch_id: int) -> None:
430458
)
431459
await cast(
432460
Awaitable[None],
433-
self.stub.DeleteMicrogridDispatch(request, metadata=self._metadata),
461+
self.stub.DeleteMicrogridDispatch(
462+
request, metadata=self._metadata, timeout=self._call_timeout_seconds
463+
),
434464
)

src/frequenz/client/dispatch/test/_service.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,17 @@ def _check_access(self, metadata: grpc.aio.Metadata) -> None:
112112

113113
# pylint: disable=invalid-name
114114
async def ListMicrogridDispatches(
115-
self, request: ListMicrogridDispatchesRequest, metadata: grpc.aio.Metadata
115+
self,
116+
request: ListMicrogridDispatchesRequest,
117+
metadata: grpc.aio.Metadata,
118+
timeout: int = 5, # pylint: disable=unused-argument
116119
) -> ListMicrogridDispatchesResponse:
117120
"""List microgrid dispatches.
118121
119122
Args:
120123
request: The request.
121124
metadata: The metadata.
125+
timeout: timeout for the request, ignored in this mock.
122126
123127
Returns:
124128
The dispatch list.
@@ -141,13 +145,17 @@ async def ListMicrogridDispatches(
141145
)
142146

143147
async def StreamMicrogridDispatches(
144-
self, request: StreamMicrogridDispatchesRequest, metadata: grpc.aio.Metadata
148+
self,
149+
request: StreamMicrogridDispatchesRequest,
150+
metadata: grpc.aio.Metadata,
151+
timeout: int = 5, # pylint: disable=unused-argument
145152
) -> AsyncIterator[StreamMicrogridDispatchesResponse]:
146153
"""Stream microgrid dispatches changes.
147154
148155
Args:
149156
request: The request.
150157
metadata: The metadata.
158+
timeout: timeout for the request, ignored in this mock.
151159
152160
Returns:
153161
An async generator for dispatch changes.
@@ -212,6 +220,7 @@ async def CreateMicrogridDispatch(
212220
self,
213221
request: PBDispatchCreateRequest,
214222
metadata: grpc.aio.Metadata,
223+
timeout: int = 5, # pylint: disable=unused-argument
215224
) -> CreateMicrogridDispatchResponse:
216225
"""Create a new dispatch."""
217226
self._check_access(metadata)
@@ -240,6 +249,7 @@ async def UpdateMicrogridDispatch(
240249
self,
241250
request: UpdateMicrogridDispatchRequest,
242251
metadata: grpc.aio.Metadata,
252+
timeout: int = 5, # pylint: disable=unused-argument
243253
) -> UpdateMicrogridDispatchResponse:
244254
"""Update a dispatch."""
245255
self._check_access(metadata)
@@ -327,6 +337,7 @@ async def GetMicrogridDispatch(
327337
self,
328338
request: GetMicrogridDispatchRequest,
329339
metadata: grpc.aio.Metadata,
340+
timeout: int = 5, # pylint: disable=unused-argument
330341
) -> GetMicrogridDispatchResponse:
331342
"""Get a single dispatch."""
332343
self._check_access(metadata)
@@ -349,6 +360,7 @@ async def DeleteMicrogridDispatch(
349360
self,
350361
request: DeleteMicrogridDispatchRequest,
351362
metadata: grpc.aio.Metadata,
363+
timeout: int = 5, # pylint: disable=unused-argument
352364
) -> Empty:
353365
"""Delete a given dispatch."""
354366
self._check_access(metadata)

0 commit comments

Comments
 (0)