Skip to content

Fix #18 #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 29, 2022
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 98 additions & 27 deletions adafruit_gc_iot_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,24 @@
https://github.com/adafruit/Adafruit_CircuitPython_Logging

"""

from __future__ import annotations


import time

try:
from typing import Any, Dict, Optional, Union
from types import TracebackType

from adafruit_esp32spi import adafruit_esp32spi as ESP32SPI
from adafruit_minimqtt import adafruit_minimqtt as MQTT
except ImportError:
pass

import adafruit_logging as logging
from adafruit_jwt import JWT
import rtc
from adafruit_jwt import JWT

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_GC_IOT_Core.git"
Expand All @@ -50,7 +63,7 @@ class MQTT_API:

"""

def __init__(self, mqtt_client):
def __init__(self, mqtt_client: MQTT.MQTT):
# Check that provided object is a MiniMQTT client object
mqtt_client_type = str(type(mqtt_client))
if "MQTT" in mqtt_client_type:
Expand Down Expand Up @@ -98,13 +111,18 @@ def __init__(self, mqtt_client):
# Set up a device identifier by splitting out the full CID
self.device_id = self._client.client_id.split("/")[7]

def __enter__(self):
def __enter__(self) -> MQTT_API:
return self

def __exit__(self, exception_type, exception_value, traceback):
def __exit__(
self,
exception_type: Exception,
exception_value: Exception,
traceback: TracebackType,
) -> None:
self.disconnect()

def disconnect(self):
def disconnect(self) -> None:
"""Disconnects from the Google MQTT Broker."""
try:
self._client.disconnect()
Expand All @@ -120,25 +138,34 @@ def disconnect(self):
# De-initialize MiniMQTT Client
self._client.deinit()

def reconnect(self):
def reconnect(self) -> None:
"""Reconnects to the Google MQTT Broker."""
try:
self._client.reconnect()
except Exception as err:
raise MQTT_API_ERROR("Error reconnecting to Google MQTT.") from err

def connect(self):
def connect(self) -> None:
"""Connects to the Google MQTT Broker."""
self._client.connect()
self._connected = True

@property
def is_connected(self):
def is_connected(self) -> bool:
"""Returns if client is connected to Google's MQTT broker."""
return self._connected

# pylint: disable=not-callable, unused-argument
def _on_connect_mqtt(self, client, userdata, flags, return_code):
# TODO: Remove line-too-long; temporary with TODO comments below
# pylint: disable=not-callable, unused-argument, line-too-long
def _on_connect_mqtt(
self,
client: MQTT.MQTT,
user_data: Optional[
Any
], # TODO: Annotate. Unknown type other than "None" based on inspection of adafruit_minimqtt
flags: bytearray,
return_code: int,
) -> None:
"""Runs when the mqtt client calls on_connect."""
if self.logger:
self._client.logger.debug("Client called on_connect.")
Expand All @@ -148,10 +175,17 @@ def _on_connect_mqtt(self, client, userdata, flags, return_code):
raise MQTT_API_ERROR(return_code)
# Call the user-defined on_connect callback if defined
if self.on_connect is not None:
self.on_connect(self, userdata, flags, return_code)
self.on_connect(self, user_data, flags, return_code)

# pylint: disable=not-callable, unused-argument
def _on_disconnect_mqtt(self, client, userdata, return_code):
def _on_disconnect_mqtt(
self,
client: MQTT.MQTT,
user_data: Optional[
Any
], # TODO: Annotate. Unknown type other than "None" based on inspection of adafruit_minimqtt
return_code: int,
) -> None:
"""Runs when the client calls on_disconnect."""
if self.logger:
self._client.logger.debug("Client called on_disconnect")
Expand All @@ -161,30 +195,46 @@ def _on_disconnect_mqtt(self, client, userdata, return_code):
self.on_disconnect(self)

# pylint: disable=not-callable
def _on_message_mqtt(self, client, topic, payload):
def _on_message_mqtt(self, client: MQTT.MQTT, topic: str, payload: str) -> None:
"""Runs when the client calls on_message."""
if self.logger:
self._client.logger.debug("Client called on_message")
if self.on_message is not None:
self.on_message(self, topic, payload)

# pylint: disable=not-callable
def _on_subscribe_mqtt(self, client, user_data, topic, qos):
def _on_subscribe_mqtt(
self,
client: MQTT.MQTT,
user_data: Optional[
Any
], # TODO: Annotate. Unknown type other than "None" based on inspection of adafruit_minimqtt
topic: str,
qos: int,
) -> None:
"""Runs when the client calls on_subscribe."""
if self.logger:
self._client.logger.debug("Client called on_subscribe")
if self.on_subscribe is not None:
self.on_subscribe(self, user_data, topic, qos)

# pylint: disable=not-callable
def _on_unsubscribe_mqtt(self, client, user_data, topic, pid):
def _on_unsubscribe_mqtt(
self,
client: MQTT.MQTT,
user_data: Optional[
Any
], # TODO: Annotate. Unknown type other than "None" based on inspection of adafruit_minimqtt
topic: str,
pid: int,
) -> None:
"""Runs when the client calls on_unsubscribe."""
if self.logger:
self._client.logger.debug("Client called on_unsubscribe")
if self.on_unsubscribe is not None:
self.on_unsubscribe(self, user_data, topic, pid)

