Skip to content

Commit 9825b7f

Browse files
committed
Web Workflow sockets and threads handling improvements.
Fixes polling thread looping forever hangs preventing new connections. Don't lose listening sockets on mp resets and re-init. Keep better separation of "system" and "user" sockets. Track socket states to prevent re-use of sockets before closed. Close REST socket when transaction completes. No post-init. Remove unnecessary state flags.
1 parent 4e7bbf7 commit 9825b7f

File tree

7 files changed

+198
-181
lines changed

7 files changed

+198
-181
lines changed

ports/espressif/common-hal/socketpool/Socket.c

Lines changed: 126 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -45,70 +45,79 @@
4545

4646
StackType_t socket_select_stack[2 * configMINIMAL_STACK_SIZE];
4747

48-
STATIC int open_socket_fds[CONFIG_LWIP_MAX_SOCKETS];
48+
/* Socket state table:
49+
* 0 := Closed (unused)
50+
* 1 := Open
51+
* 2 := Closing (remove from rfds)
52+
* Index into socket_fd_state is calculated from actual lwip fd. idx := fd - LWIP_SOCKET_OFFSET
53+
*/
54+
#define FDSTATE_CLOSED 0
55+
#define FDSTATE_OPEN 1
56+
#define FDSTATE_CLOSING 2
57+
STATIC uint8_t socket_fd_state[CONFIG_LWIP_MAX_SOCKETS];
58+
4959
STATIC socketpool_socket_obj_t *user_socket[CONFIG_LWIP_MAX_SOCKETS];
50-
StaticTask_t socket_select_task_handle;
60+
StaticTask_t socket_select_task_buffer;
61+
TaskHandle_t socket_select_task_handle;
5162
STATIC int socket_change_fd = -1;
5263

5364
STATIC void socket_select_task(void *arg) {
5465
uint64_t signal;
66+
fd_set readfds;
67+
fd_set excptfds;
5568

5669
while (true) {
57-
fd_set readfds;
58-
fd_set errfds;
5970
FD_ZERO(&readfds);
60-
FD_ZERO(&errfds);
71+
FD_ZERO(&excptfds);
6172
FD_SET(socket_change_fd, &readfds);
62-
FD_SET(socket_change_fd, &errfds);
6373
int max_fd = socket_change_fd;
64-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
65-
int sockfd = open_socket_fds[i];
66-
if (sockfd < 0) {
67-
continue;
74+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
75+
if ((socket_fd_state[i] == FDSTATE_OPEN) && (user_socket[i] == NULL)) {
76+
int sockfd = i + LWIP_SOCKET_OFFSET;
77+
max_fd = MAX(max_fd, sockfd);
78+
FD_SET(sockfd, &readfds);
79+
FD_SET(sockfd, &excptfds);
6880
}
69-
max_fd = MAX(max_fd, sockfd);
70-
FD_SET(sockfd, &readfds);
71-
FD_SET(sockfd, &errfds);
7281
}
7382

74-
int num_triggered = select(max_fd + 1, &readfds, NULL, &errfds, NULL);
75-
// Check for bad file descriptor and queue up the background task before
76-
// circling around.
77-
if (num_triggered == -1 && errno == EBADF) {
78-
// One for the change fd and one for the closed socket.
79-
num_triggered = 2;
80-
}
81-
// Try and find the bad file and remove it from monitoring.
82-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
83-
int sockfd = open_socket_fds[i];
84-
if (sockfd < 0) {
85-
continue;
86-
}
87-
int err;
88-
int optlen = sizeof(int);
89-
int ret = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, (socklen_t *)&optlen);
90-
if (ret < 0) {
91-
open_socket_fds[i] = -1;
92-
// Raise num_triggered so that we skip the assert and queue the background task.
93-
num_triggered = 2;
94-
}
83+
int num_triggered = select(max_fd + 1, &readfds, NULL, &excptfds, NULL);
84+
// Hard error (or someone closed a socket on another thread)
85+
if (num_triggered == -1) {
86+
assert(errno == EBADF);
87+
continue;
9588
}
96-
assert(num_triggered >= 0);
9789

90+
assert(num_triggered > 0);
91+
assert(!FD_ISSET(socket_change_fd, &excptfds));
92+
93+
// Notice event trigger
9894
if (FD_ISSET(socket_change_fd, &readfds)) {
9995
read(socket_change_fd, &signal, sizeof(signal));
100-
num_triggered -= 1;
96+
num_triggered--;
10197
}
102-
if (num_triggered > 0) {
103-
supervisor_workflow_request_background();
10498

105-
// Wake up CircuitPython. We know it is asleep because we are lower
106-
// priority.
107-
port_wake_main_task();
99+
// Handle active FDs, close the dead ones
100+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
101+
int sockfd = i + LWIP_SOCKET_OFFSET;
102+
if (socket_fd_state[i] != FDSTATE_CLOSED) {
103+
if (FD_ISSET(sockfd, &readfds) || FD_ISSET(sockfd, &excptfds)) {
104+
if (socket_fd_state[i] == FDSTATE_CLOSING) {
105+
socket_fd_state[i] = FDSTATE_CLOSED;
106+
num_triggered--;
107+
}
108+
}
109+
}
108110
}
109111

112+
if (num_triggered > 0) {
113+
// Wake up CircuitPython by queuing request
114+
supervisor_workflow_request_background();
115+
ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
116+
}
110117
}
118+
111119
close(socket_change_fd);
120+
socket_change_fd = -1;
112121
vTaskDelete(NULL);
113122
}
114123

