Skip to content

Fix #18 #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 29, 2022
Merged
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 111 additions & 38 deletions adafruit_gc_iot_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,19 @@
https://github.com/adafruit/Adafruit_CircuitPython_Logging

"""

import time

try:
from typing import Any, Callable, Dict, Optional, Type, Union
from types import TracebackType

# pylint: disable=unused-import
from adafruit_esp32spi import adafruit_esp32spi as ESP32SPI
from adafruit_minimqtt import adafruit_minimqtt as MQTT
except ImportError:
pass

import adafruit_logging as logging
from adafruit_jwt import JWT
import rtc
Expand All @@ -46,11 +57,21 @@ class MQTT_API_ERROR(Exception):
class MQTT_API:
"""Client for interacting with Google's Cloud Core MQTT API.

:param MiniMQTT mqtt_client: MiniMQTT Client object.

:param ~MQTT.MQTT mqtt_client: MiniMQTT Client object
"""

def __init__(self, mqtt_client):
device_id: str
logger: bool
on_connect: Optional[Callable[["MQTT_API", Optional[Any], bytearray, int], None]]
on_disconnect: Optional[Callable[["MQTT_API"], None]]
on_message: Optional[Callable[["MQTT_API", str, str], None]]
on_subscribe: Optional[Callable[["MQTT_API", Optional[Any], str, int], None]]
on_unsubscribe: Optional[Callable[["MQTT_API", Optional[Any], str, int], None]]
user: str

_client: "MQTT.MQTT"

def __init__(self, mqtt_client: "MQTT.MQTT") -> None:
# Check that provided object is a MiniMQTT client object
mqtt_client_type = str(type(mqtt_client))
if "MQTT" in mqtt_client_type:
Expand Down Expand Up @@ -98,13 +119,18 @@ def __init__(self, mqtt_client):
# Set up a device identifier by splitting out the full CID
self.device_id = self._client.client_id.split("/")[7]

def __enter__(self):
def __enter__(self) -> "MQTT_API":
return self

def __exit__(self, exception_type, exception_value, traceback):
def __exit__(
self,
exception_type: Optional[Type[type]],
exception_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.disconnect()

def disconnect(self):
def disconnect(self) -> None:
"""Disconnects from the Google MQTT Broker."""
try:
self._client.disconnect()
Expand All @@ -120,25 +146,31 @@ def disconnect(self):
# De-initialize MiniMQTT Client
self._client.deinit()

def reconnect(self):
def reconnect(self) -> None:
"""Reconnects to the Google MQTT Broker."""
try:
self._client.reconnect()
except Exception as err:
raise MQTT_API_ERROR("Error reconnecting to Google MQTT.") from err

def connect(self):
def connect(self) -> None:
"""Connects to the Google MQTT Broker."""
self._client.connect()
self._connected = True

@property
def is_connected(self):
def is_connected(self) -> bool:
"""Returns if client is connected to Google's MQTT broker."""
return self._connected

# pylint: disable=not-callable, unused-argument
def _on_connect_mqtt(self, client, userdata, flags, return_code):
def _on_connect_mqtt(
self,
client: "MQTT.MQTT",
user_data: Optional[Any],
flags: bytearray,
return_code: int,
) -> None:
"""Runs when the mqtt client calls on_connect."""
if self.logger:
self._client.logger.debug("Client called on_connect.")
Expand All @@ -148,10 +180,15 @@ def _on_connect_mqtt(self, client, userdata, flags, return_code):
raise MQTT_API_ERROR(return_code)
# Call the user-defined on_connect callback if defined
if self.on_connect is not None:
self.on_connect(self, userdata, flags, return_code)
self.on_connect(self, user_data, flags, return_code)

# pylint: disable=not-callable, unused-argument
def _on_disconnect_mqtt(self, client, userdata, return_code):
def _on_disconnect_mqtt(
self,
client: "MQTT.MQTT",
user_data: Optional[Any],
return_code: int,
) -> None:
"""Runs when the client calls on_disconnect."""
if self.logger:
self._client.logger.debug("Client called on_disconnect")
Expand All @@ -161,30 +198,42 @@ def _on_disconnect_mqtt(self, client, userdata, return_code):
self.on_disconnect(self)

