Skip to content

Commit a24d1c3

Browse files
authored
Fix sensor data streaming without metrics filter (#148)
When no metric filter was passed to `stream_sensor_data()` the transform callback failed because the `metrics` argument wasn't passed when calling `sensor_data_samples_from_proto()` internally, making the gRPC streaming task stop with an unhandled exception. This PR makes the `metrics` argument optional for `sensor_data_samples_from_proto()` to fix that. A test case with no metric filters is added too, and the test case testing metric filters is improved to actually test the filtering.
2 parents ec02eeb + da40a59 commit a24d1c3

File tree

2 files changed

+61
-5
lines changed

2 files changed

+61
-5
lines changed

src/frequenz/client/microgrid/_sensor_proto.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,14 @@ def sensor_from_proto_with_issues(
106106

107107
def sensor_data_samples_from_proto(
108108
message: microgrid_pb2.ComponentData,
109-
metrics: Set[sensor_pb2.SensorMetric.ValueType],
109+
metrics: Set[sensor_pb2.SensorMetric.ValueType] | None = None,
110110
) -> SensorDataSamples:
111111
"""Convert a protobuf component data message to a sensor data object.
112112
113113
Args:
114114
message: The protobuf message to convert.
115-
metrics: A set of metrics to filter the samples.
115+
metrics: If not `None`, only the specified metrics will be retrieved.
116+
Otherwise all available metrics will be retrieved.
116117
117118
Returns:
118119
The resulting `SensorDataSamples` object.
@@ -128,7 +129,7 @@ def sensor_data_samples_from_proto(
128129
metrics=[
129130
sensor_metric_sample_from_proto(ts, sample)
130131
for sample in message.sensor.data.sensor_data
131-
if sample.sensor_metric in metrics
132+
if metrics is None or sample.sensor_metric in metrics
132133
],
133134
states=[sensor_state_sample_from_proto(ts, message.sensor)],
134135
)

tests/test_client.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ async def test_set_bounds_grpc_error(client: _TestClient) -> None:
912912
await client.set_bounds(ComponentId(99), 0.0, 100.0)
913913

914914

915-
async def test_stream_sensor_data_success(
915+
async def test_stream_sensor_data_one_metric(
916916
sensor201: microgrid_pb2.Component, client: _TestClient
917917
) -> None:
918918
"""Test successful streaming of sensor data."""
@@ -933,7 +933,11 @@ async def stream_data_impl(
933933
sensor_pb2.SensorData(
934934
value=1.0,
935935
sensor_metric=sensor_pb2.SensorMetric.SENSOR_METRIC_TEMPERATURE,
936-
)
936+
),
937+
sensor_pb2.SensorData(
938+
value=2.0,
939+
sensor_metric=sensor_pb2.SensorMetric.SENSOR_METRIC_PRESSURE,
940+
),
937941
],
938942
),
939943
),
@@ -960,6 +964,57 @@ async def stream_data_impl(
960964
]
961965

962966

967+
async def test_stream_sensor_data_all_metrics(
968+
sensor201: microgrid_pb2.Component, client: _TestClient
969+
) -> None:
970+
"""Test successful streaming of sensor data."""
971+
now = datetime.now(timezone.utc)
972+
973+
async def stream_data_impl(
974+
*_: Any, **__: Any
975+
) -> AsyncIterator[microgrid_pb2.ComponentData]:
976+
yield microgrid_pb2.ComponentData(
977+
id=int(sensor201.id),
978+
ts=conversion.to_timestamp(now),
979+
sensor=sensor_pb2.Sensor(
980+
state=sensor_pb2.State(
981+
component_state=sensor_pb2.ComponentState.COMPONENT_STATE_OK
982+
),
983+
data=sensor_pb2.Data(
984+
sensor_data=[
985+
sensor_pb2.SensorData(
986+
value=1.0,
987+
sensor_metric=sensor_pb2.SensorMetric.SENSOR_METRIC_TEMPERATURE,
988+
),
989+
sensor_pb2.SensorData(
990+
value=2.0,
991+
sensor_metric=sensor_pb2.SensorMetric.SENSOR_METRIC_PRESSURE,
992+
),
993+
],
994+
),
995+
),
996+
)
997+
998+
client.mock_stub.StreamComponentData.side_effect = stream_data_impl
999+
receiver = client.stream_sensor_data(SensorId(sensor201.id))
1000+
sample = await receiver.receive()
1001+
1002+
assert isinstance(sample, SensorDataSamples)
1003+
assert int(sample.sensor_id) == sensor201.id
1004+
assert sample.states == [
1005+
SensorStateSample(
1006+
sampled_at=now,
1007+
states=frozenset({SensorStateCode.ON}),
1008+
warnings=frozenset(),
1009+
errors=frozenset(),
1010+
)
1011+
]
1012+
assert sample.metrics == [
1013+
SensorMetricSample(sampled_at=now, metric=SensorMetric.TEMPERATURE, value=1.0),
1014+
SensorMetricSample(sampled_at=now, metric=SensorMetric.PRESSURE, value=2.0),
1015+
]
1016+
1017+
9631018
async def test_stream_sensor_data_grpc_error(
9641019
sensor201: microgrid_pb2.Component, caplog: pytest.LogCaptureFixture
9651020
) -> None:

0 commit comments

Comments
 (0)