16
16
import time
17
17
from micropython import const
18
18
19
+ try :
20
+ from typing import Optional , Tuple , Sequence
21
+ from adafruit_fona .adafruit_fona import FONA
22
+ except ImportError :
23
+ pass
24
+
19
25
_the_interface = None # pylint: disable=invalid-name
20
26
21
27
22
- def set_interface (iface ) :
28
+ def set_interface (iface : FONA ) -> None :
23
29
"""Helper to set the global internet interface."""
24
30
global _the_interface # pylint: disable=global-statement, invalid-name
25
31
_the_interface = iface
26
32
27
33
28
- def htonl (x ) :
34
+ def htonl (x : int ) -> int :
29
35
"""Convert 32-bit positive integers from host to network byte order."""
30
36
return (
31
37
((x ) << 24 & 0xFF000000 )
@@ -35,7 +41,7 @@ def htonl(x):
35
41
)
36
42
37
43
38
- def htons (x ) :
44
+ def htons (x : int ) -> int :
39
45
"""Convert 16-bit positive integers from host to network byte order."""
40
46
return (((x ) << 8 ) & 0xFF00 ) | (((x ) >> 8 ) & 0xFF )
41
47
@@ -50,7 +56,7 @@ def htons(x):
50
56
SOCKETS = []
51
57
52
58
# pylint: disable=too-many-arguments, unused-argument
53
- def getaddrinfo (host , port , family = 0 , socktype = 0 , proto = 0 , flags = 0 ):
59
+ def getaddrinfo (host , port : int , family = 0 , socktype = 0 , proto = 0 , flags = 0 ):
54
60
"""Translate the host/port argument into a sequence of 5-tuples that
55
61
contain all the necessary arguments for creating a socket connected to that service.
56
62
"""
@@ -59,9 +65,10 @@ def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
59
65
return [(AF_INET , socktype , proto , "" , (gethostbyname (host ), port ))]
60
66
61
67
62
- def gethostbyname (hostname ) :
68
+ def gethostbyname (hostname : str ) -> str :
63
69
"""Translate a host name to IPv4 address format. The IPv4 address
64
70
is returned as a string.
71
+
65
72
:param str hostname: Desired hostname.
66
73
"""
67
74
addr = _the_interface .get_host_by_name (hostname )
@@ -72,14 +79,19 @@ def gethostbyname(hostname):
72
79
class socket :
73
80
"""A simplified implementation of the Python 'socket' class
74
81
for connecting to a FONA cellular module.
82
+
75
83
:param int family: Socket address (and protocol) family.
76
84
:param int type: Socket type.
77
-
78
85
"""
79
86
80
87
def __init__ (
81
- self , family = AF_INET , type = SOCK_STREAM , proto = 0 , fileno = None , socknum = None
82
- ):
88
+ self ,
89
+ family : int = AF_INET ,
90
+ type : int = SOCK_STREAM ,
91
+ proto : int = 0 ,
92
+ fileno : Optional [int ] = None ,
93
+ socknum : Optional [int ] = None ,
94
+ ) -> None :
83
95
if family != AF_INET :
84
96
raise RuntimeError ("Only AF_INET family supported by cellular sockets." )
85
97
self ._sock_type = type
@@ -94,35 +106,37 @@ def __init__(
94
106
self .settimeout (self ._timeout )
95
107
96
108
@property
97
- def socknum (self ):
109
+ def socknum (self ) -> int :
98
110
"""Returns the socket object's socket number."""
99
111
return self ._socknum
100
112
101
113
@property
102
- def connected (self ):
114
+ def connected (self ) -> bool :
103
115
"""Returns whether or not we are connected to the socket."""
104
116
return _the_interface .socket_status (self .socknum )
105
117
106
- def getpeername (self ):
118
+ def getpeername (self ) -> str :
107
119
"""Return the remote address to which the socket is connected."""
108
120
return _the_interface .remote_ip (self .socknum )
109
121
110
- def inet_aton (self , ip_string ) :
122
+ def inet_aton (self , ip_string : str ) -> bytearray :
111
123
"""Convert an IPv4 address from dotted-quad string format.
112
- :param str ip_string: IP Address, as a dotted-quad string.
113
124
125
+ :param str ip_string: IP Address, as a dotted-quad string.
114
126
"""
115
127
self ._buffer = b""
116
128
self ._buffer = [int (item ) for item in ip_string .split ("." )]
117
129
self ._buffer = bytearray (self ._buffer )
118
130
return self ._buffer
119
131
120
- def connect (self , address , conn_mode = None ):
132
+ def connect (
133
+ self , address : Tuple [str , int ], conn_mode : Optional [int ] = None
134
+ ) -> None :
121
135
"""Connect to a remote socket at address. (The format of address depends
122
136
on the address family — see above.)
137
+
123
138
:param tuple address: Remote socket as a (host, port) tuple.
124
139
:param int conn_mode: Connection mode (TCP/UDP)
125
-
126
140
"""
127
141
assert (
128
142
conn_mode != 0x03
@@ -135,17 +149,18 @@ def connect(self, address, conn_mode=None):
135
149
raise RuntimeError ("Failed to connect to host" , host )
136
150
self ._buffer = b""
137
151
138
- def send (self , data ) :
152
+ def send (self , data : bytes ) -> None :
139
153
"""Send data to the socket. The socket must be connected to
140
154
a remote socket prior to calling this method.
141
- :param bytes data: Desired data to send to the socket.
142
155
156
+ :param bytes data: Desired data to send to the socket.
143
157
"""
144
158
_the_interface .socket_write (self ._socknum , data , self ._timeout )
145
159
gc .collect ()
146
160
147
- def recv (self , bufsize = 0 ) :
161
+ def recv (self , bufsize : int = 0 ) -> Sequence [ int ] :
148
162
"""Reads some bytes from the connected remote address.
163
+
149
164
:param int bufsize: maximum number of bytes to receive
150
165
"""
151
166
# print("Socket read", bufsize)
@@ -189,7 +204,7 @@ def recv(self, bufsize=0):
189
204
gc .collect ()
190
205
return ret
191
206
192
- def readline (self ):
207
+ def readline (self ) -> Sequence [ int ] :
193
208
"""Attempt to return as many bytes as we can up to but not including '\r \n '"""
194
209
# print("Socket readline")
195
210
stamp = time .monotonic ()
@@ -205,26 +220,25 @@ def readline(self):
205
220
gc .collect ()
206
221
return firstline
207
222
208
- def available (self ):
223
+ def available (self ) -> int :
209
224
"""Returns how many bytes are available to be read from the socket."""
210
225
return _the_interface .socket_available (self ._socknum )
211
226
212
- def settimeout (self , value ) :
227
+ def settimeout (self , value : int ) -> None :
213
228
"""Sets socket read timeout.
214
- :param int value: Socket read timeout, in seconds.
215
229
230
+ :param int value: Socket read timeout, in seconds.
216
231
"""
217
232
if value < 0 :
218
233
raise Exception ("Timeout period should be non-negative." )
219
234
self ._timeout = value
220
235
221
- def gettimeout (self ):
236
+ def gettimeout (self ) -> int :
222
237
"""Return the timeout in seconds (float) associated
223
238
with socket operations, or None if no timeout is set.
224
-
225
239
"""
226
240
return self ._timeout
227
241
228
- def close (self ):
242
+ def close (self ) -> bool :
229
243
"""Closes the socket."""
230
244
return _the_interface .socket_close (self ._socknum )
0 commit comments