Skip to content

Commit 9005cb4

Browse files
Merge pull request #507 from adierking/sock-and-roll
event: support socket sources on Windows
2 parents 6d32c4d + 73ba1ce commit 9005cb4

File tree

4 files changed

+228
-25
lines changed

4 files changed

+228
-25
lines changed

src/event/event_windows.c

Lines changed: 188 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ enum _dispatch_windows_port {
3030
DISPATCH_PORT_FILE_HANDLE,
3131
DISPATCH_PORT_PIPE_HANDLE_READ,
3232
DISPATCH_PORT_PIPE_HANDLE_WRITE,
33+
DISPATCH_PORT_SOCKET_READ,
34+
DISPATCH_PORT_SOCKET_WRITE,
3335
};
3436

3537
enum _dispatch_muxnote_events {
@@ -59,13 +61,24 @@ typedef struct dispatch_muxnote_s {
5961
DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID,
6062
DISPATCH_MUXNOTE_HANDLE_TYPE_FILE,
6163
DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE,
64+
DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET,
6265
} dmn_handle_type;
6366
enum _dispatch_muxnote_events dmn_events;
6467

65-
// Used by the pipe monitoring thread
66-
HANDLE dmn_thread;
68+
// For pipes, this event is used to synchronize the monitoring thread with
69+
// I/O completion port processing. For sockets, this is the event used with
70+
// WSAEventSelect().
6771
HANDLE dmn_event;
72+
73+
// Pipe monitoring thread control
74+
HANDLE dmn_thread;
6875
os_atomic(bool) dmn_stop;
76+
77+
// Socket events registered with WSAEventSelect()
78+
long dmn_network_events;
79+
80+
// Threadpool wait handle for socket events
81+
PTP_WAIT dmn_threadpool_wait;
6982
} *dispatch_muxnote_t;
7083

7184
static LIST_HEAD(dispatch_muxnote_bucket_s, dispatch_muxnote_s)
@@ -146,17 +159,10 @@ _dispatch_muxnote_create(dispatch_unote_t du,
146159

147160
case FILE_TYPE_PIPE:
148161
// The specified file is a socket, a named pipe, or an
149-
// anonymous pipe. Use GetNamedPipeInfo() to distinguish between
150-
// a pipe and a socket. Despite its name, it also succeeds for
151-
// anonymous pipes.
152-
if (!GetNamedPipeInfo(handle, NULL, NULL, NULL, NULL)) {
153-
// We'll get ERROR_ACCESS_DENIED for outbound pipes.
154-
if (GetLastError() != ERROR_ACCESS_DENIED) {
155-
// The file is probably a socket.
156-
WIN_PORT_ERROR();
157-
}
158-
}
159-
dmn->dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE;
162+
// anonymous pipe.
163+
dmn->dmn_handle_type = _dispatch_handle_is_socket(handle)
164+
? DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET
165+
: DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE;
160166
break;
161167
}
162168

@@ -183,18 +189,27 @@ _dispatch_muxnote_stop(dispatch_muxnote_t dmn)
183189
CloseHandle(dmn->dmn_thread);
184190
dmn->dmn_thread = NULL;
185191
}
186-
if (dmn->dmn_event) {
187-
CloseHandle(dmn->dmn_event);
188-
dmn->dmn_event = NULL;
192+
if (dmn->dmn_threadpool_wait) {
193+
SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL);
194+
WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait,
195+
/* fCancelPendingCallbacks */ FALSE);
196+
CloseThreadpoolWait(dmn->dmn_threadpool_wait);
197+
dmn->dmn_threadpool_wait = NULL;
198+
}
199+
if (dmn->dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET) {
200+
WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0);
189201
}
190202
}
191203

192204
static void
193205
_dispatch_muxnote_dispose(dispatch_muxnote_t dmn)
194206
{
195-
if (dmn->dmn_thread) {
207+
if (dmn->dmn_thread || dmn->dmn_threadpool_wait) {
196208
DISPATCH_INTERNAL_CRASH(0, "disposed a muxnote with an active thread");
197209
}
210+
if (dmn->dmn_event) {
211+
CloseHandle(dmn->dmn_event);
212+
}
198213
free(dmn);
199214
}
200215

@@ -300,10 +315,51 @@ _dispatch_pipe_write_availability(HANDLE hPipe)
300315
return fpli.WriteQuotaAvailable;
301316
}
302317

