Skip to content

Emulate non-blocking pipes in proc_open() on Windows #5864

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 272 additions & 4 deletions ext/standard/proc_open.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ SECURITY_ATTRIBUTES php_proc_open_security = {
.bInheritHandle = TRUE
};

# define pipe(pair) (CreatePipe(&pair[0], &pair[1], &php_proc_open_security, 0) ? 0 : -1)
# define pipe(pair) (CreatePipe(&pair[0], &pair[1], &php_proc_open_security, 8192) ? 0 : -1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the magic 8192 value here?


# define COMSPEC_NT "cmd.exe"

Expand Down Expand Up @@ -943,6 +943,268 @@ static int set_proc_descriptor_from_resource(zval *resource, descriptorspec_item
return SUCCESS;
}

#ifdef PHP_WIN32

#define THREADED_PIPE_BUF_SIZE 4096

typedef struct _threaded_pipe {
HANDLE pipe; /* Parent end of a process pipe. */
HANDLE thread; /* Thread being used to handle IO. */
php_socket_t sock; /* Internal end of the proxy socket pair. */
php_stream *stream; /* Wraps the exposed end of the socket pair. */
const char *mode; /* Pipe open mode. */
int closed; /* Set to 1 on stream close. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int closed; /* Set to 1 on stream close. */
bool closed; /* Set to 1 on stream close. */

int wsa_error; /* Last socket error code. */
DWORD error; /* Last error code. */
} threaded_pipe;

static int threaded_error_check(threaded_pipe *data, const char *format)
{
if (data->error) {
char *err = php_win32_error_to_msg(data->error);
php_error_docref(NULL, E_WARNING, format, (int) data->error, err);
php_win32_error_msg_free(err);
return 1;
}

if (data->wsa_error) {
zend_string *err = php_socket_error_str(data->wsa_error);
php_error_docref(NULL, E_WARNING, format, data->wsa_error, ZSTR_VAL(err));
zend_string_release(err);
return 1;
}

return 0;
}

static ssize_t threaded_pipe_write(php_stream *stream, const char *buf, size_t count)
{
threaded_pipe *data = (threaded_pipe *) stream->abstract;

if (threaded_error_check(data, "Pipe write failed: [%d] %s")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer passing in only "Pipe write failed" and let the rest of the message be constructed by threaded_error_check. Separating format string from arguments makes me uncomfortable...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't really get why the error check is performed before the operation?

return -1;
}

return php_stream_write(data->stream, buf, count);
}

static ssize_t threaded_pipe_read(php_stream *stream, char *buf, size_t count)
{
threaded_pipe *data = (threaded_pipe *) stream->abstract;

if (threaded_error_check(data, "Pipe read failed: [%d] %s")) {
return -1;
}

ssize_t len = php_stream_read(data->stream, buf, count);

if (len == 0 && threaded_error_check(data, "Pipe read failed: [%d] %s")) {
return -1;
}

return len;
}

static int threaded_pipe_close(php_stream *stream, int close_handle)
{
threaded_pipe *data = (threaded_pipe *) stream->abstract;

data->closed = 1;

/* Writer thread will terminate when socket is closed. */
if (data->mode[0] == 'w') {
php_stream_close(data->stream);
data->stream = NULL;

/* Thread will write remaining buffer contents to pipe and exit. */
if (WAIT_TIMEOUT != WaitForSingleObject(data->thread, 1000)) {
CloseHandle(data->thread);
return 0;
}
}

/* Interrupt IO calls in thread and wait with timeout to prevent race conditions. */
do {
CancelSynchronousIo(data->thread);
} while (WAIT_TIMEOUT == WaitForSingleObject(data->thread, 5));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks a bit fishy...


if (data->stream) {
php_stream_close(data->stream);
}

/* Dispose of terminated thread. */
CloseHandle(data->thread);

return 0;
}

static int threaded_pipe_flush(php_stream *stream)
{
return php_stream_flush(((threaded_pipe *) stream->abstract)->stream);
}