@@ -117,85 +126,74 @@ void socket_user_reset(void) {
117126
esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
118127
ESP_ERROR_CHECK(esp_vfs_eventfd_register(&config));
119128

120-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
121-
open_socket_fds[i] = -1;
129+
// Clear initial socket states
130+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
131+
socket_fd_state[i] = FDSTATE_CLOSED;
122132
user_socket[i] = NULL;
123133
}
124134
socket_change_fd = eventfd(0, 0);
125135
// Run this at the same priority as CP so that the web workflow background task can be
126136
// queued while CP is running. Both tasks can still sleep and, therefore, sleep overall.
127-
(void)xTaskCreateStaticPinnedToCore(socket_select_task,
137+
socket_select_task_handle = xTaskCreateStaticPinnedToCore(socket_select_task,
128138
"socket_select",
129139
2 * configMINIMAL_STACK_SIZE,
130140
NULL,
131141
uxTaskPriorityGet(NULL),
132142
socket_select_stack,
133-
&socket_select_task_handle,
143+
&socket_select_task_buffer,
134144
xPortGetCoreID());
145+
} else {
146+
// Not init - close open user sockets
147+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
148+
if ((socket_fd_state[i] == FDSTATE_OPEN) && user_socket[i]) {
149+
common_hal_socketpool_socket_close(user_socket[i]);
150+
}
151+
}
135152
}
153+
}
136154

137-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
138-
if (open_socket_fds[i] >= 0 && user_socket[i]) {
139-
common_hal_socketpool_socket_close(user_socket[i]);
140-
int num = open_socket_fds[i];
141-
// Close automatically clears socket handle
142-
lwip_shutdown(num, SHUT_RDWR);
143-
lwip_close(num);
144-
open_socket_fds[i] = -1;
145-
user_socket[i] = NULL;
146-
}
155+
// Unblock select task (ok if not blocked yet)
156+
void socketpool_socket_poll_resume(void) {
157+
if (socket_select_task_handle) {
158+
xTaskNotifyGive(socket_select_task_handle);
147159
}
148160
}
149161

150162
// The writes below send an event to the socket select task so that it redoes the
151163
// select with the new open socket set.
152164