318+
static VOID CALLBACK
319+
_dispatch_socket_callback(PTP_CALLBACK_INSTANCE inst, void *context,
320+
PTP_WAIT pwa, TP_WAIT_RESULT res)
321+
{
322+
dispatch_muxnote_t dmn = (dispatch_muxnote_t)context;
323+
SOCKET sock = (SOCKET)dmn->dmn_ident;
324+
WSANETWORKEVENTS events;
325+
if (WSAEnumNetworkEvents(sock, (WSAEVENT)dmn->dmn_event, &events) == 0) {
326+
long lNetworkEvents = events.lNetworkEvents;
327+
DWORD dwBytesAvailable = 1;
328+
if (lNetworkEvents & FD_CLOSE) {
329+
dwBytesAvailable = 0;
330+
// Post to all registered read and write handlers
331+
lNetworkEvents |= FD_READ | FD_WRITE;
332+
} else if (lNetworkEvents & FD_READ) {
333+
ioctlsocket(sock, FIONREAD, &dwBytesAvailable);
334+
}
335+
if (lNetworkEvents & FD_READ) {
336+
_dispatch_muxnote_retain(dmn);
337+
if (!PostQueuedCompletionStatus(hPort, dwBytesAvailable,
338+
(ULONG_PTR)DISPATCH_PORT_SOCKET_READ, (LPOVERLAPPED)dmn)) {
339+
DISPATCH_INTERNAL_CRASH(GetLastError(),
340+
"PostQueuedCompletionStatus");
341+
}
342+
}
343+
if (lNetworkEvents & FD_WRITE) {
344+
_dispatch_muxnote_retain(dmn);
345+
if (!PostQueuedCompletionStatus(hPort, dwBytesAvailable,
346+
(ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE, (LPOVERLAPPED)dmn)) {
347+
DISPATCH_INTERNAL_CRASH(GetLastError(),
348+
"PostQueuedCompletionStatus");
349+
}
350+
}
351+
} else {
352+
_dispatch_debug("socket[0x%llx]: WSAEnumNetworkEvents() failed (%d)",
353+
(long long)sock, WSAGetLastError());
354+
}
355+
SetThreadpoolWait(pwa, dmn->dmn_event, /* pftTimeout */ NULL);
356+
}
357+
303358
static BOOL
304359
_dispatch_io_trigger(dispatch_muxnote_t dmn)
305360
{
306361
BOOL bSuccess;
362+
long lNetworkEvents;
307363

308364
switch (dmn->dmn_handle_type) {
309365
case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID:
@@ -321,19 +377,17 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn)
321377
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
322378
if ((dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) &&
323379
!dmn->dmn_thread) {
324-
HANDLE hThread = (HANDLE)_beginthreadex(/* security */ NULL,
380+
dmn->dmn_thread = (HANDLE)_beginthreadex(/* security */ NULL,
325381
/* stack_size */ 1, _dispatch_pipe_monitor_thread,
326382
(void *)dmn, /* initflag */ 0, /* thrdaddr */ NULL);
327-
if (!hThread) {
383+
if (!dmn->dmn_thread) {
328384
DISPATCH_INTERNAL_CRASH(errno, "_beginthread");
329385
}
330-
HANDLE hEvent = CreateEventW(NULL, /* bManualReset */ FALSE,
386+
dmn->dmn_event = CreateEventW(NULL, /* bManualReset */ FALSE,
331387
/* bInitialState */ FALSE, NULL);
332-
if (!hEvent) {
388+
if (!dmn->dmn_event) {
333389
DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW");
334390
}
335-
dmn->dmn_thread = hThread;
336-
dmn->dmn_event = hEvent;
337391
}
338392
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
339393
_dispatch_muxnote_retain(dmn);
@@ -348,6 +402,59 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn)
348402
}
349403
}
350404
break;
405+
406+
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
407+
if (!dmn->dmn_event) {
408+
dmn->dmn_event = CreateEventW(NULL, /* bManualReset */ FALSE,
409+
/* bInitialState */ FALSE, NULL);
410+
if (!dmn->dmn_event) {
411+
DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW");
412+
}
413+
}
414+
if (!dmn->dmn_threadpool_wait) {
415+
dmn->dmn_threadpool_wait = CreateThreadpoolWait(
416+
_dispatch_socket_callback, dmn,
417+
/* PTP_CALLBACK_ENVIRON */ NULL);
418+
if (!dmn->dmn_threadpool_wait) {
419+
DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateThreadpoolWait");
420+
}
421+
SetThreadpoolWait(dmn->dmn_threadpool_wait, dmn->dmn_event,
422+
/* pftTimeout */ NULL);
423+
}
424+
lNetworkEvents = FD_CLOSE;
425+
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) {
426+
lNetworkEvents |= FD_READ;
427+
}
428+
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
429+
lNetworkEvents |= FD_WRITE;
430+
}
431+
if (dmn->dmn_network_events != lNetworkEvents) {
432+
if (WSAEventSelect((SOCKET)dmn->dmn_ident, (WSAEVENT)dmn->dmn_event,
433+
lNetworkEvents) != 0) {
434+
DISPATCH_INTERNAL_CRASH(WSAGetLastError(), "WSAEventSelect");
435+
}
436+
dmn->dmn_network_events = lNetworkEvents;
437+
}
438+
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
439+
// FD_WRITE is edge-triggered, not level-triggered, so it will only
440+
// be signaled if the socket becomes writable after a send() fails
441+
// with WSAEWOULDBLOCK. We can work around this by performing a
442+
// zero-byte send(). If the socket is writable, the send() will
443+
// succeed and we can immediately post a packet, and if it isn't, it
444+
// will fail with WSAEWOULDBLOCK and WSAEventSelect() will report
445+
// the next time it becomes available.
446+
if (send((SOCKET)dmn->dmn_ident, "", 0, 0) == 0) {
447+
_dispatch_muxnote_retain(dmn);
448+
bSuccess = PostQueuedCompletionStatus(hPort, 1,
449+
(ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE,
450+
(LPOVERLAPPED)dmn);
451+
if (bSuccess == FALSE) {
452+
DISPATCH_INTERNAL_CRASH(GetLastError(),
453+
"PostQueuedCompletionStatus");
454+
}
455+
}
456+
}
457+
break;
351458
}
352459

