|
3 | 3 |
|
4 | 4 | import pytest
|
5 | 5 |
|
| 6 | +from asynctest import CoroutineMock |
| 7 | + |
6 | 8 | import zigpy_znp.types as t
|
7 | 9 | import zigpy_znp.commands as c
|
8 | 10 | import zigpy_znp.config as conf
|
|
13 | 15 |
|
14 | 16 | from zigpy_znp.api import ZNP
|
15 | 17 | from zigpy_znp.uart import connect as uart_connect
|
| 18 | +from zigpy_znp.types.nvids import NwkNvIds |
16 | 19 | from zigpy_znp.zigbee.application import ControllerApplication
|
17 | 20 |
|
18 | 21 |
|
@@ -435,7 +438,7 @@ async def test_reconnect(event_loop, application):
|
435 | 438 | assert app._znp._uart is not None
|
436 | 439 |
|
437 | 440 |
|
438 |
| -@pytest_mark_asyncio_timeout() |
| 441 | +@pytest_mark_asyncio_timeout(seconds=3) |
439 | 442 | async def test_auto_connect(mocker, application):
|
440 | 443 | AUTO_DETECTED_PORT = "/dev/ttyFAKE0"
|
441 | 444 |
|
@@ -525,3 +528,122 @@ async def test_zdo_request_interception(application, mocker):
|
525 | 528 | await active_ep_req
|
526 | 529 |
|
527 | 530 | assert status == t.Status.Success
|
| 531 | + |
| 532 | + |
@pytest_mark_asyncio_timeout()
async def test_update_network_noop(mocker, application):
    """update_network with no parameters must not touch the radio at all.

    With ``reset=False`` nothing should be sent; with ``reset=True`` the
    (mocked, non-callable) ZNP object is invoked and must blow up, proving
    a request was actually attempted.
    """
    # The server half of the fixture is not needed here — only the app.
    app, _ = application

    await app.startup(auto_form=False)

    # Replace the ZNP instance with a mock that rejects *any* call.
    app._znp = mocker.NonCallableMock()

    # No parameters and no reset: nothing may be called on the mock.
    await app.update_network(reset=False)

    # reset=True forces a radio reset, which hits the non-callable mock
    # and raises TypeError — confirming a request would have been made.
    with pytest.raises(TypeError):
        await app.update_network(reset=True)
