|
1 | 1 | import pytest
|
2 | 2 | from datetime import datetime
|
3 |
| -from typing import List, cast, Mapping, Any |
| 3 | +from typing import List, Tuple, cast, Mapping, Any |
| 4 | +from google.protobuf.timestamp_pb2 import Timestamp |
4 | 5 |
|
5 | 6 | from grpclib.testing import ChannelFor
|
6 | 7 |
|
|
21 | 22 | BINARY_DATA = b"binary_data"
|
22 | 23 | METHOD_NAME = "method_name"
|
23 | 24 | DATETIMES = (datetime.now(), datetime.now())
|
24 |
| -TIMESTAMPS = (datetime_to_timestamp(DATETIMES[0]), datetime_to_timestamp(DATETIMES[1])) |
| 25 | +TIMESTAMPS = cast(Tuple[Timestamp, Timestamp], (datetime_to_timestamp(DATETIMES[0]), datetime_to_timestamp(DATETIMES[1]))) |
25 | 26 | METHOD_PARAMETERS = {}
|
26 | 27 | TABULAR_DATA = [{"key": "value"}]
|
27 | 28 | FILE_NAME = "file_name"
|
@@ -51,11 +52,27 @@ async def test_binary_data_capture_upload(self, service: MockDataSync):
|
51 | 52 | tags=TAGS,
|
52 | 53 | data_request_times=DATETIMES,
|
53 | 54 | binary_data=BINARY_DATA,
|
| 55 | + file_extension=".txt", |
54 | 56 | )
|
55 | 57 | self.assert_sensor_contents(sensor_contents=list(service.sensor_contents), is_binary=True)
|
56 | 58 | self.assert_metadata(metadata=service.metadata)
|
| 59 | + assert service.metadata.file_extension == ".txt" |
57 | 60 | assert file_id == FILE_UPLOAD_RESPONSE
|
58 | 61 |
|
| 62 | + # Test extension dot prepend |
| 63 | + file_id = await client.binary_data_capture_upload( |
| 64 | + part_id=PART_ID, |
| 65 | + component_type=COMPONENT_TYPE, |
| 66 | + component_name=COMPONENT_NAME, |
| 67 | + method_name=METHOD_NAME, |
| 68 | + method_parameters=METHOD_PARAMETERS, |
| 69 | + tags=TAGS, |
| 70 | + data_request_times=DATETIMES, |
| 71 | + binary_data=BINARY_DATA, |
| 72 | + file_extension="txt", |
| 73 | + ) |
| 74 | + assert service.metadata.file_extension == ".txt" |
| 75 | + |
59 | 76 | @pytest.mark.asyncio
|
60 | 77 | async def test_tabular_data_capture_upload(self, service: MockDataSync):
|
61 | 78 | async with ChannelFor([service]) as channel:
|
|
0 commit comments