Skip to content

Commit c79c124

Browse files
committed
Web Workflow sockets and threads handling improvements.
Fixes pooling thread looping forever hangs preventing new connections. Don't lose listening sockets on mp resets and re-init. Keep better separation of "system" and "user" sockets. Track socket states to prevent re-use of sockets before closed. Close REST socket when transaction completes. No post-init. Remove unnecessary state flags.
1 parent 4e7bbf7 commit c79c124

File tree

11 files changed

+215
-171
lines changed

11 files changed

+215
-171
lines changed

ports/espressif/common-hal/socketpool/Socket.c

Lines changed: 135 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -45,70 +45,79 @@
4545

4646
StackType_t socket_select_stack[2 * configMINIMAL_STACK_SIZE];
4747

48-
STATIC int open_socket_fds[CONFIG_LWIP_MAX_SOCKETS];
48+
/* Socket state table:
49+
* 0 := Closed (unused)
50+
* 1 := Open
51+
* 2 := Closing (remove from rfds)
52+
* Index into socket_fd_state is calculated from actual lwip fd. idx := fd - LWIP_SOCKET_OFFSET
53+
*/
54+
#define FDSTATE_CLOSED 0
55+
#define FDSTATE_OPEN 1
56+
#define FDSTATE_CLOSING 2
57+
STATIC uint8_t socket_fd_state[CONFIG_LWIP_MAX_SOCKETS];
58+
4959
STATIC socketpool_socket_obj_t *user_socket[CONFIG_LWIP_MAX_SOCKETS];
50-
StaticTask_t socket_select_task_handle;
60+
StaticTask_t socket_select_task_buffer;
61+
TaskHandle_t socket_select_task_handle;
5162
STATIC int socket_change_fd = -1;
5263

5364
STATIC void socket_select_task(void *arg) {
5465
uint64_t signal;
66+
fd_set readfds;
67+
fd_set excptfds;
5568

5669
while (true) {
57-
fd_set readfds;
58-
fd_set errfds;
5970
FD_ZERO(&readfds);
60-
FD_ZERO(&errfds);
71+
FD_ZERO(&excptfds);
6172
FD_SET(socket_change_fd, &readfds);
62-
FD_SET(socket_change_fd, &errfds);
6373
int max_fd = socket_change_fd;
64-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
65-
int sockfd = open_socket_fds[i];
66-
if (sockfd < 0) {
67-
continue;
74+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
75+
if ((socket_fd_state[i] == FDSTATE_OPEN) && (user_socket[i] == NULL)) {
76+
int sockfd = i + LWIP_SOCKET_OFFSET;
77+
max_fd = MAX(max_fd, sockfd);
78+
FD_SET(sockfd, &readfds);
79+
FD_SET(sockfd, &excptfds);
6880
}
69-
max_fd = MAX(max_fd, sockfd);
70-
FD_SET(sockfd, &readfds);
71-
FD_SET(sockfd, &errfds);
7281
}
7382

74-
int num_triggered = select(max_fd + 1, &readfds, NULL, &errfds, NULL);
75-
// Check for bad file descriptor and queue up the background task before
76-
// circling around.
77-
if (num_triggered == -1 && errno == EBADF) {
78-
// One for the change fd and one for the closed socket.
79-
num_triggered = 2;
80-
}
81-
// Try and find the bad file and remove it from monitoring.
82-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
83-
int sockfd = open_socket_fds[i];
84-
if (sockfd < 0) {
85-
continue;
86-
}
87-
int err;
88-
int optlen = sizeof(int);
89-
int ret = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, (socklen_t *)&optlen);
90-
if (ret < 0) {
91-
open_socket_fds[i] = -1;
92-
// Raise num_triggered so that we skip the assert and queue the background task.
93-
num_triggered = 2;
94-
}
83+
int num_triggered = select(max_fd + 1, &readfds, NULL, &excptfds, NULL);
84+
// Hard error (or someone closed a socket on another thread)
85+
if (num_triggered == -1) {
86+
assert(errno == EBADF);
87+
continue;
9588
}
96-
assert(num_triggered >= 0);
9789

90+
assert(num_triggered > 0);
91+
assert(!FD_ISSET(socket_change_fd, &excptfds));
92+
93+
// Notice event trigger
9894
if (FD_ISSET(socket_change_fd, &readfds)) {
9995
read(socket_change_fd, &signal, sizeof(signal));
100-
num_triggered -= 1;
96+
num_triggered--;
10197
}
102-
if (num_triggered > 0) {
103-
supervisor_workflow_request_background();
10498

105-
// Wake up CircuitPython. We know it is asleep because we are lower
106-
// priority.
107-
port_wake_main_task();
99+
// Handle active FDs, close the dead ones
100+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
101+
int sockfd = i + LWIP_SOCKET_OFFSET;
102+
if (socket_fd_state[i] != FDSTATE_CLOSED) {
103+
if (FD_ISSET(sockfd, &readfds) || FD_ISSET(sockfd, &excptfds)) {
104+
if (socket_fd_state[i] == FDSTATE_CLOSING) {
105+
socket_fd_state[i] = FDSTATE_CLOSED;
106+
num_triggered--;
107+
}
108+
}
109+
}
108110
}
109111

112+
if (num_triggered > 0) {
113+
// Wake up CircuitPython by queuing request
114+
supervisor_workflow_request_background(NULL);
115+
ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
116+
}
110117
}
118+
111119
close(socket_change_fd);
120+
socket_change_fd = -1;
112121
vTaskDelete(NULL);
113122
}
114123