| 548 | + |
@pytest_mark_asyncio_timeout(seconds=5)
async def test_update_network(mocker, caplog, application):
    """Exercise a full ControllerApplication.update_network call.

    Registers one expected request/response pair on the simulated Z-Stack
    server for every network parameter that can be written, then verifies
    each one was actually sent, that unsupported parameters only produce a
    warning, and that the stack was reset afterwards.
    """
    app, znp_server = application

    await app.startup(auto_form=False)
    # Stub out the radio reset so reset=True below doesn't restart the
    # (simulated) stack mid-test; we only assert that it was requested.
    mocker.patch.object(app, "_reset", new=CoroutineMock())

    # Target network parameters for the update.
    channel = t.uint8_t(15)
    pan_id = t.PanId(0x1234)
    extended_pan_id = t.ExtendedPanId(range(8))
    channels = t.Channels.from_channel_list([11, 15, 20])
    network_key = t.KeyData(range(16))

    # Each reply_once_to returns an awaitable that resolves once the
    # matching request has been seen by the simulated server.
    channels_updated = znp_server.reply_once_to(
        request=c.UtilCommands.SetChannels.Req(Channels=channels),
        responses=[c.UtilCommands.SetChannels.Rsp(Status=t.Status.Success)],
    )

    # The channel mask is written as the BDB primary channel set...
    bdb_set_primary_channel = znp_server.reply_once_to(
        request=c.APPConfigCommands.BDBSetChannel.Req(IsPrimary=True, Channel=channels),
        responses=[c.APPConfigCommands.BDBSetChannel.Rsp(Status=t.Status.Success)],
    )

    # ...and the secondary channel set is cleared.
    bdb_set_secondary_channel = znp_server.reply_once_to(
        request=c.APPConfigCommands.BDBSetChannel.Req(
            IsPrimary=False, Channel=t.Channels.NO_CHANNELS
        ),
        responses=[c.APPConfigCommands.BDBSetChannel.Rsp(Status=t.Status.Success)],
    )

    set_pan_id = znp_server.reply_once_to(
        request=c.UtilCommands.SetPanId.Req(PanId=pan_id),
        responses=[c.UtilCommands.SetPanId.Rsp(Status=t.Status.Success)],
    )

    # The extended PAN ID has no dedicated command; it goes through NVRAM.
    set_extended_pan_id = znp_server.reply_once_to(
        request=c.SysCommands.OSALNVWrite.Req(
            Id=NwkNvIds.EXTENDED_PAN_ID, Offset=0, Value=extended_pan_id.serialize()
        ),
        responses=[c.SysCommands.OSALNVWrite.Rsp(Status=t.Status.Success)],
    )

    # The network key is written twice: once via the UTIL command...
    set_network_key_util = znp_server.reply_once_to(
        request=c.UtilCommands.SetPreConfigKey.Req(PreConfigKey=network_key),
        responses=[c.UtilCommands.SetPreConfigKey.Rsp(Status=t.Status.Success)],
    )

    # ...and once by enabling the pre-configured-keys NVRAM flag.
    set_network_key_nvram = znp_server.reply_once_to(
        request=c.SysCommands.OSALNVWrite.Req(
            Id=NwkNvIds.PRECFGKEYS_ENABLE, Offset=0, Value=t.Bool(True).serialize()
        ),
        responses=[c.SysCommands.OSALNVWrite.Rsp(Status=t.Status.Success)],
    )

    # Parameters that Z-Stack cannot set (channel, tc_address, tc_link_key,
    # update_id) should only be warned about, not fail the call.
    # NOTE(review): tc_link_key is built from range(8) while network_key uses
    # range(16) — t.KeyData is presumably 16 bytes; confirm this is intended.
    with caplog.at_level(logging.WARNING):
        await app.update_network(
            channel=channel,
            channels=channels,
            extended_pan_id=extended_pan_id,
            network_key=network_key,
            pan_id=pan_id,
            tc_address=t.EUI64(range(8)),
            tc_link_key=t.KeyData(range(8)),
            update_id=0,
            reset=True,
        )

    # We should receive a warning about setting a specific channel
    assert len(caplog.records) >= 1
    assert any(
        "Cannot set a specific channel in config" in r.message for r in caplog.records
    )

    # Every registered request must have been observed by the server.
    await channels_updated
    await bdb_set_primary_channel
    await bdb_set_secondary_channel
    await set_pan_id
    await set_extended_pan_id
    await set_network_key_util
    await set_network_key_nvram

    # reset=True must trigger exactly one stack reset.
    app._reset.assert_called_once_with()

    # Ensure we set everything we could
    assert app.channel is None  # We can't set it
    assert app.nwk_update_id is None  # We can't use it
    assert app.channels == channels
    assert app.pan_id == pan_id
    assert app.extended_pan_id == extended_pan_id
| 639 | + |
| 640 | + |
@pytest_mark_asyncio_timeout(seconds=5)
async def test_update_network_bad_channel(mocker, caplog, application):
    """A specific channel that is absent from the channel mask is rejected.

    ``update_network`` must raise ``ValueError`` before sending anything to
    the radio when ``channel`` is not contained in ``channels``.
    """
    # Only the app side of the fixture is exercised; the server half is
    # intentionally unused since the call fails during validation.
    app, _ = application

    with pytest.raises(ValueError):
        # Channel 12 is not in the [11, 15, 20] mask
        await app.update_network(
            channel=t.uint8_t(12),
            channels=t.Channels.from_channel_list([11, 15, 20]),
        )
0 commit comments