|
5 | 5 |
|
6 | 6 | from asynctest import CoroutineMock
|
7 | 7 |
|
| 8 | +import zigpy |
8 | 9 | import zigpy_znp.types as t
|
9 | 10 | import zigpy_znp.commands as c
|
10 | 11 | import zigpy_znp.config as conf
|
@@ -79,12 +80,18 @@ async def callback():
|
79 | 80 |
|
80 | 81 | def reply_to(self, request, responses):
|
81 | 82 | async def callback():
|
| 83 | + callback.call_count += 1 |
| 84 | + |
82 | 85 | for response in responses:
|
83 | 86 | await asyncio.sleep(0.1)
|
84 | 87 | self.send(response)
|
85 | 88 |
|
| 89 | + callback.call_count = 0 |
| 90 | + |
86 | 91 | self.callback_for_response(request, lambda _: asyncio.create_task(callback()))
|
87 | 92 |
|
| 93 | + return callback |
| 94 | + |
88 | 95 | def ping_replier(self, request):
|
89 | 96 | self.send(c.SysCommands.Ping.Rsp(Capabilities=t.MTCapabilities(1625)))
|
90 | 97 |
|
@@ -219,6 +226,21 @@ def register_endpoint(request):
|
219 | 226 | assert len(endpoints) == 5
|
220 | 227 |
|
221 | 228 |
|
| 229 | +@pytest_mark_asyncio_timeout(seconds=3) |
| 230 | +async def test_application_startup_tx_power(application): |
| 231 | + app, znp_server = application |
| 232 | + |
| 233 | + set_tx_power = znp_server.reply_once_to( |
| 234 | + request=c.SysCommands.SetTxPower.Req(TXPower=19), |
| 235 | + responses=[c.SysCommands.SetTxPower.Rsp(Status=t.Status.Success)], |
| 236 | + ) |
| 237 | + |
| 238 | + app.update_config({conf.CONF_ZNP_CONFIG: {conf.CONF_TX_POWER: 19}}) |
| 239 | + |
| 240 | + await app.startup(auto_form=False) |
| 241 | + await set_tx_power |
| 242 | + |
| 243 | + |
222 | 244 | @pytest_mark_asyncio_timeout(seconds=3)
|
223 | 245 | async def test_permit_join(application):
|
224 | 246 | app, znp_server = application
|
|
0 commit comments