353460
return TRUE;
@@ -408,6 +515,7 @@ _dispatch_unote_register_muxed(dispatch_unote_t du)
408515
break;
409516

410517
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
518+
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
411519
if (events & DISPATCH_MUXNOTE_EVENT_READ) {
412520
LIST_INSERT_HEAD(&dmn->dmn_readers_head, dul, du_link);
413521
} else if (events & DISPATCH_MUXNOTE_EVENT_WRITE) {
@@ -449,6 +557,7 @@ _dispatch_unote_unregister_muxed(dispatch_unote_t du)
449557
break;
450558

451559
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
560+
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
452561
LIST_REMOVE(dul, du_link);
453562
_LIST_TRASH_ENTRY(dul, du_link);
454563
break;
@@ -532,6 +641,52 @@ _dispatch_event_merge_pipe_handle_write(dispatch_muxnote_t dmn,
532641
_dispatch_muxnote_release(dmn);
533642
}
534643

644+
static void
645+
_dispatch_event_merge_socket(dispatch_unote_t du, DWORD dwBytesAvailable)
646+
{
647+
// consumed by dux_merge_evt()
648+
_dispatch_retain_unote_owner(du);
649+
dispatch_unote_state_t du_state = _dispatch_unote_state(du);
650+
du_state &= ~DU_STATE_ARMED;
651+
uintptr_t data = dwBytesAvailable;
652+
uint32_t flags;
653+
if (dwBytesAvailable > 0) {
654+
flags = EV_ADD | EV_ENABLE | EV_DISPATCH;
655+
} else {
656+
du_state |= DU_STATE_NEEDS_DELETE;
657+
flags = EV_DELETE | EV_DISPATCH;
658+
}
659+
_dispatch_unote_state_set(du, du_state);
660+
os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed);
661+
dux_merge_evt(du._du, flags, data, 0);
662+
}
663+
664+
static void
665+
_dispatch_event_merge_socket_read(dispatch_muxnote_t dmn,
666+
DWORD dwBytesAvailable)
667+
{
668+
dispatch_unote_linkage_t dul, dul_next;
669+
LIST_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) {
670+
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
671+
_dispatch_event_merge_socket(du, dwBytesAvailable);
672+
}
673+
// Retained when posting the completion packet
674+
_dispatch_muxnote_release(dmn);
675+
}
676+
677+
static void
678+
_dispatch_event_merge_socket_write(dispatch_muxnote_t dmn,
679+
DWORD dwBytesAvailable)
680+
{
681+
dispatch_unote_linkage_t dul, dul_next;
682+
LIST_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) {
683+
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
684+
_dispatch_event_merge_socket(du, dwBytesAvailable);
685+
}
686+
// Retained when posting the completion packet
687+
_dispatch_muxnote_release(dmn);
688+
}
689+
535690
#pragma mark timers
536691

537692
typedef struct _dispatch_windows_timeout_s {
@@ -716,6 +871,16 @@ _dispatch_event_loop_drain(uint32_t flags)
716871
dwNumberOfBytesTransferred);
717872
break;
718873

