-
Notifications
You must be signed in to change notification settings - Fork 7.9k
Emulate non-blocking pipes in proc_open() on Windows #5864
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -420,7 +420,7 @@ SECURITY_ATTRIBUTES php_proc_open_security = { | |||||
.bInheritHandle = TRUE | ||||||
}; | ||||||
|
||||||
# define pipe(pair) (CreatePipe(&pair[0], &pair[1], &php_proc_open_security, 0) ? 0 : -1) | ||||||
# define pipe(pair) (CreatePipe(&pair[0], &pair[1], &php_proc_open_security, 8192) ? 0 : -1) | ||||||
|
||||||
# define COMSPEC_NT "cmd.exe" | ||||||
|
||||||
|
@@ -943,6 +943,268 @@ static int set_proc_descriptor_from_resource(zval *resource, descriptorspec_item | |||||
return SUCCESS; | ||||||
} | ||||||
|
||||||
#ifdef PHP_WIN32 | ||||||
|
||||||
#define THREADED_PIPE_BUF_SIZE 4096 | ||||||
|
||||||
typedef struct _threaded_pipe { | ||||||
HANDLE pipe; /* Parent end of a process pipe. */ | ||||||
HANDLE thread; /* Thread being used to handle IO. */ | ||||||
php_socket_t sock; /* Internal end of the proxy socket pair. */ | ||||||
php_stream *stream; /* Wraps the exposed end of the socket pair. */ | ||||||
const char *mode; /* Pipe open mode. */ | ||||||
int closed; /* Set to 1 on stream close. */ | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
int wsa_error; /* Last socket error code. */ | ||||||
DWORD error; /* Last error code. */ | ||||||
} threaded_pipe; | ||||||
|
||||||
static int threaded_error_check(threaded_pipe *data, const char *format) | ||||||
{ | ||||||
if (data->error) { | ||||||
char *err = php_win32_error_to_msg(data->error); | ||||||
php_error_docref(NULL, E_WARNING, format, (int) data->error, err); | ||||||
php_win32_error_msg_free(err); | ||||||
return 1; | ||||||
} | ||||||
|
||||||
if (data->wsa_error) { | ||||||
zend_string *err = php_socket_error_str(data->wsa_error); | ||||||
php_error_docref(NULL, E_WARNING, format, data->wsa_error, ZSTR_VAL(err)); | ||||||
zend_string_release(err); | ||||||
return 1; | ||||||
} | ||||||
|
||||||
return 0; | ||||||
} | ||||||
|
||||||
static ssize_t threaded_pipe_write(php_stream *stream, const char *buf, size_t count) | ||||||
{ | ||||||
threaded_pipe *data = (threaded_pipe *) stream->abstract; | ||||||
|
||||||
if (threaded_error_check(data, "Pipe write failed: [%d] %s")) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer passing in only There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also don't really get why the error check is performed before the operation? |
||||||
return -1; | ||||||
} | ||||||
|
||||||
return php_stream_write(data->stream, buf, count); | ||||||
} | ||||||
|
||||||
static ssize_t threaded_pipe_read(php_stream *stream, char *buf, size_t count) | ||||||
{ | ||||||
threaded_pipe *data = (threaded_pipe *) stream->abstract; | ||||||
|
||||||
if (threaded_error_check(data, "Pipe read failed: [%d] %s")) { | ||||||
return -1; | ||||||
} | ||||||
|
||||||
ssize_t len = php_stream_read(data->stream, buf, count); | ||||||
|
||||||
if (len == 0 && threaded_error_check(data, "Pipe read failed: [%d] %s")) { | ||||||
return -1; | ||||||
} | ||||||
|
||||||
return len; | ||||||
} | ||||||
|
||||||
static int threaded_pipe_close(php_stream *stream, int close_handle) | ||||||
{ | ||||||
threaded_pipe *data = (threaded_pipe *) stream->abstract; | ||||||
|
||||||
data->closed = 1; | ||||||
|
||||||
/* Writer thread will terminate when socket is closed. */ | ||||||
if (data->mode[0] == 'w') { | ||||||
php_stream_close(data->stream); | ||||||
data->stream = NULL; | ||||||
|
||||||
/* Thread will write remaining buffer contents to pipe and exit. */ | ||||||
if (WAIT_TIMEOUT != WaitForSingleObject(data->thread, 1000)) { | ||||||
CloseHandle(data->thread); | ||||||
return 0; | ||||||
} | ||||||
} | ||||||
|
||||||
/* Interrupt IO calls in thread and wait with timeout to prevent race conditions. */ | ||||||
do { | ||||||
CancelSynchronousIo(data->thread); | ||||||
} while (WAIT_TIMEOUT == WaitForSingleObject(data->thread, 5)); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks a bit fishy... |
||||||
|
||||||
if (data->stream) { | ||||||
php_stream_close(data->stream); | ||||||
} | ||||||
|
||||||
/* Dispose of terminated thread. */ | ||||||
CloseHandle(data->thread); | ||||||
|
||||||
return 0; | ||||||
} | ||||||
|
||||||
static int threaded_pipe_flush(php_stream *stream) | ||||||
{ | ||||||
return php_stream_flush(((threaded_pipe *) stream->abstract)->stream); | ||||||
} | ||||||
|
||||||
static int threaded_pipe_cast(php_stream *stream, int castas, void **ret) | ||||||
{ | ||||||
return php_stream_cast(((threaded_pipe *) stream->abstract)->stream, castas, ret, 1); | ||||||
} | ||||||
|
||||||
static int threaded_pipe_stat(php_stream *stream, php_stream_statbuf *ssb) | ||||||
{ | ||||||
return php_stream_stat(((threaded_pipe *) stream->abstract)->stream, ssb); | ||||||
} | ||||||
|
||||||
static int threaded_pipe_set_option(php_stream *stream, int option, int value, void *ptrparam) | ||||||
{ | ||||||
return php_stream_set_option(((threaded_pipe *) stream->abstract)->stream, option, value, ptrparam); | ||||||
} | ||||||
|
||||||
/* Threaded pipe stream handlers. */ | ||||||
static php_stream_ops threaded_pipe_ops = { | ||||||
threaded_pipe_write, | ||||||
threaded_pipe_read, | ||||||
threaded_pipe_close, | ||||||
threaded_pipe_flush, | ||||||
"STDIO", | ||||||
NULL, | ||||||
threaded_pipe_cast, | ||||||
threaded_pipe_stat, | ||||||
threaded_pipe_set_option | ||||||
}; | ||||||
|
||||||
/* Reads data from a process pipe and forwards it into a socket. */ | ||||||
static DWORD WINAPI pipe_reader_thread(LPVOID arg) | ||||||
{ | ||||||
threaded_pipe *data = (threaded_pipe *) arg; | ||||||
|
||||||
DWORD a, b; | ||||||
char buf[THREADED_PIPE_BUF_SIZE]; | ||||||
|
||||||
WSABUF wsabuf = { | ||||||
.buf = buf | ||||||
}; | ||||||
|
||||||
/* Loop runs until closed flag is set. */ | ||||||
while (!data->closed) { | ||||||
if (!ReadFile(data->pipe, buf, THREADED_PIPE_BUF_SIZE, &a, NULL)) { | ||||||
/* Save error code but do not treat EOF as an error. */ | ||||||
if (ERROR_BROKEN_PIPE == (data->error = GetLastError())) { | ||||||
data->error = 0; | ||||||
} | ||||||
break; | ||||||
} | ||||||
|
||||||
wsabuf.len = a; | ||||||
|
||||||
if (a < 1 || WSASend(data->sock, &wsabuf, 1, &b, 0, NULL, NULL)) { | ||||||
data->wsa_error = WSAGetLastError(); | ||||||
break; | ||||||
} | ||||||
} | ||||||
|
||||||
CloseHandle(data->pipe); | ||||||
closesocket(data->sock); | ||||||
|
||||||
return 0; | ||||||
} | ||||||
|
||||||
/* Reads data from a socket and forwards it into a process pipe. */ | ||||||
static DWORD WINAPI pipe_writer_thread(LPVOID arg) | ||||||
{ | ||||||
threaded_pipe *data = (threaded_pipe *) arg; | ||||||
|
||||||
DWORD flags, a, b; | ||||||
char buf[THREADED_PIPE_BUF_SIZE]; | ||||||
|
||||||
WSABUF wsabuf = { | ||||||
.len = THREADED_PIPE_BUF_SIZE, | ||||||
.buf = buf | ||||||
}; | ||||||
|
||||||
/* Loop runs until exposed part of the socket pair is closed. */ | ||||||
while (1) { | ||||||
if (WSARecv(data->sock, &wsabuf, 1, &a, &flags, NULL, NULL)) { | ||||||
data->wsa_error = WSAGetLastError(); | ||||||
|
||||||
/* Do not treat EOF as an error. */ | ||||||
switch (data->wsa_error) { | ||||||
case WSAECONNRESET: | ||||||
case WSAECONNABORTED: | ||||||
data->wsa_error = 0; | ||||||
} | ||||||
break; | ||||||
} | ||||||
|
||||||
if (a < 1 || !WriteFile(data->pipe, buf, a, &b, NULL)) { | ||||||
data->error = GetLastError(); | ||||||
break; | ||||||
} | ||||||
} | ||||||
|
||||||
CloseHandle(data->pipe); | ||||||
closesocket(data->sock); | ||||||
|
||||||
return 0; | ||||||
} | ||||||
|
||||||
/* Create a new threaded pipe wrapping the given process pipe. */ | ||||||
static php_stream *create_threaded_pipe(HANDLE pipe, const char *mode) | ||||||
{ | ||||||
php_socket_t socks[2]; | ||||||
|
||||||
/* Create socket pair that proxies data to userland PHP. */ | ||||||
if (socketpair_win32(AF_INET, SOCK_STREAM, 0, socks, 0)) { | ||||||
zend_string *err = php_socket_error_str(php_socket_errno()); | ||||||
php_error_docref(NULL, E_WARNING, "Unable to create socket pair: %s", ZSTR_VAL(err)); | ||||||
zend_string_release(err); | ||||||
return NULL; | ||||||
} | ||||||
|
||||||
threaded_pipe *data = emalloc(sizeof(threaded_pipe)); | ||||||
|
||||||
data->closed = 0; | ||||||
data->error = 0; | ||||||
data->wsa_error = 0; | ||||||
|
||||||
data->mode = mode; | ||||||
data->pipe = pipe; | ||||||
data->sock = socks[1]; | ||||||
|
||||||
/* Expose one end of the socket pair to userland. */ | ||||||
if (NULL == (data->stream = php_stream_sock_open_from_socket(socks[0], NULL))) { | ||||||
closesocket(socks[0]); | ||||||
closesocket(socks[1]); | ||||||
efree(data); | ||||||
return NULL; | ||||||
} | ||||||
|
||||||
/* Create thread based on pipe mode. */ | ||||||
LPTHREAD_START_ROUTINE tf = pipe_reader_thread; | ||||||
|
||||||
if (mode[0] == 'w') { | ||||||
tf = pipe_writer_thread; | ||||||
|
||||||
/* Restrict receive buffer size to avoid excessive data buffering. */ | ||||||
int bsize = THREADED_PIPE_BUF_SIZE; | ||||||
setsockopt(socks[1], SOL_SOCKET, SO_RCVBUF, (char *) &bsize, sizeof(int)); | ||||||
} | ||||||
|
||||||
data->thread = CreateThread(&php_proc_open_security, 0xFFFF, tf, data, 0, NULL); | ||||||
|
||||||
if (data->thread == NULL) { | ||||||
DWORD dw = GetLastError(); | ||||||
closesocket(socks[0]); | ||||||
closesocket(socks[1]); | ||||||
php_stream_close(data->stream); | ||||||
php_error_docref(NULL, E_WARNING, "Failed to spawn pipe thread: %u", dw); | ||||||
efree(data); | ||||||
return NULL; | ||||||
} | ||||||
|
||||||
return php_stream_alloc_rel(&threaded_pipe_ops, data, NULL, mode); | ||||||
} | ||||||
|
||||||
#endif | ||||||
|
||||||
#ifndef PHP_WIN32 | ||||||
static int close_parentends_of_pipes(descriptorspec_item *descriptors, int ndesc) | ||||||
{ | ||||||
|
@@ -1018,6 +1280,7 @@ PHP_FUNCTION(proc_open) | |||||
int suppress_errors = 0; | ||||||
int bypass_shell = 0; | ||||||
int blocking_pipes = 0; | ||||||
int threaded_pipes = 0; | ||||||
int create_process_group = 0; | ||||||
int create_new_console = 0; | ||||||
#else | ||||||
|
@@ -1070,6 +1333,7 @@ PHP_FUNCTION(proc_open) | |||||
/* TODO: Deprecate in favor of array command? */ | ||||||
bypass_shell = bypass_shell || get_option(other_options, "bypass_shell"); | ||||||
blocking_pipes = get_option(other_options, "blocking_pipes"); | ||||||
threaded_pipes = get_option(other_options, "threaded_pipes"); | ||||||
create_process_group = get_option(other_options, "create_process_group"); | ||||||
create_new_console = get_option(other_options, "create_new_console"); | ||||||
} | ||||||
|
@@ -1270,9 +1534,13 @@ PHP_FUNCTION(proc_open) | |||||
} | ||||||
|
||||||
#ifdef PHP_WIN32 | ||||||
stream = php_stream_fopen_from_fd(_open_osfhandle((zend_intptr_t)descriptors[i].parentend, | ||||||
descriptors[i].mode_flags), mode_string, NULL); | ||||||
php_stream_set_option(stream, PHP_STREAM_OPTION_PIPE_BLOCKING, blocking_pipes, NULL); | ||||||
if (threaded_pipes) { | ||||||
stream = create_threaded_pipe(descriptors[i].parentend, mode_string); | ||||||
} else { | ||||||
stream = php_stream_fopen_from_fd(_open_osfhandle((zend_intptr_t)descriptors[i].parentend, | ||||||
descriptors[i].mode_flags), mode_string, NULL); | ||||||
php_stream_set_option(stream, PHP_STREAM_OPTION_PIPE_BLOCKING, blocking_pipes, NULL); | ||||||
} | ||||||
#else | ||||||
stream = php_stream_fopen_from_fd(descriptors[i].parentend, mode_string, NULL); | ||||||
#endif | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the magic 8192 value here?