# pylint: disable=not-callable
def _on_message_mqtt(self, client, topic, payload):
def _on_message_mqtt(self, client: "MQTT.MQTT", topic: str, payload: str) -> None:
"""Runs when the client calls on_message."""
if self.logger:
self._client.logger.debug("Client called on_message")
if self.on_message is not None:
self.on_message(self, topic, payload)

# pylint: disable=not-callable
def _on_subscribe_mqtt(self, client, user_data, topic, qos):
def _on_subscribe_mqtt(
self,
client: "MQTT.MQTT",
user_data: Optional[Any],
topic: str,
qos: int,
) -> None:
"""Runs when the client calls on_subscribe."""
if self.logger:
self._client.logger.debug("Client called on_subscribe")
if self.on_subscribe is not None:
self.on_subscribe(self, user_data, topic, qos)

# pylint: disable=not-callable
def _on_unsubscribe_mqtt(self, client, user_data, topic, pid):
def _on_unsubscribe_mqtt(
self,
client: "MQTT.MQTT",
user_data: Optional[Any],
topic: str,
pid: int,
) -> None:
"""Runs when the client calls on_unsubscribe."""
if self.logger:
self._client.logger.debug("Client called on_unsubscribe")
if self.on_unsubscribe is not None:
self.on_unsubscribe(self, user_data, topic, pid)

def loop(self):
def loop(self) -> None:
"""Maintains a connection with Google Cloud IoT Core's MQTT broker. You will
need to manually call this method within a loop to retain connection.

Expand All @@ -194,73 +243,83 @@ def loop(self):

while True:
google_iot.loop()

"""
if self._connected:
self._client.loop()

def unsubscribe(self, topic, subfolder=None):
def unsubscribe(self, topic: str, subfolder: Optional[str] = None) -> None:
"""Unsubscribes from a Google Cloud IoT device topic.

:param str topic: Required MQTT topic. Defaults to events.
:param str subfolder: Optional MQTT topic subfolder. Defaults to None.

"""
if subfolder is not None:
mqtt_topic = "/devices/{}/{}/{}".format(self.device_id, topic, subfolder)
else:
mqtt_topic = "/devices/{}/{}".format(self.device_id, topic)
self._client.unsubscribe(mqtt_topic)

def unsubscribe_from_all_commands(self):
def unsubscribe_from_all_commands(self) -> None:
"""Unsubscribes from a device's "commands/#" topic.

:param int qos: Quality of Service level for the message.

"""
self.unsubscribe("commands/#")

def subscribe(self, topic, subfolder=None, qos=1):
def subscribe(
self,
topic: str,
subfolder: Optional[str] = None,
qos: int = 1,
) -> None:
"""Subscribes to a Google Cloud IoT device topic.

:param str topic: Required MQTT topic. Defaults to events.
:param str subfolder: Optional MQTT topic subfolder. Defaults to None.
:param int qos: Quality of Service level for the message.

"""
if subfolder is not None:
mqtt_topic = "/devices/{}/{}/{}".format(self.device_id, topic, subfolder)
else:
mqtt_topic = "/devices/{}/{}".format(self.device_id, topic)
self._client.subscribe(mqtt_topic, qos)

def subscribe_to_subfolder(self, topic, subfolder, qos=1):
def subscribe_to_subfolder(
self,
topic: str,
subfolder: Optional[str] = None,
qos: int = 1,
) -> None:
"""Subscribes to a Google Cloud IoT device's topic subfolder

:param str topic: Required MQTT topic.
:param str subfolder: Optional MQTT topic subfolder. Defaults to None.
:param int qos: Quality of Service level for the message.

"""
self.subscribe(topic, subfolder, qos)

