Skip to content

Retry send if only some bytes sent #6742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions ports/espressif/supervisor/port.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,6 @@ void reset_port(void) {

reset_all_pins();

// A larger delay so the idle task can run and do any IDF cleanup needed.
vTaskDelay(4);

#if CIRCUITPY_ANALOGIO
analogout_reset();
#endif
Expand Down Expand Up @@ -402,6 +399,9 @@ void reset_port(void) {
#if CIRCUITPY_WATCHDOG
watchdog_reset();
#endif

// Yield so the idle task can run and do any IDF cleanup needed.
port_yield();
}

void reset_to_bootloader(void) {
Expand Down Expand Up @@ -492,6 +492,10 @@ void port_wake_main_task_from_isr() {
}
}

void port_yield() {
vTaskDelay(4);
}

void sleep_timer_cb(void *arg) {
port_wake_main_task();
}
Expand Down
5 changes: 5 additions & 0 deletions supervisor/port.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ void port_wake_main_task(void);
// default weak implementation is provided that does nothing.
void port_wake_main_task_from_isr(void);

// Some ports may use real RTOS tasks besides the background task framework of
// CircuitPython. Calling this will yield to other tasks and then return to the
// CircuitPython task when others are done.
void port_yield(void);

// Some ports need special handling just after completing boot.py execution.
// This function is called once while boot.py's VM is still valid, and
// then a second time after the VM is finalized.
Expand Down
3 changes: 3 additions & 0 deletions supervisor/shared/port.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ MP_WEAK void port_wake_main_task(void) {

MP_WEAK void port_wake_main_task_from_isr(void) {
}

MP_WEAK void port_yield(void) {
}
32 changes: 21 additions & 11 deletions supervisor/shared/web_workflow/web_workflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "shared/timeutils/timeutils.h"
#include "supervisor/fatfs_port.h"
#include "supervisor/filesystem.h"
#include "supervisor/port.h"
#include "supervisor/shared/reload.h"
#include "supervisor/shared/translate/translate.h"
#include "supervisor/shared/web_workflow/web_workflow.h"
Expand Down Expand Up @@ -323,22 +324,31 @@ void supervisor_start_web_workflow(void) {
#endif
}

static void _send_raw(socketpool_socket_obj_t *socket, const uint8_t *buf, int len) {
void web_workflow_send_raw(socketpool_socket_obj_t *socket, const uint8_t *buf, int len) {
int total_sent = 0;
int sent = -EAGAIN;
while (sent == -EAGAIN && common_hal_socketpool_socket_get_connected(socket)) {
sent = socketpool_socket_send(socket, buf, len);
while ((sent == -EAGAIN || (sent > 0 && total_sent < len)) &&
common_hal_socketpool_socket_get_connected(socket)) {
sent = socketpool_socket_send(socket, buf + total_sent, len - total_sent);
if (sent > 0) {
total_sent += sent;
if (total_sent < len) {
// Yield so that network code can run.
port_yield();
}
}
}
if (sent < len) {
if (total_sent < len) {
ESP_LOGE(TAG, "short send %d %d", sent, len);
}
}

STATIC void _print_raw(void *env, const char *str, size_t len) {
_send_raw((socketpool_socket_obj_t *)env, (const uint8_t *)str, (size_t)len);
web_workflow_send_raw((socketpool_socket_obj_t *)env, (const uint8_t *)str, (size_t)len);
}

static void _send_str(socketpool_socket_obj_t *socket, const char *str) {
_send_raw(socket, (const uint8_t *)str, strlen(str));
web_workflow_send_raw(socket, (const uint8_t *)str, strlen(str));
}

// The last argument must be NULL! Otherwise, it won't stop.
Expand All @@ -357,15 +367,15 @@ static void _send_strs(socketpool_socket_obj_t *socket, ...) {
static void _send_chunk(socketpool_socket_obj_t *socket, const char *chunk) {
mp_print_t _socket_print = {socket, _print_raw};
mp_printf(&_socket_print, "%X\r\n", strlen(chunk));
_send_raw(socket, (const uint8_t *)chunk, strlen(chunk));
_send_raw(socket, (const uint8_t *)"\r\n", 2);
web_workflow_send_raw(socket, (const uint8_t *)chunk, strlen(chunk));
web_workflow_send_raw(socket, (const uint8_t *)"\r\n", 2);
}

STATIC void _print_chunk(void *env, const char *str, size_t len) {
mp_print_t _socket_print = {env, _print_raw};
mp_printf(&_socket_print, "%X\r\n", len);
_send_raw((socketpool_socket_obj_t *)env, (const uint8_t *)str, len);
_send_raw((socketpool_socket_obj_t *)env, (const uint8_t *)"\r\n", 2);
web_workflow_send_raw((socketpool_socket_obj_t *)env, (const uint8_t *)str, len);
web_workflow_send_raw((socketpool_socket_obj_t *)env, (const uint8_t *)"\r\n", 2);
}

// A bit of a misnomer because it sends all arguments as one chunk.
Expand Down Expand Up @@ -938,7 +948,7 @@ static void _reply_static(socketpool_socket_obj_t *socket, _request *request, co
"Content-Length: ", encoded_len, "\r\n",
"Content-Type: ", content_type, "\r\n",
"\r\n", NULL);
_send_raw(socket, response, response_len);
web_workflow_send_raw(socket, response, response_len);
}

#define _REPLY_STATIC(socket, request, filename) _reply_static(socket, request, filename, filename##_length, filename##_content_type)
Expand Down
5 changes: 5 additions & 0 deletions supervisor/shared/web_workflow/web_workflow.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@

#include <stdbool.h>

#include "shared-bindings/socketpool/Socket.h"

// This background function should be called repeatedly. It cannot be done based
// on events.
void supervisor_web_workflow_background(void);
bool supervisor_web_workflow_status_dirty(void);
void supervisor_web_workflow_status(void);
void supervisor_start_web_workflow(void);
void supervisor_stop_web_workflow(void);

// To share with websocket.
void web_workflow_send_raw(socketpool_socket_obj_t *socket, const uint8_t *buf, int len);
28 changes: 11 additions & 17 deletions supervisor/shared/web_workflow/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "py/runtime.h"
#include "shared/runtime/interrupt_char.h"
#include "supervisor/shared/title_bar.h"
#include "supervisor/shared/web_workflow/web_workflow.h"

// TODO: Remove ESP specific stuff. For now, it is useful as we refine the server.
#include "esp_log.h"
Expand Down Expand Up @@ -91,16 +92,6 @@ static bool _read_byte(uint8_t *c) {
return true;
}

static void _send_raw(socketpool_socket_obj_t *socket, const uint8_t *buf, int len) {
int sent = -EAGAIN;
while (sent == -EAGAIN) {
sent = socketpool_socket_send(socket, buf, len);
}
if (sent < len) {
ESP_LOGE(TAG, "short send on %d err %d len %d", socket->num, sent, len);
}
}

static void _read_next_frame_header(void) {
uint8_t h;
if (cp_serial.frame_index == 0 && _read_byte(&h)) {
Expand Down Expand Up @@ -159,14 +150,14 @@ static void _read_next_frame_header(void) {
ESP_LOGE(TAG, "CLOSE or PING has long payload");
}
frame_header[1] = cp_serial.payload_remaining;
_send_raw(&cp_serial.socket, (const uint8_t *)frame_header, 2);
web_workflow_send_raw(&cp_serial.socket, (const uint8_t *)frame_header, 2);
}

if (cp_serial.payload_remaining > 0 && _read_byte(&h)) {
// Send the payload back to the client.
cp_serial.frame_index++;
cp_serial.payload_remaining--;
_send_raw(&cp_serial.socket, &h, 1);
web_workflow_send_raw(&cp_serial.socket, &h, 1);
}

if (cp_serial.payload_remaining == 0) {
Expand Down Expand Up @@ -231,30 +222,33 @@ static void _websocket_send(_websocket *ws, const char *text, size_t len) {
payload_len = 127;
}
frame_header[1] = payload_len;
_send_raw(&ws->socket, (const uint8_t *)frame_header, 2);
web_workflow_send_raw(&ws->socket, (const uint8_t *)frame_header, 2);
uint8_t extended_len[4];
if (payload_len == 126) {
extended_len[0] = (len >> 8) & 0xff;
extended_len[1] = len & 0xff;
_send_raw(&ws->socket, extended_len, 2);
web_workflow_send_raw(&ws->socket, extended_len, 2);
} else if (payload_len == 127) {
uint32_t zero = 0;
// 64 bits where top four bytes are zero.
_send_raw(&ws->socket, (const uint8_t *)&zero, 4);
web_workflow_send_raw(&ws->socket, (const uint8_t *)&zero, 4);
extended_len[0] = (len >> 24) & 0xff;
extended_len[1] = (len >> 16) & 0xff;
extended_len[2] = (len >> 8) & 0xff;
extended_len[3] = len & 0xff;
_send_raw(&ws->socket, extended_len, 4);
web_workflow_send_raw(&ws->socket, extended_len, 4);
}
_send_raw(&ws->socket, (const uint8_t *)text, len);
web_workflow_send_raw(&ws->socket, (const uint8_t *)text, len);
}

void websocket_write(const char *text, size_t len) {
_websocket_send(&cp_serial, text, len);
}

void websocket_background(void) {
if (!websocket_connected()) {
return;
}
uint8_t c;
while (ringbuf_num_empty(&_incoming_ringbuf) > 0 &&
_read_next_payload_byte(&c)) {
Expand Down