Skip to content

RFC: Web Workflow reliability and performance improvements #7836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 126 additions & 107 deletions ports/espressif/common-hal/socketpool/Socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,70 +45,79 @@

StackType_t socket_select_stack[2 * configMINIMAL_STACK_SIZE];

STATIC int open_socket_fds[CONFIG_LWIP_MAX_SOCKETS];
/* Socket state table:
* 0 := Closed (unused)
* 1 := Open
* 2 := Closing (remove from rfds)
* Index into socket_fd_state is calculated from actual lwip fd. idx := fd - LWIP_SOCKET_OFFSET
*/
#define FDSTATE_CLOSED 0
#define FDSTATE_OPEN 1
#define FDSTATE_CLOSING 2
STATIC uint8_t socket_fd_state[CONFIG_LWIP_MAX_SOCKETS];

STATIC socketpool_socket_obj_t *user_socket[CONFIG_LWIP_MAX_SOCKETS];
StaticTask_t socket_select_task_handle;
StaticTask_t socket_select_task_buffer;
TaskHandle_t socket_select_task_handle;
STATIC int socket_change_fd = -1;

STATIC void socket_select_task(void *arg) {
uint64_t signal;
fd_set readfds;
fd_set excptfds;

while (true) {
fd_set readfds;
fd_set errfds;
FD_ZERO(&readfds);
FD_ZERO(&errfds);
FD_ZERO(&excptfds);
FD_SET(socket_change_fd, &readfds);
FD_SET(socket_change_fd, &errfds);
int max_fd = socket_change_fd;
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
int sockfd = open_socket_fds[i];
if (sockfd < 0) {
continue;
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
if ((socket_fd_state[i] == FDSTATE_OPEN) && (user_socket[i] == NULL)) {
int sockfd = i + LWIP_SOCKET_OFFSET;
max_fd = MAX(max_fd, sockfd);
FD_SET(sockfd, &readfds);
FD_SET(sockfd, &excptfds);
}
max_fd = MAX(max_fd, sockfd);
FD_SET(sockfd, &readfds);
FD_SET(sockfd, &errfds);
}

int num_triggered = select(max_fd + 1, &readfds, NULL, &errfds, NULL);
// Check for bad file descriptor and queue up the background task before
// circling around.
if (num_triggered == -1 && errno == EBADF) {
// One for the change fd and one for the closed socket.
num_triggered = 2;
}
// Try and find the bad file and remove it from monitoring.
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
int sockfd = open_socket_fds[i];
if (sockfd < 0) {
continue;
}
int err;
int optlen = sizeof(int);
int ret = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, (socklen_t *)&optlen);
if (ret < 0) {
open_socket_fds[i] = -1;
// Raise num_triggered so that we skip the assert and queue the background task.
num_triggered = 2;
}
int num_triggered = select(max_fd + 1, &readfds, NULL, &excptfds, NULL);
// Hard error (or someone closed a socket on another thread)
if (num_triggered == -1) {
assert(errno == EBADF);
continue;
}
assert(num_triggered >= 0);

assert(num_triggered > 0);
assert(!FD_ISSET(socket_change_fd, &excptfds));

// Notice event trigger
if (FD_ISSET(socket_change_fd, &readfds)) {
read(socket_change_fd, &signal, sizeof(signal));
num_triggered -= 1;
num_triggered--;
}
if (num_triggered > 0) {
supervisor_workflow_request_background();

// Wake up CircuitPython. We know it is asleep because we are lower
// priority.
port_wake_main_task();
// Handle active FDs, close the dead ones
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
int sockfd = i + LWIP_SOCKET_OFFSET;
if (socket_fd_state[i] != FDSTATE_CLOSED) {
if (FD_ISSET(sockfd, &readfds) || FD_ISSET(sockfd, &excptfds)) {
if (socket_fd_state[i] == FDSTATE_CLOSING) {
socket_fd_state[i] = FDSTATE_CLOSED;
num_triggered--;
}
}
}
}

if (num_triggered > 0) {
// Wake up CircuitPython by queuing request
supervisor_workflow_request_background();
ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
}
}

close(socket_change_fd);
socket_change_fd = -1;
vTaskDelete(NULL);
}