@@ -117,85 +126,86 @@ void socket_user_reset(void) {
117126
esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
118127
ESP_ERROR_CHECK(esp_vfs_eventfd_register(&config));
119128

120-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
121-
open_socket_fds[i] = -1;
129+
// Clear initial socket states
130+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
131+
socket_fd_state[i] = FDSTATE_CLOSED;
122132
user_socket[i] = NULL;
123133
}
124134
socket_change_fd = eventfd(0, 0);
125135
// Run this at the same priority as CP so that the web workflow background task can be
126136
// queued while CP is running. Both tasks can still sleep and, therefore, sleep overall.
127-
(void)xTaskCreateStaticPinnedToCore(socket_select_task,
137+
socket_select_task_handle = xTaskCreateStaticPinnedToCore(socket_select_task,
128138
"socket_select",
129139
2 * configMINIMAL_STACK_SIZE,
130140
NULL,
131141
uxTaskPriorityGet(NULL),
132142
socket_select_stack,
133-
&socket_select_task_handle,
143+
&socket_select_task_buffer,
134144
xPortGetCoreID());
145+
} else {
146+
// Not init - close open user sockets
147+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
148+
if ((socket_fd_state[i] == FDSTATE_OPEN) && user_socket[i]) {
149+
common_hal_socketpool_socket_close(user_socket[i]);
150+
}
151+
}
135152
}
153+
}
136154

137-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
138-
if (open_socket_fds[i] >= 0 && user_socket[i]) {
139-
common_hal_socketpool_socket_close(user_socket[i]);
140-
int num = open_socket_fds[i];
141-
// Close automatically clears socket handle
142-
lwip_shutdown(num, SHUT_RDWR);
143-
lwip_close(num);
144-
open_socket_fds[i] = -1;
145-
user_socket[i] = NULL;
146-
}
155+
// Unblock select task (ok if not blocked yet)
156+
void socketpool_socket_poll_resume(void) {
157+
if (socket_select_task_handle) {
158+
xTaskNotifyGive(socket_select_task_handle);
147159
}
148160
}
149161

150162
// The writes below send an event to the socket select task so that it redoes the
151163
// select with the new open socket set.
152164

153165
STATIC bool register_open_socket(int fd) {
154-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
155-
if (open_socket_fds[i] == -1) {
156-
open_socket_fds[i] = fd;
157-
user_socket[i] = false;
158-
uint64_t signal = 1;
159-
write(socket_change_fd, &signal, sizeof(signal));
160-
return true;
161-
}
166+
if (fd < FD_SETSIZE) {
167+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_OPEN;
168+
user_socket[fd - LWIP_SOCKET_OFFSET] = NULL;
169+
170+
uint64_t signal = 1;
171+
write(socket_change_fd, &signal, sizeof(signal));
172+
socketpool_socket_poll_resume();
173+
return true;
162174
}
163175
return false;
164176
}
165177

