Skip to content

Commit 1b6b26a

Browse files
committed
pipe: fix and clarify pipe write wakeup logic
The pipe rework ends up having been extra painful, partly becaused of actual bugs with ordering and caching of the pipe state, but also because of subtle performance issues. In particular, the pipe rework caused the kernel build to inexplicably slow down. The reason turns out to be that the GNU make jobserver (which limits the parallelism of the build) uses a pipe to implement a "token" system: a parallel submake will read a character from the pipe to get the job token before starting a new job, and will write a character back to the pipe when it is done. The overall job limit is thus easily controlled by just writing the appropriate number of initial token characters into the pipe. But to work well, that really means that the old behavior of write wakeups being synchronous (WF_SYNC) is very important - when the pipe writer wakes up a reader, we want the reader to actually get scheduled immediately. Otherwise you lose the parallelism of the build. The pipe rework lost that synchronous wakeup on write, and we had clearly all forgotten the reasons and rules for it. This rewrites the pipe write wakeup logic to do the required Wsync wakeups, but also clarifies the logic and avoids extraneous wakeups. It also ends up addign a number of comments about what oit does and why, so that we hopefully don't end up forgetting about this next time we change this code. Cc: David Howells <[email protected]> Signed-off-by: Linus Torvalds <[email protected]>
1 parent ad910e3 commit 1b6b26a

File tree

1 file changed

+41
-18
lines changed

1 file changed

+41
-18
lines changed

fs/pipe.c

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -391,9 +391,9 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
391391
struct pipe_inode_info *pipe = filp->private_data;
392392
unsigned int head;
393393
ssize_t ret = 0;
394-
int do_wakeup = 0;
395394
size_t total_len = iov_iter_count(from);
396395
ssize_t chars;
396+
bool was_empty = false;
397397

398398
/* Null write succeeds. */
399399
if (unlikely(total_len == 0))
@@ -407,11 +407,21 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
407407
goto out;
408408
}
409409

410+
/*
411+
* Only wake up if the pipe started out empty, since
412+
* otherwise there should be no readers waiting.
413+
*
414+
* If it wasn't empty we try to merge new data into
415+
* the last buffer.
416+
*
417+
* That naturally merges small writes, but it also
418+
* page-aligs the rest of the writes for large writes
419+
* spanning multiple pages.
420+
*/
410421
head = pipe->head;
411-
412-
/* We try to merge small writes */
413-
chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
414-
if (!pipe_empty(head, pipe->tail) && chars != 0) {
422+
was_empty = pipe_empty(head, pipe->tail);
423+
chars = total_len & (PAGE_SIZE-1);
424+
if (chars && !was_empty) {
415425
unsigned int mask = pipe->ring_size - 1;
416426
struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];
417427
int offset = buf->offset + buf->len;
@@ -426,7 +436,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
426436
ret = -EFAULT;
427437
goto out;
428438
}
429-
do_wakeup = 1;
439+
430440
buf->len += ret;
431441
if (!iov_iter_count(from))
432442
goto out;
@@ -471,17 +481,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
471481
}
472482

473483
pipe->head = head + 1;
474-
475-
/* Always wake up, even if the copy fails. Otherwise
476-
* we lock up (O_NONBLOCK-)readers that sleep due to
477-
* syscall merging.
478-
* FIXME! Is this really true?
479-
*/
480-
wake_up_locked_poll(
481-
&pipe->wait, EPOLLIN | EPOLLRDNORM);
482-
483484
spin_unlock_irq(&pipe->wait.lock);
484-
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
485485

486486
/* Insert it into the buffer array */
487487
buf = &pipe->bufs[head & mask];
@@ -524,14 +524,37 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
524524
ret = -ERESTARTSYS;
525525
break;
526526
}
527+
528+
/*
529+
* We're going to release the pipe lock and wait for more
530+
* space. We wake up any readers if necessary, and then
531+
* after waiting we need to re-check whether the pipe
532+
* become empty while we dropped the lock.
533+
*/
534+
if (was_empty) {
535+
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
536+
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
537+
}
527538
pipe->waiting_writers++;
528539
pipe_wait(pipe);
529540
pipe->waiting_writers--;
541+
542+
was_empty = pipe_empty(head, pipe->tail);
530543
}
531544
out:
532545
__pipe_unlock(pipe);
533-
if (do_wakeup) {
534-
wake_up_interruptible_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
546+
547+
/*
548+
* If we do do a wakeup event, we do a 'sync' wakeup, because we
549+
* want the reader to start processing things asap, rather than
550+
* leave the data pending.
551+
*
552+
* This is particularly important for small writes, because of
553+
* how (for example) the GNU make jobserver uses small writes to
554+
* wake up pending jobs
555+
*/
556+
if (was_empty) {
557+
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
535558
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
536559
}
537560
if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {

0 commit comments

Comments
 (0)