Expand All @@ -117,85 +126,74 @@ void socket_user_reset(void) {
esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
ESP_ERROR_CHECK(esp_vfs_eventfd_register(&config));

for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
open_socket_fds[i] = -1;
// Clear initial socket states
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
socket_fd_state[i] = FDSTATE_CLOSED;
user_socket[i] = NULL;
}
socket_change_fd = eventfd(0, 0);
// Run this at the same priority as CP so that the web workflow background task can be
// queued while CP is running. Both tasks can still sleep and, therefore, sleep overall.
(void)xTaskCreateStaticPinnedToCore(socket_select_task,
socket_select_task_handle = xTaskCreateStaticPinnedToCore(socket_select_task,
"socket_select",
2 * configMINIMAL_STACK_SIZE,
NULL,
uxTaskPriorityGet(NULL),
socket_select_stack,
&socket_select_task_handle,
&socket_select_task_buffer,
xPortGetCoreID());
} else {
// Not init - close open user sockets
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
if ((socket_fd_state[i] == FDSTATE_OPEN) && user_socket[i]) {
common_hal_socketpool_socket_close(user_socket[i]);
}
}
}
}

for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
if (open_socket_fds[i] >= 0 && user_socket[i]) {
common_hal_socketpool_socket_close(user_socket[i]);
int num = open_socket_fds[i];
// Close automatically clears socket handle
lwip_shutdown(num, SHUT_RDWR);
lwip_close(num);
open_socket_fds[i] = -1;
user_socket[i] = NULL;
}
// Unblock select task (ok if not blocked yet)
void socketpool_socket_poll_resume(void) {
if (socket_select_task_handle) {
xTaskNotifyGive(socket_select_task_handle);
}
}

// The writes below send an event to the socket select task so that it redoes the
// select with the new open socket set.

STATIC bool register_open_socket(int fd) {
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
if (open_socket_fds[i] == -1) {
open_socket_fds[i] = fd;
user_socket[i] = false;
uint64_t signal = 1;
write(socket_change_fd, &signal, sizeof(signal));
return true;
}
}
return false;
}
if (fd < FD_SETSIZE) {
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_OPEN;
user_socket[fd - LWIP_SOCKET_OFFSET] = NULL;

STATIC void unregister_open_socket(int fd) {
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
if (open_socket_fds[i] == fd) {
open_socket_fds[i] = -1;
user_socket[i] = false;
// Write must be 8 bytes for an eventfd.
uint64_t signal = 1;
write(socket_change_fd, &signal, sizeof(signal));
return;
}
uint64_t signal = 1;
write(socket_change_fd, &signal, sizeof(signal));
socketpool_socket_poll_resume();
return true;
}
return false;
}

STATIC void mark_user_socket(int fd, socketpool_socket_obj_t *obj) {
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
if (open_socket_fds[i] == fd) {
user_socket[i] = obj;
return;
}
}
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_OPEN;
user_socket[fd - LWIP_SOCKET_OFFSET] = obj;
// No need to wakeup select task
}