153165
STATIC bool register_open_socket(int fd) {
154-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
155-
if (open_socket_fds[i] == -1) {
156-
open_socket_fds[i] = fd;
157-
user_socket[i] = false;
158-
uint64_t signal = 1;
159-
write(socket_change_fd, &signal, sizeof(signal));
160-
return true;
161-
}
162-
}
163-
return false;
164-
}
166+
if (fd < FD_SETSIZE) {
167+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_OPEN;
168+
user_socket[fd - LWIP_SOCKET_OFFSET] = NULL;
165169

166-
STATIC void unregister_open_socket(int fd) {
167-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
168-
if (open_socket_fds[i] == fd) {
169-
open_socket_fds[i] = -1;
170-
user_socket[i] = false;
171-
// Write must be 8 bytes for an eventfd.
172-
uint64_t signal = 1;
173-
write(socket_change_fd, &signal, sizeof(signal));
174-
return;
175-
}
170+
uint64_t signal = 1;
171+
write(socket_change_fd, &signal, sizeof(signal));
172+
socketpool_socket_poll_resume();
173+
return true;
176174
}
175+
return false;
177176
}
178177

179178
STATIC void mark_user_socket(int fd, socketpool_socket_obj_t *obj) {
180-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
181-
if (open_socket_fds[i] == fd) {
182-
user_socket[i] = obj;
183-
return;
184-
}
185-
}
179+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_OPEN;
180+
user_socket[fd - LWIP_SOCKET_OFFSET] = obj;
181+
// No need to wakeup select task
186182
}
187183