def loop(self):
def loop(self) -> None:
"""Maintains a connection with Google Cloud IoT Core's MQTT broker. You will
need to manually call this method within a loop to retain connection.

Expand All @@ -199,7 +249,7 @@ def loop(self):
if self._connected:
self._client.loop()

def unsubscribe(self, topic, subfolder=None):
def unsubscribe(self, topic: str, subfolder: Optional[str] = None) -> None:
"""Unsubscribes from a Google Cloud IoT device topic.

:param str topic: Required MQTT topic. Defaults to events.
Expand All @@ -212,15 +262,20 @@ def unsubscribe(self, topic, subfolder=None):
mqtt_topic = "/devices/{}/{}".format(self.device_id, topic)
self._client.unsubscribe(mqtt_topic)

def unsubscribe_from_all_commands(self):
def unsubscribe_from_all_commands(self) -> None:
"""Unsubscribes from a device's "commands/#" topic.

:param int qos: Quality of Service level for the message.

"""
self.unsubscribe("commands/#")

def subscribe(self, topic, subfolder=None, qos=1):
def subscribe(
self,
topic: str,
subfolder: Optional[str] = None,
qos: int = 1,
) -> None:
"""Subscribes to a Google Cloud IoT device topic.

:param str topic: Required MQTT topic. Defaults to events.
Expand All @@ -234,7 +289,12 @@ def subscribe(self, topic, subfolder=None, qos=1):
mqtt_topic = "/devices/{}/{}".format(self.device_id, topic)
self._client.subscribe(mqtt_topic, qos)

def subscribe_to_subfolder(self, topic, subfolder, qos=1):
def subscribe_to_subfolder(
self,
topic: str,
subfolder: Optional[str] = None,
qos: int = 1,
) -> None:
"""Subscribes to a Google Cloud IoT device's topic subfolder

:param str topic: Required MQTT topic.
Expand All @@ -244,7 +304,7 @@ def subscribe_to_subfolder(self, topic, subfolder, qos=1):
"""
self.subscribe(topic, subfolder, qos)

def subscribe_to_config(self, qos=1):
def subscribe_to_config(self, qos: int = 1) -> None:
"""Subscribes to a Google Cloud IoT device's configuration
topic.

Expand All @@ -253,14 +313,20 @@ def subscribe_to_config(self, qos=1):
"""
self.subscribe("config", qos=qos)

def subscribe_to_all_commands(self, qos=1):
def subscribe_to_all_commands(self, qos: int = 1) -> None:
"""Subscribes to a device's "commands/#" topic.
:param int qos: Quality of Service level for the message.

"""
self.subscribe("commands/#", qos=qos)

def publish(self, payload, topic="events", subfolder=None, qos=0):
def publish(
self,
payload: Union[float, int, str],
topic: str = "events",
subfolder: Optional[str] = None,
qos: int = 0,
) -> None:
"""Publishes a payload from the device to its Google Cloud IoT
device topic, defaults to "events" topic. To send state, use the
publish_state method.
Expand All @@ -283,7 +349,7 @@ def publish(self, payload, topic="events", subfolder=None, qos=0):
raise TypeError("A topic string must be specified.")
self._client.publish(mqtt_topic, payload, qos=qos)

def publish_state(self, payload):
def publish_state(self, payload: Union[float, int, str]) -> None:
"""Publishes a device state message to the Cloud IoT MQTT API. Data
sent by this method should be information about the device itself (such as number of
crashes, battery level, or device health). This method is unidirectional,
Expand All @@ -303,7 +369,12 @@ class Cloud_Core:

"""

def __init__(self, esp=None, secrets=None, log=False):
def __init__(
self,
esp: Optional[ESP32SPI.ESP_SPIcontrol] = None,
secrets: Optional[Dict[str, Any]] = None,
log: bool = False,
) -> None:
self._esp = esp
# Validate Secrets
if secrets and hasattr(secrets, "keys"):
Expand All @@ -328,7 +399,7 @@ def __init__(self, esp=None, secrets=None, log=False):
self.cid = self.client_id

@property
def client_id(self):
def client_id(self) -> str:
"""Returns a Google Cloud IOT Core Client ID."""
client_id = "projects/{0}/locations/{1}/registries/{2}/devices/{3}".format(
self._proj_id, self._region, self._reg_id, self._device_id
Expand All @@ -337,7 +408,7 @@ def client_id(self):
self.logger.debug("Client ID: {}".format(client_id))
return client_id

def generate_jwt(self, ttl=43200, algo="RS256"):
def generate_jwt(self, ttl: int = 43200, algo: str = "RS256") -> str:
"""Generates a JSON Web Token (https://jwt.io/) using network time.

:param int jwt_ttl: When the JWT token expires, defaults to 43200 minutes (or 12 hours).
Expand Down