@@ -30,6 +30,8 @@ enum _dispatch_windows_port {
30
30
DISPATCH_PORT_FILE_HANDLE ,
31
31
DISPATCH_PORT_PIPE_HANDLE_READ ,
32
32
DISPATCH_PORT_PIPE_HANDLE_WRITE ,
33
+ DISPATCH_PORT_SOCKET_READ ,
34
+ DISPATCH_PORT_SOCKET_WRITE ,
33
35
};
34
36
35
37
enum _dispatch_muxnote_events {
@@ -59,13 +61,24 @@ typedef struct dispatch_muxnote_s {
59
61
DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID ,
60
62
DISPATCH_MUXNOTE_HANDLE_TYPE_FILE ,
61
63
DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE ,
64
+ DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET ,
62
65
} dmn_handle_type ;
63
66
enum _dispatch_muxnote_events dmn_events ;
64
67
65
- // Used by the pipe monitoring thread
66
- HANDLE dmn_thread ;
68
+ // For pipes, this event is used to synchronize the monitoring thread with
69
+ // I/O completion port processing. For sockets, this is the event used with
70
+ // WSAEventSelect().
67
71
HANDLE dmn_event ;
72
+
73
+ // Pipe monitoring thread control
74
+ HANDLE dmn_thread ;
68
75
os_atomic (bool ) dmn_stop ;
76
+
77
+ // Socket events registered with WSAEventSelect()
78
+ long dmn_network_events ;
79
+
80
+ // Threadpool wait handle for socket events
81
+ PTP_WAIT dmn_threadpool_wait ;
69
82
} * dispatch_muxnote_t ;
70
83
71
84
static LIST_HEAD (dispatch_muxnote_bucket_s , dispatch_muxnote_s )
@@ -146,17 +159,10 @@ _dispatch_muxnote_create(dispatch_unote_t du,
146
159
147
160
case FILE_TYPE_PIPE :
148
161
// The specified file is a socket, a named pipe, or an
149
- // anonymous pipe. Use GetNamedPipeInfo() to distinguish between
150
- // a pipe and a socket. Despite its name, it also succeeds for
151
- // anonymous pipes.
152
- if (!GetNamedPipeInfo (handle , NULL , NULL , NULL , NULL )) {
153
- // We'll get ERROR_ACCESS_DENIED for outbound pipes.
154
- if (GetLastError () != ERROR_ACCESS_DENIED ) {
155
- // The file is probably a socket.
156
- WIN_PORT_ERROR ();
157
- }
158
- }
159
- dmn -> dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE ;
162
+ // anonymous pipe.
163
+ dmn -> dmn_handle_type = _dispatch_handle_is_socket (handle )
164
+ ? DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET
165
+ : DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE ;
160
166
break ;
161
167
}
162
168
@@ -183,18 +189,27 @@ _dispatch_muxnote_stop(dispatch_muxnote_t dmn)
183
189
CloseHandle (dmn -> dmn_thread );
184
190
dmn -> dmn_thread = NULL ;
185
191
}
186
- if (dmn -> dmn_event ) {
187
- CloseHandle (dmn -> dmn_event );
188
- dmn -> dmn_event = NULL ;
192
+ if (dmn -> dmn_threadpool_wait ) {
193
+ SetThreadpoolWait (dmn -> dmn_threadpool_wait , NULL , NULL );
194
+ WaitForThreadpoolWaitCallbacks (dmn -> dmn_threadpool_wait ,
195
+ /* fCancelPendingCallbacks */ FALSE);
196
+ CloseThreadpoolWait (dmn -> dmn_threadpool_wait );
197
+ dmn -> dmn_threadpool_wait = NULL ;
198
+ }
199
+ if (dmn -> dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET ) {
200
+ WSAEventSelect ((SOCKET )dmn -> dmn_ident , NULL , 0 );
189
201
}
190
202
}
191
203
192
204
static void
193
205
_dispatch_muxnote_dispose (dispatch_muxnote_t dmn )
194
206
{
195
- if (dmn -> dmn_thread ) {
207
+ if (dmn -> dmn_thread || dmn -> dmn_threadpool_wait ) {
196
208
DISPATCH_INTERNAL_CRASH (0 , "disposed a muxnote with an active thread" );
197
209
}
210
+ if (dmn -> dmn_event ) {
211
+ CloseHandle (dmn -> dmn_event );
212
+ }
198
213
free (dmn );
199
214
}
200
215
@@ -300,10 +315,51 @@ _dispatch_pipe_write_availability(HANDLE hPipe)
300
315
return fpli .WriteQuotaAvailable ;
301
316
}
302
317
318
+ static VOID CALLBACK
319
+ _dispatch_socket_callback (PTP_CALLBACK_INSTANCE inst , void * context ,
320
+ PTP_WAIT pwa , TP_WAIT_RESULT res )
321
+ {
322
+ dispatch_muxnote_t dmn = (dispatch_muxnote_t )context ;
323
+ SOCKET sock = (SOCKET )dmn -> dmn_ident ;
324
+ WSANETWORKEVENTS events ;
325
+ if (WSAEnumNetworkEvents (sock , (WSAEVENT )dmn -> dmn_event , & events ) == 0 ) {
326
+ long lNetworkEvents = events .lNetworkEvents ;
327
+ DWORD dwBytesAvailable = 1 ;
328
+ if (lNetworkEvents & FD_CLOSE ) {
329
+ dwBytesAvailable = 0 ;
330
+ // Post to all registered read and write handlers
331
+ lNetworkEvents |= FD_READ | FD_WRITE ;
332
+ } else if (lNetworkEvents & FD_READ ) {
333
+ ioctlsocket (sock , FIONREAD , & dwBytesAvailable );
334
+ }
335
+ if (lNetworkEvents & FD_READ ) {
336
+ _dispatch_muxnote_retain (dmn );
337
+ if (!PostQueuedCompletionStatus (hPort , dwBytesAvailable ,
338
+ (ULONG_PTR )DISPATCH_PORT_SOCKET_READ , (LPOVERLAPPED )dmn )) {
339
+ DISPATCH_INTERNAL_CRASH (GetLastError (),
340
+ "PostQueuedCompletionStatus" );
341
+ }
342
+ }
343
+ if (lNetworkEvents & FD_WRITE ) {
344
+ _dispatch_muxnote_retain (dmn );
345
+ if (!PostQueuedCompletionStatus (hPort , dwBytesAvailable ,
346
+ (ULONG_PTR )DISPATCH_PORT_SOCKET_WRITE , (LPOVERLAPPED )dmn )) {
347
+ DISPATCH_INTERNAL_CRASH (GetLastError (),
348
+ "PostQueuedCompletionStatus" );
349
+ }
350
+ }
351
+ } else {
352
+ _dispatch_debug ("socket[0x%llx]: WSAEnumNetworkEvents() failed (%d)" ,
353
+ (long long )sock , WSAGetLastError ());
354
+ }
355
+ SetThreadpoolWait (pwa , dmn -> dmn_event , /* pftTimeout */ NULL );
356
+ }
357
+
303
358
static BOOL
304
359
_dispatch_io_trigger (dispatch_muxnote_t dmn )
305
360
{
306
361
BOOL bSuccess ;
362
+ long lNetworkEvents ;
307
363
308
364
switch (dmn -> dmn_handle_type ) {
309
365
case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID :
@@ -321,19 +377,17 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn)
321
377
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE :
322
378
if ((dmn -> dmn_events & DISPATCH_MUXNOTE_EVENT_READ ) &&
323
379
!dmn -> dmn_thread ) {
324
- HANDLE hThread = (HANDLE )_beginthreadex (/* security */ NULL ,
380
+ dmn -> dmn_thread = (HANDLE )_beginthreadex (/* security */ NULL ,
325
381
/* stack_size */ 1 , _dispatch_pipe_monitor_thread ,
326
382
(void * )dmn , /* initflag */ 0 , /* thrdaddr */ NULL );
327
- if (!hThread ) {
383
+ if (!dmn -> dmn_thread ) {
328
384
DISPATCH_INTERNAL_CRASH (errno , "_beginthread" );
329
385
}
330
- HANDLE hEvent = CreateEventW (NULL , /* bManualReset */ FALSE,
386
+ dmn -> dmn_event = CreateEventW (NULL , /* bManualReset */ FALSE,
331
387
/* bInitialState */ FALSE, NULL );
332
- if (!hEvent ) {
388
+ if (!dmn -> dmn_event ) {
333
389
DISPATCH_INTERNAL_CRASH (GetLastError (), "CreateEventW" );
334
390
}
335
- dmn -> dmn_thread = hThread ;
336
- dmn -> dmn_event = hEvent ;
337
391
}
338
392
if (dmn -> dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE ) {
339
393
_dispatch_muxnote_retain (dmn );
@@ -348,6 +402,59 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn)
348
402
}
349
403
}
350
404
break ;
405
+
406
+ case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET :
407
+ if (!dmn -> dmn_event ) {
408
+ dmn -> dmn_event = CreateEventW (NULL , /* bManualReset */ FALSE,
409
+ /* bInitialState */ FALSE, NULL );
410
+ if (!dmn -> dmn_event ) {
411
+ DISPATCH_INTERNAL_CRASH (GetLastError (), "CreateEventW" );
412
+ }
413
+ }
414
+ if (!dmn -> dmn_threadpool_wait ) {
415
+ dmn -> dmn_threadpool_wait = CreateThreadpoolWait (
416
+ _dispatch_socket_callback , dmn ,
417
+ /* PTP_CALLBACK_ENVIRON */ NULL );
418
+ if (!dmn -> dmn_threadpool_wait ) {
419
+ DISPATCH_INTERNAL_CRASH (GetLastError (), "CreateThreadpoolWait" );
420
+ }
421
+ SetThreadpoolWait (dmn -> dmn_threadpool_wait , dmn -> dmn_event ,
422
+ /* pftTimeout */ NULL );
423
+ }
424
+ lNetworkEvents = FD_CLOSE ;
425
+ if (dmn -> dmn_events & DISPATCH_MUXNOTE_EVENT_READ ) {
426
+ lNetworkEvents |= FD_READ ;
427
+ }
428
+ if (dmn -> dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE ) {
429
+ lNetworkEvents |= FD_WRITE ;
430
+ }
431
+ if (dmn -> dmn_network_events != lNetworkEvents ) {
432
+ if (WSAEventSelect ((SOCKET )dmn -> dmn_ident , (WSAEVENT )dmn -> dmn_event ,
433
+ lNetworkEvents ) != 0 ) {
434
+ DISPATCH_INTERNAL_CRASH (WSAGetLastError (), "WSAEventSelect" );
435
+ }
436
+ dmn -> dmn_network_events = lNetworkEvents ;
437
+ }
438
+ if (dmn -> dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE ) {
439
+ // FD_WRITE is edge-triggered, not level-triggered, so it will only
440
+ // be signaled if the socket becomes writable after a send() fails
441
+ // with WSAEWOULDBLOCK. We can work around this by performing a
442
+ // zero-byte send(). If the socket is writable, the send() will
443
+ // succeed and we can immediately post a packet, and if it isn't, it
444
+ // will fail with WSAEWOULDBLOCK and WSAEventSelect() will report
445
+ // the next time it becomes available.
446
+ if (send ((SOCKET )dmn -> dmn_ident , "" , 0 , 0 ) == 0 ) {
447
+ _dispatch_muxnote_retain (dmn );
448
+ bSuccess = PostQueuedCompletionStatus (hPort , 1 ,
449
+ (ULONG_PTR )DISPATCH_PORT_SOCKET_WRITE ,
450
+ (LPOVERLAPPED )dmn );
451
+ if (bSuccess == FALSE) {
452
+ DISPATCH_INTERNAL_CRASH (GetLastError (),
453
+ "PostQueuedCompletionStatus" );
454
+ }
455
+ }
456
+ }
457
+ break ;
351
458
}
352
459
353
460
return TRUE;
@@ -408,6 +515,7 @@ _dispatch_unote_register_muxed(dispatch_unote_t du)
408
515
break ;
409
516
410
517
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE :
518
+ case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET :
411
519
if (events & DISPATCH_MUXNOTE_EVENT_READ ) {
412
520
LIST_INSERT_HEAD (& dmn -> dmn_readers_head , dul , du_link );
413
521
} else if (events & DISPATCH_MUXNOTE_EVENT_WRITE ) {
@@ -449,6 +557,7 @@ _dispatch_unote_unregister_muxed(dispatch_unote_t du)
449
557
break ;
450
558
451
559
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE :
560
+ case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET :
452
561
LIST_REMOVE (dul , du_link );
453
562
_LIST_TRASH_ENTRY (dul , du_link );
454
563
break ;
@@ -532,6 +641,52 @@ _dispatch_event_merge_pipe_handle_write(dispatch_muxnote_t dmn,
532
641
_dispatch_muxnote_release (dmn );
533
642
}
534
643
644
+ static void
645
+ _dispatch_event_merge_socket (dispatch_unote_t du , DWORD dwBytesAvailable )
646
+ {
647
+ // consumed by dux_merge_evt()
648
+ _dispatch_retain_unote_owner (du );
649
+ dispatch_unote_state_t du_state = _dispatch_unote_state (du );
650
+ du_state &= ~DU_STATE_ARMED ;
651
+ uintptr_t data = dwBytesAvailable ;
652
+ uint32_t flags ;
653
+ if (dwBytesAvailable > 0 ) {
654
+ flags = EV_ADD | EV_ENABLE | EV_DISPATCH ;
655
+ } else {
656
+ du_state |= DU_STATE_NEEDS_DELETE ;
657
+ flags = EV_DELETE | EV_DISPATCH ;
658
+ }
659
+ _dispatch_unote_state_set (du , du_state );
660
+ os_atomic_store2o (du ._dr , ds_pending_data , ~data , relaxed );
661
+ dux_merge_evt (du ._du , flags , data , 0 );
662
+ }
663
+
664
+ static void
665
+ _dispatch_event_merge_socket_read (dispatch_muxnote_t dmn ,
666
+ DWORD dwBytesAvailable )
667
+ {
668
+ dispatch_unote_linkage_t dul , dul_next ;
669
+ LIST_FOREACH_SAFE (dul , & dmn -> dmn_readers_head , du_link , dul_next ) {
670
+ dispatch_unote_t du = _dispatch_unote_linkage_get_unote (dul );
671
+ _dispatch_event_merge_socket (du , dwBytesAvailable );
672
+ }
673
+ // Retained when posting the completion packet
674
+ _dispatch_muxnote_release (dmn );
675
+ }
676
+
677
+ static void
678
+ _dispatch_event_merge_socket_write (dispatch_muxnote_t dmn ,
679
+ DWORD dwBytesAvailable )
680
+ {
681
+ dispatch_unote_linkage_t dul , dul_next ;
682
+ LIST_FOREACH_SAFE (dul , & dmn -> dmn_writers_head , du_link , dul_next ) {
683
+ dispatch_unote_t du = _dispatch_unote_linkage_get_unote (dul );
684
+ _dispatch_event_merge_socket (du , dwBytesAvailable );
685
+ }
686
+ // Retained when posting the completion packet
687
+ _dispatch_muxnote_release (dmn );
688
+ }
689
+
535
690
#pragma mark timers
536
691
537
692
typedef struct _dispatch_windows_timeout_s {
@@ -716,6 +871,16 @@ _dispatch_event_loop_drain(uint32_t flags)
716
871
dwNumberOfBytesTransferred );
717
872
break ;
718
873
874
+ case DISPATCH_PORT_SOCKET_READ :
875
+ _dispatch_event_merge_socket_read ((dispatch_muxnote_t )pOV ,
876
+ dwNumberOfBytesTransferred );
877
+ break ;
878
+
879
+ case DISPATCH_PORT_SOCKET_WRITE :
880
+ _dispatch_event_merge_socket_write ((dispatch_muxnote_t )pOV ,
881
+ dwNumberOfBytesTransferred );
882
+ break ;
883
+
719
884
default :
720
885
DISPATCH_INTERNAL_CRASH (ulCompletionKey ,
721
886
"unsupported completion key" );
0 commit comments