188-
bool socketpool_socket(socketpool_socketpool_obj_t *self,
184+
STATIC bool _socketpool_socket(socketpool_socketpool_obj_t *self,
189185
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
190186
socketpool_socket_obj_t *sock) {
191187
int addr_family;
192188
int ipproto;
193189
if (family == SOCKETPOOL_AF_INET) {
194190
addr_family = AF_INET;
195191
ipproto = IPPROTO_IP;
192+
#if LWIP_IPV6
196193
} else { // INET6
197194
addr_family = AF_INET6;
198195
ipproto = IPPROTO_IPV6;
196+
#endif
199197
}
200198

201199
int socket_type;
@@ -218,14 +216,28 @@ bool socketpool_socket(socketpool_socketpool_obj_t *self,
218216
if (socknum < 0) {
219217
return false;
220218
}
221-
// This shouldn't happen since we have room for the same number of sockets as LWIP.
222-
if (!register_open_socket(socknum)) {
223-
lwip_close(socknum);
224-
return false;
225-
}
219+
226220
sock->num = socknum;
227221
// Sockets should be nonblocking in most cases
228222
lwip_fcntl(socknum, F_SETFL, O_NONBLOCK);
223+
224+
return true;
225+
}
226+
227+
// special entry for workflow listener (register system socket)
228+
bool socketpool_socket(socketpool_socketpool_obj_t *self,
229+
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
230+
socketpool_socket_obj_t *sock) {
231+
232+
if (!_socketpool_socket(self, family, type, sock)) {
233+
return false;
234+
}
235+
236+
// This shouldn't happen since we have room for the same number of sockets as LWIP.
237+
if (!register_open_socket(sock->num)) {
238+
lwip_close(sock->num);
239+
return false;
240+
}
229241
return true;
230242
}
231243

@@ -238,7 +250,7 @@ socketpool_socket_obj_t *common_hal_socketpool_socket(socketpool_socketpool_obj_
238250
socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t);
239251
sock->base.type = &socketpool_socket_type;
240252

241-
if (!socketpool_socket(self, family, type, sock)) {
253+
if (!_socketpool_socket(self, family, type, sock)) {
242254
mp_raise_RuntimeError(translate("Out of sockets"));
243255
}
244256
mark_user_socket(sock->num, sock);
@@ -279,17 +291,16 @@ int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_
279291
// We got a socket. New client socket will not be non-blocking by default, so make it non-blocking.
280292
lwip_fcntl(newsoc, F_SETFL, O_NONBLOCK);
281293

282-
if (!register_open_socket(newsoc)) {
283-
lwip_close(newsoc);
284-
return -MP_EBADF;
285-
}
286-
287-
288294
if (accepted != NULL) {
289-
// Close the active socket because we have another we accepted.
290-
if (!common_hal_socketpool_socket_get_closed(accepted)) {
291-
common_hal_socketpool_socket_close(accepted);
295+
// Error if called with open socket object.
296+
assert(common_hal_socketpool_socket_get_closed(accepted));
297+
298+
// Register if system socket
299+
if (!register_open_socket(newsoc)) {
300+
lwip_close(newsoc);
301+
return -MP_EBADF;
292302
}
303+
293304
// Replace the old accepted socket with the new one.
294305
accepted->num = newsoc;
295306
accepted->pool = self->pool;
@@ -353,12 +364,21 @@ void socketpool_socket_close(socketpool_socket_obj_t *self) {
353364
return;
354365
}
355366
self->connected = false;
356-
if (self->num >= 0) {
357-
lwip_shutdown(self->num, SHUT_RDWR);
358-
lwip_close(self->num);
359-
unregister_open_socket(self->num);
360-
self->num = -1;
367+
int fd = self->num;
368+
// Ignore bogus/closed sockets
369+
if (fd >= LWIP_SOCKET_OFFSET) {
370+
if (user_socket[fd - LWIP_SOCKET_OFFSET] == NULL) {
371+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_CLOSING;
372+
lwip_shutdown(fd, SHUT_RDWR);
373+
lwip_close(fd);
374+
} else {
375+
lwip_shutdown(fd, SHUT_RDWR);
376+
lwip_close(fd);
377+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_CLOSED;
378+
user_socket[fd - LWIP_SOCKET_OFFSET] = NULL;
379+
}
361380
}
381+
self->num = -1;
362382
}
363383

364384
void common_hal_socketpool_socket_close(socketpool_socket_obj_t *self) {
@@ -420,7 +440,7 @@ bool common_hal_socketpool_socket_get_connected(socketpool_socket_obj_t *self) {
420440
}
421441

422442
bool common_hal_socketpool_socket_listen(socketpool_socket_obj_t *self, int backlog) {
423-
return lwip_listen(self->num, backlog);
443+
return lwip_listen(self->num, backlog) == 0;
424444
}
425445

426446
mp_uint_t common_hal_socketpool_socket_recvfrom_into(socketpool_socket_obj_t *self,
@@ -479,10 +499,9 @@ int socketpool_socket_recv_into(socketpool_socket_obj_t *self,
479499
}
480500
RUN_BACKGROUND_TASKS;
481501
received = lwip_recv(self->num, (void *)buf, len, 0);
482-
483502
// In non-blocking mode, fail instead of looping
484-
if (received == -1 && self->timeout_ms == 0) {
485-
if (errno == ENOTCONN) {
503+
if (received < 1 && self->timeout_ms == 0) {
504+
if ((received == 0) || (errno == ENOTCONN)) {
486505
self->connected = false;
487506
return -MP_ENOTCONN;
488507
}

ports/espressif/common-hal/socketpool/Socket.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,5 @@ typedef struct {
4848
} socketpool_socket_obj_t;
4949

5050
void socket_user_reset(void);
51+
// Unblock workflow socket select thread (platform specific)
52+
void socketpool_socket_poll_resume(void);

ports/raspberrypi/common-hal/socketpool/Socket.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,7 @@ typedef struct _lwip_socket_obj_t {
7575
socketpool_socketpool_obj_t *pool;
7676
} socketpool_socket_obj_t;
7777

78+
// Not required for RPi socket positive callbacks
79+
#define socketpool_socket_poll_resume(x)
80+
7881
void socket_user_reset(void);

0 commit comments

Comments
 (0)