Skip to content

event: support socket sources on Windows #507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 188 additions & 23 deletions src/event/event_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ enum _dispatch_windows_port {
DISPATCH_PORT_FILE_HANDLE,
DISPATCH_PORT_PIPE_HANDLE_READ,
DISPATCH_PORT_PIPE_HANDLE_WRITE,
DISPATCH_PORT_SOCKET_READ,
DISPATCH_PORT_SOCKET_WRITE,
};

enum _dispatch_muxnote_events {
Expand Down Expand Up @@ -59,13 +61,24 @@ typedef struct dispatch_muxnote_s {
DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID,
DISPATCH_MUXNOTE_HANDLE_TYPE_FILE,
DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE,
DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET,
} dmn_handle_type;
enum _dispatch_muxnote_events dmn_events;

// Used by the pipe monitoring thread
HANDLE dmn_thread;
// For pipes, this event is used to synchronize the monitoring thread with
// I/O completion port processing. For sockets, this is the event used with
// WSAEventSelect().
HANDLE dmn_event;

// Pipe monitoring thread control
HANDLE dmn_thread;
os_atomic(bool) dmn_stop;

// Socket events registered with WSAEventSelect()
long dmn_network_events;

// Threadpool wait handle for socket events
PTP_WAIT dmn_threadpool_wait;
} *dispatch_muxnote_t;

static LIST_HEAD(dispatch_muxnote_bucket_s, dispatch_muxnote_s)
Expand Down Expand Up @@ -146,17 +159,10 @@ _dispatch_muxnote_create(dispatch_unote_t du,

case FILE_TYPE_PIPE:
// The specified file is a socket, a named pipe, or an
// anonymous pipe. Use GetNamedPipeInfo() to distinguish between
// a pipe and a socket. Despite its name, it also succeeds for
// anonymous pipes.
if (!GetNamedPipeInfo(handle, NULL, NULL, NULL, NULL)) {
// We'll get ERROR_ACCESS_DENIED for outbound pipes.
if (GetLastError() != ERROR_ACCESS_DENIED) {
// The file is probably a socket.
WIN_PORT_ERROR();
}
}
dmn->dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE;
// anonymous pipe.
dmn->dmn_handle_type = _dispatch_handle_is_socket(handle)
? DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET
: DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE;
break;
}

Expand All @@ -183,18 +189,27 @@ _dispatch_muxnote_stop(dispatch_muxnote_t dmn)
CloseHandle(dmn->dmn_thread);
dmn->dmn_thread = NULL;
}
if (dmn->dmn_event) {
CloseHandle(dmn->dmn_event);
dmn->dmn_event = NULL;
if (dmn->dmn_threadpool_wait) {
SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL);
WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait,
/* fCancelPendingCallbacks */ FALSE);
CloseThreadpoolWait(dmn->dmn_threadpool_wait);
dmn->dmn_threadpool_wait = NULL;
}
if (dmn->dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET) {
WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0);
}
}

static void
_dispatch_muxnote_dispose(dispatch_muxnote_t dmn)
{
if (dmn->dmn_thread) {
if (dmn->dmn_thread || dmn->dmn_threadpool_wait) {
DISPATCH_INTERNAL_CRASH(0, "disposed a muxnote with an active thread");
}
if (dmn->dmn_event) {
CloseHandle(dmn->dmn_event);
}
free(dmn);
}

Expand Down Expand Up @@ -300,10 +315,51 @@ _dispatch_pipe_write_availability(HANDLE hPipe)
return fpli.WriteQuotaAvailable;
}