bool socketpool_socket(socketpool_socketpool_obj_t *self,
STATIC bool _socketpool_socket(socketpool_socketpool_obj_t *self,
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
socketpool_socket_obj_t *sock) {
int addr_family;
int ipproto;
if (family == SOCKETPOOL_AF_INET) {
addr_family = AF_INET;
ipproto = IPPROTO_IP;
#if LWIP_IPV6
} else { // INET6
addr_family = AF_INET6;
ipproto = IPPROTO_IPV6;
#endif
}

int socket_type;
Expand All @@ -218,14 +216,28 @@ bool socketpool_socket(socketpool_socketpool_obj_t *self,
if (socknum < 0) {
return false;
}
// This shouldn't happen since we have room for the same number of sockets as LWIP.
if (!register_open_socket(socknum)) {
lwip_close(socknum);
return false;
}

sock->num = socknum;
// Sockets should be nonblocking in most cases
lwip_fcntl(socknum, F_SETFL, O_NONBLOCK);

return true;
}

// special entry for workflow listener (register system socket)
bool socketpool_socket(socketpool_socketpool_obj_t *self,
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
socketpool_socket_obj_t *sock) {

if (!_socketpool_socket(self, family, type, sock)) {
return false;
}

// This shouldn't happen since we have room for the same number of sockets as LWIP.
if (!register_open_socket(sock->num)) {
lwip_close(sock->num);
return false;
}
return true;
}

Expand All @@ -238,7 +250,7 @@ socketpool_socket_obj_t *common_hal_socketpool_socket(socketpool_socketpool_obj_
socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t);
sock->base.type = &socketpool_socket_type;

if (!socketpool_socket(self, family, type, sock)) {
if (!_socketpool_socket(self, family, type, sock)) {
mp_raise_RuntimeError(translate("Out of sockets"));
}
mark_user_socket(sock->num, sock);
Expand Down Expand Up @@ -279,17 +291,16 @@ int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_
// We got a socket. New client socket will not be non-blocking by default, so make it non-blocking.
lwip_fcntl(newsoc, F_SETFL, O_NONBLOCK);

if (!register_open_socket(newsoc)) {
lwip_close(newsoc);
return -MP_EBADF;
}


if (accepted != NULL) {
// Close the active socket because we have another we accepted.
if (!common_hal_socketpool_socket_get_closed(accepted)) {
common_hal_socketpool_socket_close(accepted);
// Error if called with open socket object.
assert(common_hal_socketpool_socket_get_closed(accepted));

// Register if system socket
if (!register_open_socket(newsoc)) {
lwip_close(newsoc);
return -MP_EBADF;
}

// Replace the old accepted socket with the new one.
accepted->num = newsoc;
accepted->pool = self->pool;
Expand Down Expand Up @@ -353,12 +364,21 @@ void socketpool_socket_close(socketpool_socket_obj_t *self) {
return;
}
self->connected = false;
if (self->num >= 0) {
lwip_shutdown(self->num, SHUT_RDWR);
lwip_close(self->num);
unregister_open_socket(self->num);
self->num = -1;
int fd = self->num;
// Ignore bogus/closed sockets
if (fd >= LWIP_SOCKET_OFFSET) {
if (user_socket[fd - LWIP_SOCKET_OFFSET] == NULL) {
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_CLOSING;
lwip_shutdown(fd, SHUT_RDWR);
lwip_close(fd);
} else {
lwip_shutdown(fd, SHUT_RDWR);
lwip_close(fd);
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_CLOSED;
user_socket[fd - LWIP_SOCKET_OFFSET] = NULL;
}
}
self->num = -1;
}

void common_hal_socketpool_socket_close(socketpool_socket_obj_t *self) {
Expand Down Expand Up @@ -420,7 +440,7 @@ bool common_hal_socketpool_socket_get_connected(socketpool_socket_obj_t *self) {
}

bool common_hal_socketpool_socket_listen(socketpool_socket_obj_t *self, int backlog) {
return lwip_listen(self->num, backlog);
return lwip_listen(self->num, backlog) == 0;
}

mp_uint_t common_hal_socketpool_socket_recvfrom_into(socketpool_socket_obj_t *self,
Expand Down Expand Up @@ -479,10 +499,9 @@ int socketpool_socket_recv_into(socketpool_socket_obj_t *self,
}
RUN_BACKGROUND_TASKS;
received = lwip_recv(self->num, (void *)buf, len, 0);

// In non-blocking mode, fail instead of looping
if (received == -1 && self->timeout_ms == 0) {
if (errno == ENOTCONN) {
if (received < 1 && self->timeout_ms == 0) {
if ((received == 0) || (errno == ENOTCONN)) {
self->connected = false;
return -MP_ENOTCONN;
}
Expand Down
2 changes: 2 additions & 0 deletions ports/espressif/common-hal/socketpool/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ typedef struct {
} socketpool_socket_obj_t;

void socket_user_reset(void);
// Unblock workflow socket select thread (platform specific)
void socketpool_socket_poll_resume(void);
3 changes: 3 additions & 0 deletions ports/raspberrypi/common-hal/socketpool/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,7 @@ typedef struct _lwip_socket_obj_t {
socketpool_socketpool_obj_t *pool;
} socketpool_socket_obj_t;

// Not required for RPi socket positive callbacks
#define socketpool_socket_poll_resume(x)

void socket_user_reset(void);
Loading