30
30
#include "shared/runtime/interrupt_char.h"
31
31
#include "py/mperrno.h"
32
32
#include "py/runtime.h"
33
+ #include "shared-bindings/socketpool/SocketPool.h"
34
+ #include "supervisor/port.h"
33
35
#include "supervisor/shared/tick.h"
36
+ #include "supervisor/workflow.h"
34
37
35
38
#include "components/lwip/lwip/src/include/lwip/err.h"
36
39
#include "components/lwip/lwip/src/include/lwip/sockets.h"
37
40
#include "components/lwip/lwip/src/include/lwip/sys.h"
38
41
#include "components/lwip/lwip/src/include/lwip/netdb.h"
42
+ #include "components/vfs/include/esp_vfs_eventfd.h"
43
+
44
+ StackType_t socket_select_stack [2 * configMINIMAL_STACK_SIZE ];
45
+
46
+ STATIC int open_socket_fds [CONFIG_LWIP_MAX_SOCKETS ];
47
+ STATIC bool user_socket [CONFIG_LWIP_MAX_SOCKETS ];
48
+ StaticTask_t socket_select_task_handle ;
49
+ STATIC int socket_change_fd = -1 ;
50
+
51
+ STATIC void socket_select_task (void * arg ) {
52
+ uint64_t signal ;
53
+
54
+ while (true) {
55
+ fd_set readfds ;
56
+ fd_set errfds ;
57
+ FD_ZERO (& readfds );
58
+ FD_ZERO (& errfds );
59
+ FD_SET (socket_change_fd , & readfds );
60
+ FD_SET (socket_change_fd , & errfds );
61
+ int max_fd = socket_change_fd ;
62
+ for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_fds ); i ++ ) {
63
+ if (open_socket_fds [i ] < 0 ) {
64
+ continue ;
65
+ }
66
+ max_fd = MAX (max_fd , open_socket_fds [i ]);
67
+ FD_SET (open_socket_fds [i ], & readfds );
68
+ FD_SET (open_socket_fds [i ], & errfds );
69
+ }
70
+
71
+ int num_triggered = select (max_fd + 1 , & readfds , NULL , & errfds , NULL );
72
+ if (num_triggered < 0 ) {
73
+ // Maybe bad file descriptor
74
+ for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_fds ); i ++ ) {
75
+ int sockfd = open_socket_fds [i ];
76
+ if (sockfd < 0 ) {
77
+ continue ;
78
+ }
79
+ if (FD_ISSET (sockfd , & errfds )) {
80
+ int err ;
81
+ int optlen = sizeof (int );
82
+ int ret = getsockopt (sockfd , SOL_SOCKET , SO_ERROR , & err , (socklen_t * )& optlen );
83
+ if (ret < 0 ) {
84
+ open_socket_fds [i ] = -1 ;
85
+ // Try again.
86
+ continue ;
87
+ }
88
+ }
89
+ }
90
+ }
91
+ assert (num_triggered >= 0 );
92
+
93
+ if (FD_ISSET (socket_change_fd , & readfds )) {
94
+ read (socket_change_fd , & signal , sizeof (signal ));
95
+ num_triggered -= 1 ;
96
+ }
97
+ if (num_triggered > 0 ) {
98
+ supervisor_workflow_request_background ();
39
99
40
- STATIC socketpool_socket_obj_t * open_socket_handles [CONFIG_LWIP_MAX_SOCKETS ];
100
+ // Wake up CircuitPython. We know it is asleep because we are lower
101
+ // priority.
102
+ port_wake_main_task ();
103
+ }
104
+
105
+ }
106
+ close (socket_change_fd );
107
+ vTaskDelete (NULL );
108
+ }
41
109
42
110
void socket_user_reset (void ) {
43
- for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_handles ); i ++ ) {
44
- if (open_socket_handles [i ]) {
45
- if (open_socket_handles [i ]-> num > 0 ) {
46
- // Close automatically clears socket handle
47
- common_hal_socketpool_socket_close (open_socket_handles [i ]);
48
- } else {
49
- open_socket_handles [i ] = NULL ;
50
- }
111
+ if (socket_change_fd < 0 ) {
112
+ esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT ();
113
+ ESP_ERROR_CHECK (esp_vfs_eventfd_register (& config ));
114
+
115
+ for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_fds ); i ++ ) {
116
+ open_socket_fds [i ] = -1 ;
117
+ user_socket [i ] = false;
118
+ }
119
+ socket_change_fd = eventfd (0 , 0 );
120
+ // This task runs at a lower priority than CircuitPython and is used to wake CircuitPython
121
+ // up when any open sockets have data to read. It allows us to sleep otherwise.
122
+ (void )xTaskCreateStaticPinnedToCore (socket_select_task ,
123
+ "socket_select" ,
124
+ 2 * configMINIMAL_STACK_SIZE ,
125
+ NULL ,
126
+ 0 , // Run this at IDLE priority. We only need it when CP isn't running (at 1).
127
+ socket_select_stack ,
128
+ & socket_select_task_handle ,
129
+ xPortGetCoreID ());
130
+ }
131
+
132
+ for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_fds ); i ++ ) {
133
+ if (open_socket_fds [i ] >= 0 && user_socket [i ]) {
134
+ int num = open_socket_fds [i ];
135
+ // Close automatically clears socket handle
136
+ lwip_shutdown (num , SHUT_RDWR );
137
+ lwip_close (num );
138
+ open_socket_fds [i ] = -1 ;
139
+ user_socket [i ] = false;
51
140
}
52
141
}
53
142
}
54
143
55
- bool register_open_socket (socketpool_socket_obj_t * self ) {
56
- for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_handles ); i ++ ) {
57
- if (open_socket_handles [i ] == NULL ) {
58
- open_socket_handles [i ] = self ;
144
+ // The writes below send an event to the socket select task so that it redoes the
145
+ // select with the new open socket set.
146
+
147
+ STATIC bool register_open_socket (int fd ) {
148
+ for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_fds ); i ++ ) {
149
+ if (open_socket_fds [i ] == -1 ) {
150
+ open_socket_fds [i ] = fd ;
151
+ user_socket [i ] = false;
152
+ uint64_t signal = 1 ;
153
+ write (socket_change_fd , & signal , sizeof (signal ));
59
154
return true;
60
155
}
61
156
}
62
157
return false;
63
158
}
64
159
160
+ STATIC void unregister_open_socket (int fd ) {
161
+ for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_fds ); i ++ ) {
162
+ if (open_socket_fds [i ] == fd ) {
163
+ open_socket_fds [i ] = -1 ;
164
+ user_socket [i ] = false;
165
+ write (socket_change_fd , & fd , sizeof (fd ));
166
+ return ;
167
+ }
168
+ }
169
+ }
170
+
171
+ STATIC void mark_user_socket (int fd ) {
172
+ for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_fds ); i ++ ) {
173
+ if (open_socket_fds [i ] == fd ) {
174
+ user_socket [i ] = true;
175
+ return ;
176
+ }
177
+ }
178
+ }
179
+
180
+ bool socketpool_socket (socketpool_socketpool_obj_t * self ,
181
+ socketpool_socketpool_addressfamily_t family , socketpool_socketpool_sock_t type ,
182
+ socketpool_socket_obj_t * sock ) {
183
+ int addr_family ;
184
+ int ipproto ;
185
+ if (family == SOCKETPOOL_AF_INET ) {
186
+ addr_family = AF_INET ;
187
+ ipproto = IPPROTO_IP ;
188
+ } else { // INET6
189
+ addr_family = AF_INET6 ;
190
+ ipproto = IPPROTO_IPV6 ;
191
+ }
192
+
193
+ int socket_type ;
194
+ if (type == SOCKETPOOL_SOCK_STREAM ) {
195
+ socket_type = SOCK_STREAM ;
196
+ } else if (type == SOCKETPOOL_SOCK_DGRAM ) {
197
+ socket_type = SOCK_DGRAM ;
198
+ } else { // SOCKETPOOL_SOCK_RAW
199
+ socket_type = SOCK_RAW ;
200
+ }
201
+ sock -> type = socket_type ;
202
+ sock -> family = addr_family ;
203
+ sock -> ipproto = ipproto ;
204
+ sock -> pool = self ;
205
+ sock -> timeout_ms = (uint )- 1 ;
206
+
207
+ // Create LWIP socket
208
+ int socknum = -1 ;
209
+ socknum = lwip_socket (sock -> family , sock -> type , sock -> ipproto );
210
+ if (socknum < 0 ) {
211
+ return false;
212
+ }
213
+ // This shouldn't happen since we have room for the same number of sockets as LWIP.
214
+ if (!register_open_socket (socknum )) {
215
+ lwip_close (socknum );
216
+ return false;
217
+ }
218
+ sock -> num = socknum ;
219
+ // Sockets should be nonblocking in most cases
220
+ lwip_fcntl (socknum , F_SETFL , O_NONBLOCK );
221
+ return true;
222
+ }
223
+
224
+ socketpool_socket_obj_t * common_hal_socketpool_socket (socketpool_socketpool_obj_t * self ,
225
+ socketpool_socketpool_addressfamily_t family , socketpool_socketpool_sock_t type ) {
226
+ if (family != SOCKETPOOL_AF_INET ) {
227
+ mp_raise_NotImplementedError (translate ("Only IPv4 sockets supported" ));
228
+ }
229
+
230
+ socketpool_socket_obj_t * sock = m_new_obj_with_finaliser (socketpool_socket_obj_t );
231
+ sock -> base .type = & socketpool_socket_type ;
232
+
233
+ if (!socketpool_socket (self , family , type , sock )) {
234
+ mp_raise_RuntimeError (translate ("Out of sockets" ));
235
+ }
236
+ mark_user_socket (sock -> num );
237
+ return sock ;
238
+ }
239
+
65
240
int socketpool_socket_accept (socketpool_socket_obj_t * self , uint8_t * ip , uint32_t * port ) {
66
241
struct sockaddr_in accept_addr ;
67
242
socklen_t socklen = sizeof (accept_addr );
@@ -92,6 +267,10 @@ int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_
92
267
if (newsoc < 0 ) {
93
268
return - MP_EBADF ;
94
269
}
270
+ if (!register_open_socket (newsoc )) {
271
+ lwip_close (newsoc );
272
+ return - MP_EBADF ;
273
+ }
95
274
return newsoc ;
96
275
}
97
276
@@ -100,17 +279,14 @@ socketpool_socket_obj_t *common_hal_socketpool_socket_accept(socketpool_socket_o
100
279
int newsoc = socketpool_socket_accept (self , ip , port );
101
280
102
281
if (newsoc > 0 ) {
282
+ mark_user_socket (newsoc );
103
283
// Create the socket
104
284
socketpool_socket_obj_t * sock = m_new_obj_with_finaliser (socketpool_socket_obj_t );
105
285
sock -> base .type = & socketpool_socket_type ;
106
286
sock -> num = newsoc ;
107
287
sock -> pool = self -> pool ;
108
288
sock -> connected = true;
109
289
110
- if (!register_open_socket (sock )) {
111
- mp_raise_OSError (MP_EBADF );
112
- }
113
-
114
290
lwip_fcntl (newsoc , F_SETFL , O_NONBLOCK );
115
291
return sock ;
116
292
} else {
@@ -150,18 +326,13 @@ void socketpool_socket_close(socketpool_socket_obj_t *self) {
150
326
if (self -> num >= 0 ) {
151
327
lwip_shutdown (self -> num , SHUT_RDWR );
152
328
lwip_close (self -> num );
329
+ unregister_open_socket (self -> num );
153
330
self -> num = -1 ;
154
331
}
155
332
}
156
333
157
334
void common_hal_socketpool_socket_close (socketpool_socket_obj_t * self ) {
158
335
socketpool_socket_close (self );
159
- // Remove socket record
160
- for (size_t i = 0 ; i < MP_ARRAY_SIZE (open_socket_handles ); i ++ ) {
161
- if (open_socket_handles [i ] == self ) {
162
- open_socket_handles [i ] = NULL ;
163
- }
164
- }
165
336
}
166
337
167
338
void common_hal_socketpool_socket_connect (socketpool_socket_obj_t * self ,
0 commit comments