static VOID CALLBACK
_dispatch_socket_callback(PTP_CALLBACK_INSTANCE inst, void *context,
PTP_WAIT pwa, TP_WAIT_RESULT res)
{
dispatch_muxnote_t dmn = (dispatch_muxnote_t)context;
SOCKET sock = (SOCKET)dmn->dmn_ident;
WSANETWORKEVENTS events;
if (WSAEnumNetworkEvents(sock, (WSAEVENT)dmn->dmn_event, &events) == 0) {
long lNetworkEvents = events.lNetworkEvents;
DWORD dwBytesAvailable = 1;
if (lNetworkEvents & FD_CLOSE) {
dwBytesAvailable = 0;
// Post to all registered read and write handlers
lNetworkEvents |= FD_READ | FD_WRITE;
} else if (lNetworkEvents & FD_READ) {
ioctlsocket(sock, FIONREAD, &dwBytesAvailable);
}
if (lNetworkEvents & FD_READ) {
_dispatch_muxnote_retain(dmn);
if (!PostQueuedCompletionStatus(hPort, dwBytesAvailable,
(ULONG_PTR)DISPATCH_PORT_SOCKET_READ, (LPOVERLAPPED)dmn)) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}
if (lNetworkEvents & FD_WRITE) {
_dispatch_muxnote_retain(dmn);
if (!PostQueuedCompletionStatus(hPort, dwBytesAvailable,
(ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE, (LPOVERLAPPED)dmn)) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}
} else {
_dispatch_debug("socket[0x%llx]: WSAEnumNetworkEvents() failed (%d)",
(long long)sock, WSAGetLastError());
}
SetThreadpoolWait(pwa, dmn->dmn_event, /* pftTimeout */ NULL);
}

static BOOL
_dispatch_io_trigger(dispatch_muxnote_t dmn)
{
BOOL bSuccess;
long lNetworkEvents;

switch (dmn->dmn_handle_type) {
case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID:
Expand All @@ -321,19 +377,17 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn)
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
if ((dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) &&
!dmn->dmn_thread) {
HANDLE hThread = (HANDLE)_beginthreadex(/* security */ NULL,
dmn->dmn_thread = (HANDLE)_beginthreadex(/* security */ NULL,
/* stack_size */ 1, _dispatch_pipe_monitor_thread,
(void *)dmn, /* initflag */ 0, /* thrdaddr */ NULL);
if (!hThread) {
if (!dmn->dmn_thread) {
DISPATCH_INTERNAL_CRASH(errno, "_beginthread");
}
HANDLE hEvent = CreateEventW(NULL, /* bManualReset */ FALSE,
dmn->dmn_event = CreateEventW(NULL, /* bManualReset */ FALSE,
/* bInitialState */ FALSE, NULL);
if (!hEvent) {
if (!dmn->dmn_event) {
DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW");
}
dmn->dmn_thread = hThread;
dmn->dmn_event = hEvent;
}
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
_dispatch_muxnote_retain(dmn);
Expand All @@ -348,6 +402,59 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn)
}
}
break;

case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
if (!dmn->dmn_event) {
dmn->dmn_event = CreateEventW(NULL, /* bManualReset */ FALSE,
/* bInitialState */ FALSE, NULL);
if (!dmn->dmn_event) {
DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW");
}
}
if (!dmn->dmn_threadpool_wait) {
dmn->dmn_threadpool_wait = CreateThreadpoolWait(
_dispatch_socket_callback, dmn,
/* PTP_CALLBACK_ENVIRON */ NULL);
if (!dmn->dmn_threadpool_wait) {
DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateThreadpoolWait");
}
SetThreadpoolWait(dmn->dmn_threadpool_wait, dmn->dmn_event,
/* pftTimeout */ NULL);
}
lNetworkEvents = FD_CLOSE;
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) {
lNetworkEvents |= FD_READ;
}
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
lNetworkEvents |= FD_WRITE;
}
if (dmn->dmn_network_events != lNetworkEvents) {
if (WSAEventSelect((SOCKET)dmn->dmn_ident, (WSAEVENT)dmn->dmn_event,
lNetworkEvents) != 0) {
DISPATCH_INTERNAL_CRASH(WSAGetLastError(), "WSAEventSelect");
}
dmn->dmn_network_events = lNetworkEvents;
}
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
// FD_WRITE is edge-triggered, not level-triggered, so it will only
// be signaled if the socket becomes writable after a send() fails
// with WSAEWOULDBLOCK. We can work around this by performing a
// zero-byte send(). If the socket is writable, the send() will
// succeed and we can immediately post a packet, and if it isn't, it
// will fail with WSAEWOULDBLOCK and WSAEventSelect() will report
// the next time it becomes available.
if (send((SOCKET)dmn->dmn_ident, "", 0, 0) == 0) {
_dispatch_muxnote_retain(dmn);
bSuccess = PostQueuedCompletionStatus(hPort, 1,
(ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE,
(LPOVERLAPPED)dmn);
if (bSuccess == FALSE) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}
}
break;
}

return TRUE;
Expand Down Expand Up @@ -408,6 +515,7 @@ _dispatch_unote_register_muxed(dispatch_unote_t du)
break;

case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
if (events & DISPATCH_MUXNOTE_EVENT_READ) {
LIST_INSERT_HEAD(&dmn->dmn_readers_head, dul, du_link);
} else if (events & DISPATCH_MUXNOTE_EVENT_WRITE) {
Expand Down Expand Up @@ -449,6 +557,7 @@ _dispatch_unote_unregister_muxed(dispatch_unote_t du)
break;

case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
LIST_REMOVE(dul, du_link);
_LIST_TRASH_ENTRY(dul, du_link);
break;
Expand Down Expand Up @@ -532,6 +641,52 @@ _dispatch_event_merge_pipe_handle_write(dispatch_muxnote_t dmn,
_dispatch_muxnote_release(dmn);
}

static void
_dispatch_event_merge_socket(dispatch_unote_t du, DWORD dwBytesAvailable)
{
// consumed by dux_merge_evt()
_dispatch_retain_unote_owner(du);
dispatch_unote_state_t du_state = _dispatch_unote_state(du);
du_state &= ~DU_STATE_ARMED;
uintptr_t data = dwBytesAvailable;
uint32_t flags;
if (dwBytesAvailable > 0) {
flags = EV_ADD | EV_ENABLE | EV_DISPATCH;
} else {
du_state |= DU_STATE_NEEDS_DELETE;
flags = EV_DELETE | EV_DISPATCH;
}
_dispatch_unote_state_set(du, du_state);
os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed);
dux_merge_evt(du._du, flags, data, 0);
}