static int threaded_pipe_cast(php_stream *stream, int castas, void **ret)
{
return php_stream_cast(((threaded_pipe *) stream->abstract)->stream, castas, ret, 1);
}

static int threaded_pipe_stat(php_stream *stream, php_stream_statbuf *ssb)
{
return php_stream_stat(((threaded_pipe *) stream->abstract)->stream, ssb);
}

static int threaded_pipe_set_option(php_stream *stream, int option, int value, void *ptrparam)
{
return php_stream_set_option(((threaded_pipe *) stream->abstract)->stream, option, value, ptrparam);
}

/* Threaded pipe stream handlers. */
static php_stream_ops threaded_pipe_ops = {
threaded_pipe_write,
threaded_pipe_read,
threaded_pipe_close,
threaded_pipe_flush,
"STDIO",
NULL,
threaded_pipe_cast,
threaded_pipe_stat,
threaded_pipe_set_option
};

/* Reads data from a process pipe and forwards it into a socket. */
static DWORD WINAPI pipe_reader_thread(LPVOID arg)
{
threaded_pipe *data = (threaded_pipe *) arg;

DWORD a, b;
char buf[THREADED_PIPE_BUF_SIZE];

WSABUF wsabuf = {
.buf = buf
};

/* Loop runs until closed flag is set. */
while (!data->closed) {
if (!ReadFile(data->pipe, buf, THREADED_PIPE_BUF_SIZE, &a, NULL)) {
/* Save error code but do not treat EOF as an error. */
if (ERROR_BROKEN_PIPE == (data->error = GetLastError())) {
data->error = 0;
}
break;
}

wsabuf.len = a;

if (a < 1 || WSASend(data->sock, &wsabuf, 1, &b, 0, NULL, NULL)) {
data->wsa_error = WSAGetLastError();
break;
}
}

CloseHandle(data->pipe);
closesocket(data->sock);

return 0;
}

/* Reads data from a socket and forwards it into a process pipe. */
static DWORD WINAPI pipe_writer_thread(LPVOID arg)
{
threaded_pipe *data = (threaded_pipe *) arg;

DWORD flags, a, b;
char buf[THREADED_PIPE_BUF_SIZE];

WSABUF wsabuf = {
.len = THREADED_PIPE_BUF_SIZE,
.buf = buf
};

/* Loop runs until exposed part of the socket pair is closed. */
while (1) {
if (WSARecv(data->sock, &wsabuf, 1, &a, &flags, NULL, NULL)) {
data->wsa_error = WSAGetLastError();

/* Do not treat EOF as an error. */
switch (data->wsa_error) {
case WSAECONNRESET:
case WSAECONNABORTED:
data->wsa_error = 0;
}
break;
}

if (a < 1 || !WriteFile(data->pipe, buf, a, &b, NULL)) {
data->error = GetLastError();
break;
}
}

CloseHandle(data->pipe);
closesocket(data->sock);

return 0;
}

/* Create a new threaded pipe wrapping the given process pipe. */
static php_stream *create_threaded_pipe(HANDLE pipe, const char *mode)
{
php_socket_t socks[2];

/* Create socket pair that proxies data to userland PHP. */
if (socketpair_win32(AF_INET, SOCK_STREAM, 0, socks, 0)) {
zend_string *err = php_socket_error_str(php_socket_errno());
php_error_docref(NULL, E_WARNING, "Unable to create socket pair: %s", ZSTR_VAL(err));
zend_string_release(err);
return NULL;
}

threaded_pipe *data = emalloc(sizeof(threaded_pipe));

data->closed = 0;
data->error = 0;
data->wsa_error = 0;

data->mode = mode;
data->pipe = pipe;
data->sock = socks[1];

/* Expose one end of the socket pair to userland. */
if (NULL == (data->stream = php_stream_sock_open_from_socket(socks[0], NULL))) {
closesocket(socks[0]);
closesocket(socks[1]);
efree(data);
return NULL;
}

/* Create thread based on pipe mode. */
LPTHREAD_START_ROUTINE tf = pipe_reader_thread;

if (mode[0] == 'w') {
tf = pipe_writer_thread;

/* Restrict receive buffer size to avoid excessive data buffering. */
int bsize = THREADED_PIPE_BUF_SIZE;
setsockopt(socks[1], SOL_SOCKET, SO_RCVBUF, (char *) &bsize, sizeof(int));
}

data->thread = CreateThread(&php_proc_open_security, 0xFFFF, tf, data, 0, NULL);

if (data->thread == NULL) {
DWORD dw = GetLastError();
closesocket(socks[0]);
closesocket(socks[1]);
php_stream_close(data->stream);
php_error_docref(NULL, E_WARNING, "Failed to spawn pipe thread: %u", dw);
efree(data);
return NULL;
}

return php_stream_alloc_rel(&threaded_pipe_ops, data, NULL, mode);
}