178+
#if 0
179+
// This is probably not needed
166180
STATIC void unregister_open_socket(int fd) {
167-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
168-
if (open_socket_fds[i] == fd) {
169-
open_socket_fds[i] = -1;
170-
user_socket[i] = false;
171-
// Write must be 8 bytes for an eventfd.
172-
uint64_t signal = 1;
173-
write(socket_change_fd, &signal, sizeof(signal));
174-
return;
175-
}
176-
}
181+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_CLOSING;
182+
183+
// Write must be 8 bytes for an eventfd.
184+
uint64_t signal = 1;
185+
write(socket_change_fd, &signal, sizeof(signal));
186+
socketpool_socket_poll_resume();
177187
}
188+
#endif
178189

179190
STATIC void mark_user_socket(int fd, socketpool_socket_obj_t *obj) {
180-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
181-
if (open_socket_fds[i] == fd) {
182-
user_socket[i] = obj;
183-
return;
184-
}
185-
}
191+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_OPEN;
192+
user_socket[fd - LWIP_SOCKET_OFFSET] = obj;
193+
// No need to wakeup select task
186194
}
187195

188-
bool socketpool_socket(socketpool_socketpool_obj_t *self,
196+
STATIC bool _socketpool_socket(socketpool_socketpool_obj_t *self,
189197
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
190198
socketpool_socket_obj_t *sock) {
191199
int addr_family;
192200
int ipproto;
193201
if (family == SOCKETPOOL_AF_INET) {
194202
addr_family = AF_INET;
195203
ipproto = IPPROTO_IP;
204+
#if LWIP_IPV6
196205
} else { // INET6
197206
addr_family = AF_INET6;
198207
ipproto = IPPROTO_IPV6;
208+
#endif
199209
}
200210

201211
int socket_type;
@@ -218,14 +228,28 @@ bool socketpool_socket(socketpool_socketpool_obj_t *self,
218228
if (socknum < 0) {
219229
return false;
220230
}
221-
// This shouldn't happen since we have room for the same number of sockets as LWIP.
222-
if (!register_open_socket(socknum)) {
223-
lwip_close(socknum);
224-
return false;
225-
}
231+
226232
sock->num = socknum;
227233
// Sockets should be nonblocking in most cases
228234
lwip_fcntl(socknum, F_SETFL, O_NONBLOCK);
235+
236+
return true;
237+
}
238+
239+
// special entry for workflow listener (register system socket)
240+
bool socketpool_socket(socketpool_socketpool_obj_t *self,
241+
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
242+
socketpool_socket_obj_t *sock) {
243+
244+
if (!_socketpool_socket(self, family, type, sock)) {
245+
return false;
246+
}
247+
248+
// This shouldn't happen since we have room for the same number of sockets as LWIP.
249+
if (!register_open_socket(sock->num)) {
250+
lwip_close(sock->num);
251+
return false;
252+
}
229253
return true;
230254
}
231255

@@ -238,7 +262,7 @@ socketpool_socket_obj_t *common_hal_socketpool_socket(socketpool_socketpool_obj_
238262
socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t);
239263
sock->base.type = &socketpool_socket_type;
240264

241-
if (!socketpool_socket(self, family, type, sock)) {
265+
if (!_socketpool_socket(self, family, type, sock)) {
242266
mp_raise_RuntimeError(translate("Out of sockets"));
243267
}
244268
mark_user_socket(sock->num, sock);
@@ -279,17 +303,16 @@ int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_
279303
// We got a socket. New client socket will not be non-blocking by default, so make it non-blocking.
280304
lwip_fcntl(newsoc, F_SETFL, O_NONBLOCK);
281305

282-
if (!register_open_socket(newsoc)) {
283-
lwip_close(newsoc);
284-
return -MP_EBADF;
285-
}
286-
287-
288306
if (accepted != NULL) {
289-
// Close the active socket because we have another we accepted.
290-
if (!common_hal_socketpool_socket_get_closed(accepted)) {
291-
common_hal_socketpool_socket_close(accepted);
307+
// Error if called with open socket object.
308+
assert(common_hal_socketpool_socket_get_closed(accepted));
309+
310+
// Register if system socket
311+
if (!register_open_socket(newsoc)) {
312+
lwip_close(newsoc);
313+
return -MP_EBADF;
292314
}
315+
293316
// Replace the old accepted socket with the new one.
294317
accepted->num = newsoc;
295318
accepted->pool = self->pool;
@@ -353,12 +376,21 @@ void socketpool_socket_close(socketpool_socket_obj_t *self) {
353376
return;
354377
}
355378
self->connected = false;
356-
if (self->num >= 0) {
357-
lwip_shutdown(self->num, SHUT_RDWR);
358-
lwip_close(self->num);
359-
unregister_open_socket(self->num);
360-
self->num = -1;
379+
int fd = self->num;
380+
// Ignore bogus/closed sockets
381+
if (fd >= LWIP_SOCKET_OFFSET) {
382+
if (user_socket[fd - LWIP_SOCKET_OFFSET] == NULL) {
383+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_CLOSING;
384+
lwip_shutdown(fd, SHUT_RDWR);
385+
lwip_close(fd);
386+
} else {
387+
lwip_shutdown(fd, SHUT_RDWR);
388+
lwip_close(fd);
389+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_CLOSED;
390+
user_socket[fd - LWIP_SOCKET_OFFSET] = NULL;
391+
}
361392
}
393+
self->num = -1;
362394
}
363395

364396
void common_hal_socketpool_socket_close(socketpool_socket_obj_t *self) {
@@ -420,7 +452,7 @@ bool common_hal_socketpool_socket_get_connected(socketpool_socket_obj_t *self) {
420452
}
421453

422454
bool common_hal_socketpool_socket_listen(socketpool_socket_obj_t *self, int backlog) {
423-
return lwip_listen(self->num, backlog);
455+
return lwip_listen(self->num, backlog) == 0;
424456
}
425457

426458
mp_uint_t common_hal_socketpool_socket_recvfrom_into(socketpool_socket_obj_t *self,
@@ -479,10 +511,9 @@ int socketpool_socket_recv_into(socketpool_socket_obj_t *self,
479511
}
480512
RUN_BACKGROUND_TASKS;
481513
received = lwip_recv(self->num, (void *)buf, len, 0);
482-
483514
// In non-blocking mode, fail instead of looping
484-
if (received == -1 && self->timeout_ms == 0) {
485-
if (errno == ENOTCONN) {
515+
if (received < 1 && self->timeout_ms == 0) {
516+
if ((received == 0) || (errno == ENOTCONN)) {
486517
self->connected = false;
487518
return -MP_ENOTCONN;
488519
}

ports/espressif/common-hal/socketpool/Socket.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,5 @@ typedef struct {
4848
} socketpool_socket_obj_t;
4949

5050
void socket_user_reset(void);
51+
// Unblock workflow socket select thread (platform specific)
52+
void socketpool_socket_poll_resume(void);

ports/espressif/common-hal/wifi/__init__.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ void wifi_reset(void) {
239239
radio->ap_netif = NULL;
240240
wifi_inited = false;
241241
#endif
242-
supervisor_workflow_request_background();
242+
supervisor_workflow_request_background(NULL);
243243
}
244244

245245
void ipaddress_ipaddress_to_esp_idf(mp_obj_t ip_address, ip_addr_t *esp_ip_address) {

ports/raspberrypi/common-hal/socketpool/Socket.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ static inline void exec_user_callback(socketpool_socket_obj_t *socket) {
166166
mp_sched_schedule(socket->callback, MP_OBJ_FROM_PTR(socket));
167167
}
168168
#endif
169-
supervisor_workflow_request_background();
169+
supervisor_workflow_request_background(NULL);
170170
}
171171

172172
#if MICROPY_PY_LWIP_SOCK_RAW

ports/raspberrypi/common-hal/socketpool/Socket.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,7 @@ typedef struct _lwip_socket_obj_t {
7575
socketpool_socketpool_obj_t *pool;
7676
} socketpool_socket_obj_t;
7777

78+
// Not required for RPi socket positive callbacks
79+
#define socketpool_socket_poll_resume(x)
80+
7881
void socket_user_reset(void);

0 commit comments

Comments
 (0)