def subscribe_to_config(self, qos=1):
def subscribe_to_config(self, qos: int = 1) -> None:
"""Subscribes to a Google Cloud IoT device's configuration
topic.

:param int qos: Quality of Service level for the message.

"""
self.subscribe("config", qos=qos)

def subscribe_to_all_commands(self, qos=1):
def subscribe_to_all_commands(self, qos: int = 1) -> None:
"""Subscribes to a device's "commands/#" topic.
:param int qos: Quality of Service level for the message.

:param int qos: Quality of Service level for the message.
"""
self.subscribe("commands/#", qos=qos)

def publish(self, payload, topic="events", subfolder=None, qos=0):
def publish(
self,
payload: Union[float, int, str],
topic: str = "events",
subfolder: Optional[str] = None,
qos: int = 0,
) -> None:
"""Publishes a payload from the device to its Google Cloud IoT
device topic, defaults to "events" topic. To send state, use the
publish_state method.
Expand All @@ -271,7 +330,6 @@ def publish(self, payload, topic="events", subfolder=None, qos=0):
:param str topic: Required MQTT topic. Defaults to events.
:param str subfolder: Optional MQTT topic subfolder. Defaults to None.
:param int qos: Quality of Service level for the message.

"""
if subfolder is not None:
mqtt_topic = "/devices/{}/{}/{}".format(self.device_id, topic, subfolder)
Expand All @@ -283,12 +341,11 @@ def publish(self, payload, topic="events", subfolder=None, qos=0):
raise TypeError("A topic string must be specified.")
self._client.publish(mqtt_topic, payload, qos=qos)

def publish_state(self, payload):
def publish_state(self, payload: Union[float, int, str]) -> None:
"""Publishes a device state message to the Cloud IoT MQTT API. Data
sent by this method should be information about the device itself (such as number of
crashes, battery level, or device health). This method is unidirectional,
it communicates Device-to-Cloud only.

"""
self._client.publish(payload, "state")

Expand All @@ -300,10 +357,27 @@ class Cloud_Core:
:param ESP_SPIcontrol esp: ESP32SPI object.
:param dict secrets: Secrets.py file.
:param bool log: Enable Cloud_Core logging, defaults to False.

"""

def __init__(self, esp=None, secrets=None, log=False):
broker: str
username: str
cid: str
logger: Optional[logging.Logger]

_esp: Optional["ESP32SPI.ESP_SPIcontrol"]
_secrets: Optional[Dict[str, Any]]
_proj_id: str
_region: str
_reg_id: str
_device_id: str
_private_key: str

def __init__(
self,
esp: Optional["ESP32SPI.ESP_SPIcontrol"] = None,
secrets: Optional[Dict[str, Any]] = None,
log: bool = False,
) -> None:
self._esp = esp
# Validate Secrets
if secrets and hasattr(secrets, "keys"):
Expand All @@ -328,7 +402,7 @@ def __init__(self, esp=None, secrets=None, log=False):
self.cid = self.client_id

@property
def client_id(self):
def client_id(self) -> str:
"""Returns a Google Cloud IOT Core Client ID."""
client_id = "projects/{0}/locations/{1}/registries/{2}/devices/{3}".format(
self._proj_id, self._region, self._reg_id, self._device_id
Expand All @@ -337,7 +411,7 @@ def client_id(self):
self.logger.debug("Client ID: {}".format(client_id))
return client_id

def generate_jwt(self, ttl=43200, algo="RS256"):
def generate_jwt(self, ttl: int = 43200, algo: str = "RS256") -> str:
"""Generates a JSON Web Token (https://jwt.io/) using network time.

:param int jwt_ttl: When the JWT token expires, defaults to 43200 minutes (or 12 hours).
Expand All @@ -349,7 +423,6 @@ def generate_jwt(self, ttl=43200, algo="RS256"):

jwt = CloudCore.generate_jwt()
print("Generated JWT: ", jwt)

"""
if self.logger:
self.logger.debug("Generating JWT...")
Expand Down