static void
_dispatch_event_merge_socket_read(dispatch_muxnote_t dmn,
DWORD dwBytesAvailable)
{
dispatch_unote_linkage_t dul, dul_next;
LIST_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_event_merge_socket(du, dwBytesAvailable);
}
// Retained when posting the completion packet
_dispatch_muxnote_release(dmn);
}

static void
_dispatch_event_merge_socket_write(dispatch_muxnote_t dmn,
DWORD dwBytesAvailable)
{
dispatch_unote_linkage_t dul, dul_next;
LIST_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_event_merge_socket(du, dwBytesAvailable);
}
// Retained when posting the completion packet
_dispatch_muxnote_release(dmn);
}

#pragma mark timers

typedef struct _dispatch_windows_timeout_s {
Expand Down Expand Up @@ -716,6 +871,16 @@ _dispatch_event_loop_drain(uint32_t flags)
dwNumberOfBytesTransferred);
break;

case DISPATCH_PORT_SOCKET_READ:
_dispatch_event_merge_socket_read((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;

case DISPATCH_PORT_SOCKET_WRITE:
_dispatch_event_merge_socket_write((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
break;

default:
DISPATCH_INTERNAL_CRASH(ulCompletionKey,
"unsupported completion key");
Expand Down
26 changes: 24 additions & 2 deletions src/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -2412,7 +2412,18 @@ _dispatch_operation_perform(dispatch_operation_t op)
#if defined(_WIN32)
HANDLE hFile = (HANDLE)op->fd_entry->fd;
BOOL bSuccess;
if (GetFileType(hFile) == FILE_TYPE_PIPE) {
if (_dispatch_handle_is_socket(hFile)) {
processed = recv((SOCKET)hFile, buf, len, 0);
if (processed < 0) {
bSuccess = FALSE;
err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
err = EAGAIN;
}
goto error;
}
bSuccess = TRUE;
} else if (GetFileType(hFile) == FILE_TYPE_PIPE) {
OVERLAPPED ovlOverlapped = {};
DWORD dwTotalBytesAvail;
bSuccess = PeekNamedPipe(hFile, NULL, 0, NULL,
Expand Down Expand Up @@ -2466,7 +2477,18 @@ _dispatch_operation_perform(dispatch_operation_t op)
#if defined(_WIN32)
HANDLE hFile = (HANDLE)op->fd_entry->fd;
BOOL bSuccess;
if (GetFileType(hFile) == FILE_TYPE_PIPE) {
if (_dispatch_handle_is_socket(hFile)) {
processed = send((SOCKET)hFile, buf, len, 0);
if (processed < 0) {
bSuccess = FALSE;
err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
err = EAGAIN;
}
goto error;
}
bSuccess = TRUE;
} else if (GetFileType(hFile) == FILE_TYPE_PIPE) {
// Unfortunately there isn't a good way to achieve O_NONBLOCK
// semantics when writing to a pipe. SetNamedPipeHandleState()
// can allow pipes to be switched into a "no wait" mode, but
Expand Down
13 changes: 13 additions & 0 deletions src/shims/generic_win_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ typedef NTSTATUS (NTAPI *_NtQueryInformationFile_fn_t)(HANDLE FileHandle,
DISPATCH_STATIC_GLOBAL(dispatch_once_t _dispatch_ntdll_pred);
DISPATCH_STATIC_GLOBAL(_NtQueryInformationFile_fn_t _dispatch_NtQueryInformationFile_ptr);

bool
_dispatch_handle_is_socket(HANDLE hFile)
{
// GetFileType() returns FILE_TYPE_PIPE for both pipes and sockets. We can
// disambiguate by checking if PeekNamedPipe() fails with
// ERROR_INVALID_FUNCTION.
if (GetFileType(hFile) == FILE_TYPE_PIPE &&
!PeekNamedPipe(hFile, NULL, 0, NULL, NULL, NULL)) {
return GetLastError() == ERROR_INVALID_FUNCTION;
}
return false;
}

static void
_dispatch_init_precise_time(void *context DISPATCH_UNUSED)
{
Expand Down
3 changes: 3 additions & 0 deletions src/shims/generic_win_stubs.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <stdint.h>

#include <winsock2.h>
#include <Windows.h>
#include <crtdbg.h>
#include <ntstatus.h>
Expand Down Expand Up @@ -39,6 +40,8 @@ typedef __typeof__(_Generic((__SIZE_TYPE__)0, \

#define strcasecmp _stricmp

bool _dispatch_handle_is_socket(HANDLE hFile);

/*
* Wrappers for dynamically loaded Windows APIs
*/
Expand Down