6
6
from __future__ import annotations
7
7
8
8
import asyncio
9
+ import itertools
9
10
import logging
10
11
from collections .abc import Callable , Iterable , Set
11
12
from dataclasses import replace
12
- from typing import Any , TypeVar
13
+ from functools import partial
14
+ from typing import Any , NotRequired , TypedDict , TypeVar , assert_never
13
15
14
16
from frequenz .api .common import components_pb2 , metrics_pb2
15
- from frequenz .api .microgrid import microgrid_pb2 , microgrid_pb2_grpc
17
+ from frequenz .api .microgrid import microgrid_pb2 , microgrid_pb2_grpc , sensor_pb2
16
18
from frequenz .channels import Receiver
17
19
from frequenz .client .base import channel , client , retry , streaming
18
20
from google .protobuf .empty_pb2 import Empty
35
37
from ._connection import Connection
36
38
from ._constants import RECEIVER_MAX_SIZE
37
39
from ._exception import ApiClientError , ClientNotConnected
38
- from ._id import ComponentId , MicrogridId
39
40
from ._metadata import Location , Metadata
41
+ from ._sensor_proto import sensor_data_samples_from_proto , sensor_from_proto
42
+ from .id import ComponentId , MicrogridId , SensorId
43
+ from .sensor import Sensor , SensorDataSamples , SensorMetric
40
44
41
45
DEFAULT_GRPC_CALL_TIMEOUT = 60.0
42
46
"""The default timeout for gRPC calls made by this client (in seconds)."""
@@ -96,6 +100,12 @@ def __init__(
96
100
self ._broadcasters : dict [
97
101
ComponentId , streaming .GrpcStreamBroadcaster [Any , Any ]
98
102
] = {}
103
+ self ._sensor_data_broadcasters : dict [
104
+ str ,
105
+ streaming .GrpcStreamBroadcaster [
106
+ microgrid_pb2 .ComponentData , SensorDataSamples
107
+ ],
108
+ ] = {}
99
109
self ._retry_strategy = retry_strategy
100
110
101
111
@property
@@ -117,15 +127,22 @@ async def __aexit__(
117
127
exc_tb : Any | None ,
118
128
) -> bool | None :
119
129
"""Close the gRPC channel and stop all broadcasters."""
120
- exceptions = [
130
+ exceptions = list (
121
131
exc
122
132
for exc in await asyncio .gather (
123
- * (broadcaster .stop () for broadcaster in self ._broadcasters .values ()),
133
+ * (
134
+ broadcaster .stop ()
135
+ for broadcaster in itertools .chain (
136
+ self ._broadcasters .values (),
137
+ self ._sensor_data_broadcasters .values (),
138
+ )
139
+ ),
124
140
return_exceptions = True ,
125
141
)
126
142
if isinstance (exc , BaseException )
127
- ]
143
+ )
128
144
self ._broadcasters .clear ()
145
+ self ._sensor_data_broadcasters .clear ()
129
146
130
147
result = None
131
148
try :
@@ -177,6 +194,33 @@ async def components( # noqa: DOC502 (raises ApiClientError indirectly)
177
194
178
195
return result
179
196
197
+ async def list_sensors ( # noqa: DOC502 (raises ApiClientError indirectly)
198
+ self ,
199
+ ) -> Iterable [Sensor ]:
200
+ """Fetch all the sensors present in the microgrid.
201
+
202
+ Returns:
203
+ Iterator whose elements are all the sensors in the microgrid.
204
+
205
+ Raises:
206
+ ApiClientError: If the are any errors communicating with the Microgrid API,
207
+ most likely a subclass of
208
+ [GrpcError][frequenz.client.microgrid.GrpcError].
209
+ """
210
+ component_list = await client .call_stub_method (
211
+ self ,
212
+ lambda : self .stub .ListComponents (
213
+ microgrid_pb2 .ComponentFilter (
214
+ categories = [
215
+ components_pb2 .ComponentCategory .COMPONENT_CATEGORY_SENSOR
216
+ ]
217
+ ),
218
+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
219
+ ),
220
+ method_name = "ListComponents" ,
221
+ )
222
+ return map (sensor_from_proto , component_list .components )
223
+
180
224
async def metadata (self ) -> Metadata :
181
225
"""Fetch the microgrid metadata.
182
226
@@ -539,3 +583,91 @@ async def set_bounds( # noqa: DOC503 (raises ApiClientError indirectly)
539
583
),
540
584
method_name = "AddInclusionBounds" ,
541
585
)
586
+
587
+ # noqa: DOC502 (Raises ApiClientError indirectly)
588
+ def stream_sensor_data (
589
+ self ,
590
+ sensor : SensorId | Sensor ,
591
+ metrics : Iterable [SensorMetric | int ] | None = None ,
592
+ * ,
593
+ buffer_size : int = 50 ,
594
+ ) -> Receiver [SensorDataSamples ]:
595
+ """Stream data samples from a sensor.
596
+
597
+ Warning:
598
+ Sensors may not support all metrics. If a sensor does not support
599
+ a given metric, then the returned data stream will not contain that metric.
600
+
601
+ There is no way to tell if a metric is not being received because the
602
+ sensor does not support it or because there is a transient issue when
603
+ retrieving the metric from the sensor.
604
+
605
+ The supported metrics by a sensor can even change with time, for example,
606
+ if a sensor is updated with new firmware.
607
+
608
+ Args:
609
+ sensor: The sensor to stream data from.
610
+ metrics: If not `None`, only the specified metrics will be retrieved.
611
+ Otherwise all available metrics will be retrieved.
612
+ buffer_size: The maximum number of messages to buffer in the returned
613
+ receiver. After this limit is reached, the oldest messages will be
614
+ dropped.
615
+
616
+ Returns:
617
+ A receiver to retrieve data from the sensor.
618
+ """
619
+ sensor_id = _get_sensor_id (sensor )
620
+ key = str (sensor_id )
621
+
622
+ class _ExtraArgs (TypedDict ):
623
+ metrics : NotRequired [frozenset [sensor_pb2 .SensorMetric .ValueType ]]
624
+
625
+ extra_args : _ExtraArgs = {}
626
+ if metrics is not None :
627
+ extra_args ["metrics" ] = frozenset (
628
+ [_get_sensor_metric_value (m ) for m in metrics ]
629
+ )
630
+ # We use the frozenset because iterables are not hashable
631
+ key += f"{ hash (extra_args ['metrics' ])} "
632
+
633
+ broadcaster = self ._sensor_data_broadcasters .get (key )
634
+ if broadcaster is None :
635
+ client_id = hex (id (self ))[2 :]
636
+ stream_name = f"microgrid-client-{ client_id } -sensor-data-{ key } "
637
+ broadcaster = streaming .GrpcStreamBroadcaster (
638
+ stream_name ,
639
+ lambda : aiter (
640
+ self .stub .StreamComponentData (
641
+ microgrid_pb2 .ComponentIdParam (id = sensor_id ),
642
+ timeout = DEFAULT_GRPC_CALL_TIMEOUT ,
643
+ )
644
+ ),
645
+ partial (sensor_data_samples_from_proto , ** extra_args ),
646
+ retry_strategy = self ._retry_strategy ,
647
+ )
648
+ self ._sensor_data_broadcasters [key ] = broadcaster
649
+ return broadcaster .new_receiver (maxsize = buffer_size )
650
+
651
+
652
+ def _get_sensor_id (sensor : SensorId | Sensor ) -> int :
653
+ """Get the sensor ID from a sensor or sensor ID."""
654
+ match sensor :
655
+ case SensorId ():
656
+ return int (sensor )
657
+ case Sensor ():
658
+ return int (sensor .id )
659
+ case unexpected :
660
+ assert_never (unexpected )
661
+
662
+
663
+ def _get_sensor_metric_value (
664
+ metric : SensorMetric | int ,
665
+ ) -> sensor_pb2 .SensorMetric .ValueType :
666
+ """Get the sensor metric ID from a sensor metric or sensor metric ID."""
667
+ match metric :
668
+ case SensorMetric ():
669
+ return sensor_pb2 .SensorMetric .ValueType (metric .value )
670
+ case int ():
671
+ return sensor_pb2 .SensorMetric .ValueType (metric )
672
+ case unexpected :
673
+ assert_never (unexpected )
0 commit comments