#endif

#ifndef PHP_WIN32
static int close_parentends_of_pipes(descriptorspec_item *descriptors, int ndesc)
{
Expand Down Expand Up @@ -1018,6 +1280,7 @@ PHP_FUNCTION(proc_open)
int suppress_errors = 0;
int bypass_shell = 0;
int blocking_pipes = 0;
int threaded_pipes = 0;
int create_process_group = 0;
int create_new_console = 0;
#else
Expand Down Expand Up @@ -1070,6 +1333,7 @@ PHP_FUNCTION(proc_open)
/* TODO: Deprecate in favor of array command? */
bypass_shell = bypass_shell || get_option(other_options, "bypass_shell");
blocking_pipes = get_option(other_options, "blocking_pipes");
threaded_pipes = get_option(other_options, "threaded_pipes");
create_process_group = get_option(other_options, "create_process_group");
create_new_console = get_option(other_options, "create_new_console");
}
Expand Down Expand Up @@ -1270,9 +1534,13 @@ PHP_FUNCTION(proc_open)
}

#ifdef PHP_WIN32
stream = php_stream_fopen_from_fd(_open_osfhandle((zend_intptr_t)descriptors[i].parentend,
descriptors[i].mode_flags), mode_string, NULL);
php_stream_set_option(stream, PHP_STREAM_OPTION_PIPE_BLOCKING, blocking_pipes, NULL);
if (threaded_pipes) {
stream = create_threaded_pipe(descriptors[i].parentend, mode_string);
} else {
stream = php_stream_fopen_from_fd(_open_osfhandle((zend_intptr_t)descriptors[i].parentend,
descriptors[i].mode_flags), mode_string, NULL);
php_stream_set_option(stream, PHP_STREAM_OPTION_PIPE_BLOCKING, blocking_pipes, NULL);
}
#else
stream = php_stream_fopen_from_fd(descriptors[i].parentend, mode_string, NULL);
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,18 @@ printf("STDOUT << %s\n", read_pipe($pipes[1]));
write_pipe($pipes[0], 'done');
fclose($pipes[0]);

var_dump(feof($pipes[1]));
printf("STDOUT << %s\n", read_pipe($pipes[1]));
var_dump(read_pipe($pipes[1]));
var_dump(feof($pipes[1]));

?>
--EXPECTF--
bool(true)
bool(true)
STDOUT << hello
STDOUT << world
bool(false)
STDOUT << DONE
string(0) ""
bool(true)
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@ printf("STDOUT << %s\n", read_pipe($pipes[1]));
fwrite($pipes[0], 'done');
fclose($pipes[0]);

var_dump(feof($pipes[1]));
printf("STDOUT << %s\n", read_pipe($pipes[1]));
var_dump(read_pipe($pipes[1]));
var_dump(feof($pipes[1]));

?>
--EXPECTF--
bool(true)
STDOUT << hello
STDOUT << world
bool(false)
STDOUT << DONE
string(0) ""
bool(true)
Loading