|
1 | 1 | # The MIT License (MIT)
|
2 | 2 | #
|
3 | 3 | # Copyright (c) 2019 Dan Halbert for Adafruit Industries
|
| 4 | +# Copyright (c) 2019 Scott Shawcroft for Adafruit Industries |
4 | 5 | #
|
5 | 6 | # Permission is hereby granted, free of charge, to any person obtaining a copy
|
6 | 7 | # of this software and associated documentation files (the "Software"), to deal
|
|
26 | 27 | This module provides higher-level BLE (Bluetooth Low Energy) functionality,
|
27 | 28 | building on the native `_bleio` module.
|
28 | 29 |
|
29 |
| -* Author(s): Dan Halbert for Adafruit Industries |
| 30 | +* Author(s): Dan Halbert and Scott Shawcroft for Adafruit Industries |
30 | 31 |
|
31 | 32 | Implementation Notes
|
32 | 33 | --------------------
|
|
42 | 43 |
|
43 | 44 | """
|
44 | 45 |
|
45 |
| -# imports |
| 46 | +import _bleio |
| 47 | +import board |
| 48 | + |
| 49 | +from .services.core import Service |
| 50 | +from .advertising import Advertisement |
46 | 51 |
|
47 | 52 | __version__ = "0.0.0-auto.0"
|
48 | 53 | __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git"
|
| 54 | + |
| 55 | +# These are internal data structures used throughout the library to recognize certain Services and |
| 56 | +# Advertisements. |
| 57 | +# pylint: disable=invalid-name |
| 58 | +all_services_by_name = {} |
| 59 | +all_services_by_uuid = {} |
| 60 | +known_advertisements = set() |
| 61 | +# pylint: enable=invalid-name |
| 62 | + |
| 63 | +def recognize_services(*service_classes): |
| 64 | + """Instruct the adafruit_ble library to recognize the given Services. |
| 65 | +
|
| 66 | + This will cause the Service related advertisements to show the corresponding class. |
| 67 | + `SmartConnection` will automatically have attributes for any recognized service available |
| 68 | + from the peer.""" |
| 69 | + for service_class in service_classes: |
| 70 | + if not issubclass(service_class, Service): |
| 71 | + raise ValueError("Can only detect subclasses of Service") |
| 72 | + all_services_by_name[service_class.default_field_name] = service_class |
| 73 | + all_services_by_uuid[service_class.uuid] = service_class |
| 74 | + |
| 75 | +def recognize_advertisement(*advertisements): |
| 76 | + """Instruct the adafruit_ble library to recognize the given `Advertisement` types. |
| 77 | +
|
| 78 | + When an advertisement is recognized by the `SmartAdapter`, it will be returned from the |
| 79 | + start_scan iterator instead of a generic `Advertisement`.""" |
| 80 | + known_advertisements.add(*advertisements) |
| 81 | + |
| 82 | +class SmartConnection: |
| 83 | + """This represents a connection to a peer BLE device. |
| 84 | +
|
| 85 | + Its smarts come from its ability to recognize Services available on the peer and make them |
| 86 | + available as attributes on the Connection. Use `recognize_services` to register all services |
| 87 | + of interest. All subsequent Connections will then recognize the service. |
| 88 | +
|
| 89 | + ``dir(connection)`` will show all attributes including recognized Services. |
| 90 | + """ |
| 91 | + def __init__(self, connection): |
| 92 | + self._connection = connection |
| 93 | + |
| 94 | + def __dir__(self): |
| 95 | + discovered = [] |
| 96 | + results = self._connection.discover_remote_services() |
| 97 | + for service in results: |
| 98 | + uuid = service.uuid |
| 99 | + if uuid in all_services_by_uuid: |
| 100 | + service = all_services_by_uuid[uuid] |
| 101 | + discovered.append(service.default_field_name) |
| 102 | + super_dir = dir(super()) |
| 103 | + super_dir.extend(discovered) |
| 104 | + return super_dir |
| 105 | + |
| 106 | + def __getattr__(self, name): |
| 107 | + if name in self.__dict__: |
| 108 | + return self.__dict__[name] |
| 109 | + if name in all_services_by_name: |
| 110 | + service = all_services_by_name[name] |
| 111 | + uuid = service.uuid._uuid |
| 112 | + results = self._connection.discover_remote_services((uuid,)) |
| 113 | + if results: |
| 114 | + remote_service = service(service=results[0]) |
| 115 | + setattr(self, name, remote_service) |
| 116 | + return remote_service |
| 117 | + raise AttributeError() |
| 118 | + |
| 119 | + @property |
| 120 | + def connected(self): |
| 121 | + """True if the connection to the peer is still active.""" |
| 122 | + return self._connection.connected |
| 123 | + |
| 124 | + def disconnect(self): |
| 125 | + """Disconnect from peer.""" |
| 126 | + self._connection.disconnect() |
| 127 | + |
| 128 | +class SmartAdapter: |
| 129 | + """This BLE Adapter class enhances the normal `_bleio.Adapter`. |
| 130 | +
|
| 131 | + It uses the library's `Advertisement` classes and the `SmartConnection` class.""" |
| 132 | + def __init__(self, adapter=None): |
| 133 | + if not adapter: |
| 134 | + adapter = _bleio.adapter |
| 135 | + self._adapter = adapter |
| 136 | + self._current_advertisement = None |
| 137 | + self._connection_cache = {} |
| 138 | + |
| 139 | + def start_advertising(self, advertisement, scan_response=None, **kwargs): |
| 140 | + """Starts advertising the given advertisement. |
| 141 | +
|
| 142 | + It takes most kwargs of `_bleio.Adapter.start_advertising`.""" |
| 143 | + scan_response_data = None |
| 144 | + if scan_response: |
| 145 | + scan_response_data = bytes(scan_response) |
| 146 | + print(advertisement.connectable) |
| 147 | + self._adapter.start_advertising(bytes(advertisement), |
| 148 | + scan_response=scan_response_data, |
| 149 | + connectable=advertisement.connectable, |
| 150 | + **kwargs) |
| 151 | + |
| 152 | + def stop_advertising(self): |
| 153 | + """Stops advertising.""" |
| 154 | + self._adapter.stop_advertising() |
| 155 | + |
| 156 | + def start_scan(self, advertisement_types=None, **kwargs): |
| 157 | + """Starts scanning. Returns an iterator of Advertisements that are either recognized or |
| 158 | + in advertisment_types (which will be subsequently recognized.) The iterator will block |
| 159 | + until an advertisement is heard or the scan times out. |
| 160 | +
|
| 161 | + If a list ``advertisement_types`` is given, only Advertisements of that type are produced |
| 162 | + by the returned iterator.""" |
| 163 | + prefixes = b"" |
| 164 | + if advertisement_types: |
| 165 | + recognize_advertisement(*advertisement_types) |
| 166 | + if len(advertisement_types) == 1: |
| 167 | + prefixes = advertisement_types[0].prefix |
| 168 | + for entry in self._adapter.start_scan(prefixes=prefixes, **kwargs): |
| 169 | + adv_type = Advertisement |
| 170 | + for possible_type in known_advertisements: |
| 171 | + if possible_type.matches(entry) and issubclass(possible_type, adv_type): |
| 172 | + adv_type = possible_type |
| 173 | + advertisement = adv_type.from_entry(entry) |
| 174 | + if advertisement: |
| 175 | + yield advertisement |
| 176 | + |
| 177 | + def stop_scan(self): |
| 178 | + """Stops any active scan. |
| 179 | +
|
| 180 | + The scan results iterator will return any buffered results and then raise StopIteration |
| 181 | + once empty.""" |
| 182 | + self._adapter.stop_scan() |
| 183 | + |
| 184 | + def connect(self, advertisement, *, timeout=4): |
| 185 | + """Initiates a `SmartConnection` to the peer that advertised the given advertisement.""" |
| 186 | + connection = self._adapter.connect(advertisement.address, timeout=timeout) |
| 187 | + self._connection_cache[connection] = SmartConnection(connection) |
| 188 | + return self._connection_cache[connection] |
| 189 | + |
| 190 | + @property |
| 191 | + def connected(self): |
| 192 | + """True if any peers are connected to the adapter.""" |
| 193 | + return self._adapter.connected |
| 194 | + |
| 195 | + @property |
| 196 | + def connections(self): |
| 197 | + """A tuple of active `SmartConnection` objects.""" |
| 198 | + connections = self._adapter.connections |
| 199 | + smart_connections = [None] * len(connections) |
| 200 | + for i, connection in enumerate(self._adapter.connections): |
| 201 | + if connection not in self._connection_cache: |
| 202 | + self._connection_cache[connection] = SmartConnection(connection) |
| 203 | + smart_connections[i] = self._connection_cache[connection] |
| 204 | + |
| 205 | + return tuple(smart_connections) |
0 commit comments