
Commit 1e0d671

ring-buffer: Do not wake up a splice waiter when page is not full
When an application connects to the ring buffer via splice, it can only read full pages. Splice does not work with partial pages. If there is not enough data to fill a page, the splice command will either block or return -EAGAIN (if set to non-blocking mode).

Code was added so that if the page is not full, the waiter simply goes back to sleep. The problem is that it will get woken up again on the next event. That is, when something is written into the ring buffer and there is a waiter, the waiter is woken up. The waiter then checks the buffer, sees that it still does not have enough data to fill a page, and goes back to sleep. To make matters worse, going back to sleep can itself cause another event, which wakes the waiter back up, only for it to find there is still not enough data and sleep again. This produces tremendous overhead and fills the ring buffer with noise. For example, recording sched_switch on an idle system for 10 seconds produces 25,350,475 events!!!

Create another wait queue for those waiters wanting full pages. When an event is written, it only wakes up waiters if there is a full page of data. It does not wake up a waiter if the page is not yet full.

After this change, recording sched_switch on an idle system for 10 seconds produces only 800 events. Getting rid of 25,349,675 useless events (99.9969% of events!!) is something to take seriously.

Cc: [email protected] # 3.16+
Cc: Rabin Vincent <[email protected]>
Fixes: e30f53a "tracing: Do not busy wait in buffer splice"
Signed-off-by: Steven Rostedt <[email protected]>
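For context, the splice path described above pulls trace data a page at a time through a pipe. Below is a minimal userspace sketch of such a reader; the tracefs path, the output file name, and the page-sized chunking are illustrative assumptions, not part of this commit.

/*
 * Sketch of a blocking splice reader: it waits until the kernel has a
 * full page of trace data, moves that page into a pipe, then drains the
 * pipe into a file. Before this commit, the waiter was woken on every
 * event even when the page was not yet full.
 */
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	/* Assumed tracefs layout; may be /sys/kernel/debug/tracing on older kernels. */
	const char *path = "/sys/kernel/tracing/per_cpu/cpu0/trace_pipe_raw";
	int pfd[2];
	int trace_fd = open(path, O_RDONLY);
	int out_fd = open("trace-cpu0.raw", O_WRONLY | O_CREAT | O_TRUNC, 0644);

	if (trace_fd < 0 || out_fd < 0 || pipe(pfd) < 0) {
		perror("setup");
		return 1;
	}

	for (;;) {
		/* Blocks until a full page is available (or fails with an error). */
		ssize_t n = splice(trace_fd, NULL, pfd[1], NULL,
				   getpagesize(), SPLICE_F_MOVE);
		if (n <= 0)
			break;
		/* Move the page from the pipe into the output file. */
		if (splice(pfd[0], NULL, out_fd, NULL, n, SPLICE_F_MOVE) < 0)
			break;
	}

	close(trace_fd);
	close(out_fd);
	return 0;
}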
1 parent 7215853 commit 1e0d671

1 file changed (+35, −5)

kernel/trace/ring_buffer.c

Lines changed: 35 additions & 5 deletions
@@ -445,7 +445,10 @@ int ring_buffer_print_page_header(struct trace_seq *s)
 struct rb_irq_work {
 	struct irq_work			work;
 	wait_queue_head_t		waiters;
+	wait_queue_head_t		full_waiters;
 	bool				waiters_pending;
+	bool				full_waiters_pending;
+	bool				wakeup_full;
 };
 
 /*
@@ -527,6 +530,10 @@ static void rb_wake_up_waiters(struct irq_work *work)
 	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
 
 	wake_up_all(&rbwork->waiters);
+	if (rbwork->wakeup_full) {
+		rbwork->wakeup_full = false;
+		wake_up_all(&rbwork->full_waiters);
+	}
 }
 
 /**
@@ -551,9 +558,11 @@ int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
 	 * data in any cpu buffer, or a specific buffer, put the
 	 * caller on the appropriate wait queue.
	 */
-	if (cpu == RING_BUFFER_ALL_CPUS)
+	if (cpu == RING_BUFFER_ALL_CPUS) {
 		work = &buffer->irq_work;
-	else {
+		/* Full only makes sense on per cpu reads */
+		full = false;
+	} else {
 		if (!cpumask_test_cpu(cpu, buffer->cpumask))
 			return -ENODEV;
 		cpu_buffer = buffer->buffers[cpu];
@@ -562,7 +571,10 @@ int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
 
 
 	while (true) {
-		prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
+		if (full)
+			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
+		else
+			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
 
 		/*
 		 * The events can happen in critical sections where
@@ -584,7 +596,10 @@ int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
 		 * that is necessary is that the wake up happens after
 		 * a task has been queued. It's OK for spurious wake ups.
 		 */
-		work->waiters_pending = true;
+		if (full)
+			work->full_waiters_pending = true;
+		else
+			work->waiters_pending = true;
 
 		if (signal_pending(current)) {
 			ret = -EINTR;
@@ -613,7 +628,10 @@ int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
 		schedule();
 	}
 
-	finish_wait(&work->waiters, &wait);
+	if (full)
+		finish_wait(&work->full_waiters, &wait);
+	else
+		finish_wait(&work->waiters, &wait);
 
 	return ret;
 }
@@ -1228,6 +1246,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
 	init_completion(&cpu_buffer->update_done);
 	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
 	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
+	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
 
 	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
 			    GFP_KERNEL, cpu_to_node(cpu));
@@ -2799,6 +2818,8 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
 static __always_inline void
 rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
 {
+	bool pagebusy;
+
 	if (buffer->irq_work.waiters_pending) {
 		buffer->irq_work.waiters_pending = false;
 		/* irq_work_queue() supplies it's own memory barriers */
@@ -2810,6 +2831,15 @@ rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
 		/* irq_work_queue() supplies it's own memory barriers */
 		irq_work_queue(&cpu_buffer->irq_work.work);
 	}
+
+	pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
+
+	if (!pagebusy && cpu_buffer->irq_work.full_waiters_pending) {
+		cpu_buffer->irq_work.wakeup_full = true;
+		cpu_buffer->irq_work.full_waiters_pending = false;
+		/* irq_work_queue() supplies it's own memory barriers */
+		irq_work_queue(&cpu_buffer->irq_work.work);
+	}
 }
 
 /**
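To make the handshake between the waiter and the writer easier to follow, here is the same pattern reduced to its essentials. This is an illustrative sketch, not kernel source: the names demo_buf, demo_wake, and demo_writer_commit are made up, and the page_full flag stands in for the reader_page/commit_page test in the hunk above. The key design choice it shows is that the writer never calls wake_up_all() directly; it only sets flags and queues an irq_work, so the actual wakeup runs later in a context where waking tasks is safe.

/* Illustrative reduction of the deferred "wake on full page" handshake. */
#include <linux/irq_work.h>
#include <linux/kernel.h>
#include <linux/wait.h>

struct demo_buf {
	struct irq_work		work;
	wait_queue_head_t	full_waiters;
	bool			full_waiters_pending;
	bool			wakeup_full;
};

/* irq_work callback: runs later, where waking tasks is safe. */
static void demo_wake(struct irq_work *work)
{
	struct demo_buf *b = container_of(work, struct demo_buf, work);

	if (b->wakeup_full) {
		b->wakeup_full = false;
		wake_up_all(&b->full_waiters);
	}
}

/* Writer side: called after committing an event to the buffer. */
static void demo_writer_commit(struct demo_buf *b, bool page_full)
{
	/* Only bother the sleeping reader if a whole page can be spliced. */
	if (page_full && b->full_waiters_pending) {
		b->wakeup_full = true;
		b->full_waiters_pending = false;
		irq_work_queue(&b->work);	/* defer the actual wake_up_all() */
	}
}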