874+
case DISPATCH_PORT_SOCKET_READ:
875+
_dispatch_event_merge_socket_read((dispatch_muxnote_t)pOV,
876+
dwNumberOfBytesTransferred);
877+
break;
878+
879+
case DISPATCH_PORT_SOCKET_WRITE:
880+
_dispatch_event_merge_socket_write((dispatch_muxnote_t)pOV,
881+
dwNumberOfBytesTransferred);
882+
break;
883+
719884
default:
720885
DISPATCH_INTERNAL_CRASH(ulCompletionKey,
721886
"unsupported completion key");

src/io.c

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2412,7 +2412,18 @@ _dispatch_operation_perform(dispatch_operation_t op)
24122412
#if defined(_WIN32)
24132413
HANDLE hFile = (HANDLE)op->fd_entry->fd;
24142414
BOOL bSuccess;
2415-
if (GetFileType(hFile) == FILE_TYPE_PIPE) {
2415+
if (_dispatch_handle_is_socket(hFile)) {
2416+
processed = recv((SOCKET)hFile, buf, len, 0);
2417+
if (processed < 0) {
2418+
bSuccess = FALSE;
2419+
err = WSAGetLastError();
2420+
if (err == WSAEWOULDBLOCK) {
2421+
err = EAGAIN;
2422+
}
2423+
goto error;
2424+
}
2425+
bSuccess = TRUE;
2426+
} else if (GetFileType(hFile) == FILE_TYPE_PIPE) {
24162427
OVERLAPPED ovlOverlapped = {};
24172428
DWORD dwTotalBytesAvail;
24182429
bSuccess = PeekNamedPipe(hFile, NULL, 0, NULL,
@@ -2466,7 +2477,18 @@ _dispatch_operation_perform(dispatch_operation_t op)
24662477
#if defined(_WIN32)
24672478
HANDLE hFile = (HANDLE)op->fd_entry->fd;
24682479
BOOL bSuccess;
2469-
if (GetFileType(hFile) == FILE_TYPE_PIPE) {
2480+
if (_dispatch_handle_is_socket(hFile)) {
2481+
processed = send((SOCKET)hFile, buf, len, 0);
2482+
if (processed < 0) {
2483+
bSuccess = FALSE;
2484+
err = WSAGetLastError();
2485+
if (err == WSAEWOULDBLOCK) {
2486+
err = EAGAIN;
2487+
}
2488+
goto error;
2489+
}
2490+
bSuccess = TRUE;
2491+
} else if (GetFileType(hFile) == FILE_TYPE_PIPE) {
24702492
// Unfortunately there isn't a good way to achieve O_NONBLOCK
24712493
// semantics when writing to a pipe. SetNamedPipeHandleState()
24722494
// can allow pipes to be switched into a "no wait" mode, but

src/shims/generic_win_stubs.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,19 @@ typedef NTSTATUS (NTAPI *_NtQueryInformationFile_fn_t)(HANDLE FileHandle,
1313
DISPATCH_STATIC_GLOBAL(dispatch_once_t _dispatch_ntdll_pred);
1414
DISPATCH_STATIC_GLOBAL(_NtQueryInformationFile_fn_t _dispatch_NtQueryInformationFile_ptr);
1515

16+
bool
17+
_dispatch_handle_is_socket(HANDLE hFile)
18+
{
19+
// GetFileType() returns FILE_TYPE_PIPE for both pipes and sockets. We can
20+
// disambiguate by checking if PeekNamedPipe() fails with
21+
// ERROR_INVALID_FUNCTION.
22+
if (GetFileType(hFile) == FILE_TYPE_PIPE &&
23+
!PeekNamedPipe(hFile, NULL, 0, NULL, NULL, NULL)) {
24+
return GetLastError() == ERROR_INVALID_FUNCTION;
25+
}
26+
return false;
27+
}
28+
1629
static void
1730
_dispatch_init_precise_time(void *context DISPATCH_UNUSED)
1831
{

src/shims/generic_win_stubs.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <stdint.h>
66

7+
#include <winsock2.h>
78
#include <Windows.h>
89
#include <crtdbg.h>
910
#include <ntstatus.h>
@@ -39,6 +40,8 @@ typedef __typeof__(_Generic((__SIZE_TYPE__)0, \
3940

4041
#define strcasecmp _stricmp
4142

43+
bool _dispatch_handle_is_socket(HANDLE hFile);
44+
4245
/*
4346
* Wrappers for dynamically loaded Windows APIs
4447
*/

0 commit comments

Comments
 (0)