Linux-libre 3.4.28-gnu1
[librecmc/linux-libre.git] / kernel / trace / ring_buffer.c
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/trace_clock.h>
8 #include <linux/spinlock.h>
9 #include <linux/debugfs.h>
10 #include <linux/uaccess.h>
11 #include <linux/hardirq.h>
12 #include <linux/kmemcheck.h>
13 #include <linux/module.h>
14 #include <linux/percpu.h>
15 #include <linux/mutex.h>
16 #include <linux/slab.h>
17 #include <linux/init.h>
18 #include <linux/hash.h>
19 #include <linux/list.h>
20 #include <linux/cpu.h>
21 #include <linux/fs.h>
22
23 #include <asm/local.h>
24 #include "trace.h"
25
26 /*
27  * The ring buffer header is special. We must manually keep it up to date.
28  */
29 int ring_buffer_print_entry_header(struct trace_seq *s)
30 {
31         int ret;
32
33         ret = trace_seq_printf(s, "# compressed entry header\n");
34         ret = trace_seq_printf(s, "\ttype_len    :    5 bits\n");
35         ret = trace_seq_printf(s, "\ttime_delta  :   27 bits\n");
36         ret = trace_seq_printf(s, "\tarray       :   32 bits\n");
37         ret = trace_seq_printf(s, "\n");
38         ret = trace_seq_printf(s, "\tpadding     : type == %d\n",
39                                RINGBUF_TYPE_PADDING);
40         ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
41                                RINGBUF_TYPE_TIME_EXTEND);
42         ret = trace_seq_printf(s, "\tdata max type_len  == %d\n",
43                                RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
44
45         return ret;
46 }
47
48 /*
49  * The ring buffer is made up of a list of pages. A separate list of pages is
50  * allocated for each CPU. A writer may only write to a buffer that is
51  * associated with the CPU it is currently executing on.  A reader may read
52  * from any per cpu buffer.
53  *
54  * The reader is special. For each per cpu buffer, the reader has its own
55  * reader page. When a reader has read the entire reader page, this reader
56  * page is swapped with another page in the ring buffer.
57  *
58  * Now, as long as the writer is off the reader page, the reader can do
59  * whatever it wants with that page. The writer will never write to that page
60  * again (as long as it is out of the ring buffer).
61  *
62  * Here's some silly ASCII art.
63  *
64  *   +------+
65  *   |reader|          RING BUFFER
66  *   |page  |
67  *   +------+        +---+   +---+   +---+
68  *                   |   |-->|   |-->|   |
69  *                   +---+   +---+   +---+
70  *                     ^               |
71  *                     |               |
72  *                     +---------------+
73  *
74  *
75  *   +------+
76  *   |reader|          RING BUFFER
77  *   |page  |------------------v
78  *   +------+        +---+   +---+   +---+
79  *                   |   |-->|   |-->|   |
80  *                   +---+   +---+   +---+
81  *                     ^               |
82  *                     |               |
83  *                     +---------------+
84  *
85  *
86  *   +------+
87  *   |reader|          RING BUFFER
88  *   |page  |------------------v
89  *   +------+        +---+   +---+   +---+
90  *      ^            |   |-->|   |-->|   |
91  *      |            +---+   +---+   +---+
92  *      |                              |
93  *      |                              |
94  *      +------------------------------+
95  *
96  *
97  *   +------+
98  *   |buffer|          RING BUFFER
99  *   |page  |------------------v
100  *   +------+        +---+   +---+   +---+
101  *      ^            |   |   |   |-->|   |
102  *      |   New      +---+   +---+   +---+
103  *      |  Reader------^               |
104  *      |   page                       |
105  *      +------------------------------+
106  *
107  *
108  * After we make this swap, the reader can hand this page off to the splice
109  * code and be done with it. It can even allocate a new page if it needs to
110  * and swap that into the ring buffer.
111  *
112  * We will be using cmpxchg soon to make all this lockless.
113  *
114  */
115
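/*
 * A note on where the swap pictured above lives in the code: the reader
 * (rb_get_reader_page(), later in this file) uses rb_head_page_replace()
 * below to cmpxchg() the head page's prev->next pointer from the old head
 * page over to the reader's own spare page, and the HEAD flag encoded in
 * that pointer (see the "lockless" comment further down) is what keeps a
 * concurrent overwriting writer from racing with the swap.
 */
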
116 /*
117  * A fast way to enable or disable all ring buffers is to
118  * call tracing_on or tracing_off. Turning off the ring buffers
119  * prevents all ring buffers from being recorded to.
120  * Turning this switch on makes it OK to write to the
121  * ring buffer, if the ring buffer itself is enabled.
122  *
123  * There are three layers that must be on in order to write
124  * to the ring buffer.
125  *
126  * 1) This global flag must be set.
127  * 2) The ring buffer must be enabled for recording.
128  * 3) The per cpu buffer must be enabled for recording.
129  *
130  * In case of an anomaly, this global flag has a bit set that
131  * will permanently disable all ring buffers.
132  */
133
134 /*
135  * Global flag to disable all recording to ring buffers
136  *  This has two bits: ON, DISABLED
137  *
138  *  ON   DISABLED
139  * ---- ----------
140  *   0      0        : ring buffers are off
141  *   1      0        : ring buffers are on
142  *   X      1        : ring buffers are permanently disabled
143  */
144
145 enum {
146         RB_BUFFERS_ON_BIT       = 0,
147         RB_BUFFERS_DISABLED_BIT = 1,
148 };
149
150 enum {
151         RB_BUFFERS_ON           = 1 << RB_BUFFERS_ON_BIT,
152         RB_BUFFERS_DISABLED     = 1 << RB_BUFFERS_DISABLED_BIT,
153 };
154
155 static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
156
157 /* Used for individual buffers (after the counter) */
158 #define RB_BUFFER_OFF           (1 << 20)
159
160 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
161
162 /**
163  * tracing_off_permanent - permanently disable ring buffers
164  *
165  * This function, once called, will disable all ring buffers
166  * permanently.
167  */
168 void tracing_off_permanent(void)
169 {
170         set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
171 }
172
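/*
 * A minimal sketch of how the ON/DISABLED bit pair above is read on the
 * write path. rb_sketch_buffers_globally_on() is an illustrative helper,
 * not part of the real API; the actual check in the reserve/write paths
 * later in this file is simply a comparison against RB_BUFFERS_ON.
 */
static inline int rb_sketch_buffers_globally_on(void)
{
	/*
	 * Only the exact value RB_BUFFERS_ON means "on": the ON bit must
	 * be set and the DISABLED bit must be clear. Once
	 * tracing_off_permanent() sets the DISABLED bit, this can never
	 * be true again.
	 */
	return ring_buffer_flags == RB_BUFFERS_ON;
}
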
173 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
174 #define RB_ALIGNMENT            4U
175 #define RB_MAX_SMALL_DATA       (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
176 #define RB_EVNT_MIN_SIZE        8U      /* two 32bit words */
177
178 #if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS)
179 # define RB_FORCE_8BYTE_ALIGNMENT       0
180 # define RB_ARCH_ALIGNMENT              RB_ALIGNMENT
181 #else
182 # define RB_FORCE_8BYTE_ALIGNMENT       1
183 # define RB_ARCH_ALIGNMENT              8U
184 #endif
185
186 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
187 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
188
189 enum {
190         RB_LEN_TIME_EXTEND = 8,
191         RB_LEN_TIME_STAMP = 16,
192 };
193
194 #define skip_time_extend(event) \
195         ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
196
197 static inline int rb_null_event(struct ring_buffer_event *event)
198 {
199         return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
200 }
201
202 static void rb_event_set_padding(struct ring_buffer_event *event)
203 {
204         /* padding has a NULL time_delta */
205         event->type_len = RINGBUF_TYPE_PADDING;
206         event->time_delta = 0;
207 }
208
209 static unsigned
210 rb_event_data_length(struct ring_buffer_event *event)
211 {
212         unsigned length;
213
214         if (event->type_len)
215                 length = event->type_len * RB_ALIGNMENT;
216         else
217                 length = event->array[0];
218         return length + RB_EVNT_HDR_SIZE;
219 }
220
221 /*
222  * Return the length of the given event. Will return
223  * the length of the time extend if the event is a
224  * time extend.
225  */
226 static inline unsigned
227 rb_event_length(struct ring_buffer_event *event)
228 {
229         switch (event->type_len) {
230         case RINGBUF_TYPE_PADDING:
231                 if (rb_null_event(event))
232                         /* undefined */
233                         return -1;
234                 return  event->array[0] + RB_EVNT_HDR_SIZE;
235
236         case RINGBUF_TYPE_TIME_EXTEND:
237                 return RB_LEN_TIME_EXTEND;
238
239         case RINGBUF_TYPE_TIME_STAMP:
240                 return RB_LEN_TIME_STAMP;
241
242         case RINGBUF_TYPE_DATA:
243                 return rb_event_data_length(event);
244         default:
245                 BUG();
246         }
247         /* not hit */
248         return 0;
249 }
250
251 /*
252  * Return total length of time extend and data,
253  *   or just the event length for all other events.
254  */
255 static inline unsigned
256 rb_event_ts_length(struct ring_buffer_event *event)
257 {
258         unsigned len = 0;
259
260         if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
261                 /* time extends include the data event after it */
262                 len = RB_LEN_TIME_EXTEND;
263                 event = skip_time_extend(event);
264         }
265         return len + rb_event_length(event);
266 }
267
268 /**
269  * ring_buffer_event_length - return the length of the event
270  * @event: the event to get the length of
271  *
272  * Returns the size of the data payload of a data event.
273  * If the event is something other than a data event, it
274  * returns the size of the event itself, with the exception
275  * of a TIME_EXTEND, where it still returns the size of the
276  * data payload of the data event after it.
277  */
278 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
279 {
280         unsigned length;
281
282         if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
283                 event = skip_time_extend(event);
284
285         length = rb_event_length(event);
286         if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
287                 return length;
288         length -= RB_EVNT_HDR_SIZE;
289         if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
290                 length -= sizeof(event->array[0]);
291         return length;
292 }
293 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
294
295 /* inline for ring buffer fast paths */
296 static void *
297 rb_event_data(struct ring_buffer_event *event)
298 {
299         if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
300                 event = skip_time_extend(event);
301         BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
302         /* If length is in len field, then array[0] has the data */
303         if (event->type_len)
304                 return (void *)&event->array[0];
305         /* Otherwise length is in array[0] and array[1] has the data */
306         return (void *)&event->array[1];
307 }
308
309 /**
310  * ring_buffer_event_data - return the data of the event
311  * @event: the event to get the data from
312  */
313 void *ring_buffer_event_data(struct ring_buffer_event *event)
314 {
315         return rb_event_data(event);
316 }
317 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
318
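/*
 * A minimal usage sketch for the two accessors above, assuming the
 * reader-side API declared in <linux/ring_buffer.h>. It drains one cpu's
 * events with ring_buffer_consume() (defined later in this file) and
 * treats each payload as opaque bytes; rb_sketch_drain_cpu() itself is
 * illustrative only and is not part of the real API.
 */
static void rb_sketch_drain_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	unsigned long lost;
	u64 ts;

	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
		void *payload = ring_buffer_event_data(event);
		unsigned size = ring_buffer_event_length(event);

		/* a real consumer would decode the payload here */
		pr_info("cpu%d ts=%llu lost=%lu size=%u payload=%p\n",
			cpu, (unsigned long long)ts, lost, size, payload);
	}
}
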
319 #define for_each_buffer_cpu(buffer, cpu)                \
320         for_each_cpu(cpu, buffer->cpumask)
321
322 #define TS_SHIFT        27
323 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
324 #define TS_DELTA_TEST   (~TS_MASK)
325
326 /* Flag when events were overwritten */
327 #define RB_MISSED_EVENTS        (1 << 31)
328 /* Missed count stored at end */
329 #define RB_MISSED_STORED        (1 << 30)
330
331 struct buffer_data_page {
332         u64              time_stamp;    /* page time stamp */
333         local_t          commit;        /* write committed index */
334         unsigned char    data[];        /* data of buffer page */
335 };
336
337 /*
338  * Note, the buffer_page list must be first. The buffer pages
339  * are allocated cache line aligned, which means that each buffer
340  * page will be at the beginning of a cache line, and thus
341  * the least significant bits will be zero. We use this to
342  * add flags in the list struct pointers, to make the ring buffer
343  * lockless.
344  */
345 struct buffer_page {
346         struct list_head list;          /* list of buffer pages */
347         local_t          write;         /* index for next write */
348         unsigned         read;          /* index for next read */
349         local_t          entries;       /* entries on this page */
350         unsigned long    real_end;      /* real end of data */
351         struct buffer_data_page *page;  /* Actual data page */
352 };
353
354 /*
355  * The buffer page counters, write and entries, must be reset
356  * atomically when crossing page boundaries. To synchronize this
357  * update, two counters are packed into a single value. One is
358  * the actual counter for the write position or count on the page.
359  *
360  * The other is a counter of updaters. Before an update happens,
361  * the updater portion of the value is incremented. This
362  * allows the updater to update the counter atomically.
363  *
364  * The counter is 20 bits, and the state data is 12.
365  */
366 #define RB_WRITE_MASK           0xfffff
367 #define RB_WRITE_INTCNT         (1 << 20)
368
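/*
 * An illustrative sketch of the split described above; these helpers are
 * not used by the real code, which does the same masking in
 * rb_page_write() and rb_page_entries() further down. The low 20 bits of
 * the write (or entries) word are the index/count on the page, and
 * everything above that counts updaters, each of which adds
 * RB_WRITE_INTCNT before touching the page (see rb_tail_page_update()).
 */
static inline unsigned long rb_sketch_write_index(unsigned long write)
{
	return write & RB_WRITE_MASK;	/* position of the next write */
}

static inline unsigned long rb_sketch_updaters(unsigned long write)
{
	return write >> 20;		/* number of in-flight updaters */
}
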
369 static void rb_init_page(struct buffer_data_page *bpage)
370 {
371         local_set(&bpage->commit, 0);
372 }
373
374 /**
375  * ring_buffer_page_len - the size of data on the page.
376  * @page: The page to read
377  *
378  * Returns the amount of data on the page, including buffer page header.
379  */
380 size_t ring_buffer_page_len(void *page)
381 {
382         return local_read(&((struct buffer_data_page *)page)->commit)
383                 + BUF_PAGE_HDR_SIZE;
384 }
385
386 /*
387  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
388  * this issue out.
389  */
390 static void free_buffer_page(struct buffer_page *bpage)
391 {
392         free_page((unsigned long)bpage->page);
393         kfree(bpage);
394 }
395
396 /*
397  * We need to fit the time_stamp delta into 27 bits.
398  */
399 static inline int test_time_stamp(u64 delta)
400 {
401         if (delta & TS_DELTA_TEST)
402                 return 1;
403         return 0;
404 }
405
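/*
 * A worked example: with TS_SHIFT == 27, any delta below 2^27
 * (134217728, roughly 134 ms with a nanosecond clock) fits in the event
 * header's 27-bit time_delta field and test_time_stamp() returns 0. A
 * larger delta has bits set in TS_DELTA_TEST, so the writer emits a
 * RINGBUF_TYPE_TIME_EXTEND event instead, storing the low 27 bits in
 * time_delta and the remaining bits in array[0] (see rb_add_time_stamp()
 * below).
 */
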
406 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
407
408 /* Max payload is BUF_PAGE_SIZE - header (8 bytes) */
409 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
410
411 int ring_buffer_print_page_header(struct trace_seq *s)
412 {
413         struct buffer_data_page field;
414         int ret;
415
416         ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
417                                "offset:0;\tsize:%u;\tsigned:%u;\n",
418                                (unsigned int)sizeof(field.time_stamp),
419                                (unsigned int)is_signed_type(u64));
420
421         ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
422                                "offset:%u;\tsize:%u;\tsigned:%u;\n",
423                                (unsigned int)offsetof(typeof(field), commit),
424                                (unsigned int)sizeof(field.commit),
425                                (unsigned int)is_signed_type(long));
426
427         ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
428                                "offset:%u;\tsize:%u;\tsigned:%u;\n",
429                                (unsigned int)offsetof(typeof(field), commit),
430                                1,
431                                (unsigned int)is_signed_type(long));
432
433         ret = trace_seq_printf(s, "\tfield: char data;\t"
434                                "offset:%u;\tsize:%u;\tsigned:%u;\n",
435                                (unsigned int)offsetof(typeof(field), data),
436                                (unsigned int)BUF_PAGE_SIZE,
437                                (unsigned int)is_signed_type(char));
438
439         return ret;
440 }
441
442 /*
443  * If head_page == tail_page && head == tail, then the buffer is empty.
444  */
445 struct ring_buffer_per_cpu {
446         int                             cpu;
447         atomic_t                        record_disabled;
448         struct ring_buffer              *buffer;
449         raw_spinlock_t                  reader_lock;    /* serialize readers */
450         arch_spinlock_t                 lock;
451         struct lock_class_key           lock_key;
452         struct list_head                *pages;
453         struct buffer_page              *head_page;     /* read from head */
454         struct buffer_page              *tail_page;     /* write to tail */
455         struct buffer_page              *commit_page;   /* committed pages */
456         struct buffer_page              *reader_page;
457         unsigned long                   lost_events;
458         unsigned long                   last_overrun;
459         local_t                         entries_bytes;
460         local_t                         commit_overrun;
461         local_t                         overrun;
462         local_t                         entries;
463         local_t                         committing;
464         local_t                         commits;
465         unsigned long                   read;
466         unsigned long                   read_bytes;
467         u64                             write_stamp;
468         u64                             read_stamp;
469 };
470
471 struct ring_buffer {
472         unsigned                        pages;
473         unsigned                        flags;
474         int                             cpus;
475         atomic_t                        record_disabled;
476         cpumask_var_t                   cpumask;
477
478         struct lock_class_key           *reader_lock_key;
479
480         struct mutex                    mutex;
481
482         struct ring_buffer_per_cpu      **buffers;
483
484 #ifdef CONFIG_HOTPLUG_CPU
485         struct notifier_block           cpu_notify;
486 #endif
487         u64                             (*clock)(void);
488 };
489
490 struct ring_buffer_iter {
491         struct ring_buffer_per_cpu      *cpu_buffer;
492         unsigned long                   head;
493         struct buffer_page              *head_page;
494         struct buffer_page              *cache_reader_page;
495         unsigned long                   cache_read;
496         u64                             read_stamp;
497 };
498
499 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
500 #define RB_WARN_ON(b, cond)                                             \
501         ({                                                              \
502                 int _____ret = unlikely(cond);                          \
503                 if (_____ret) {                                         \
504                         if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
505                                 struct ring_buffer_per_cpu *__b =       \
506                                         (void *)b;                      \
507                                 atomic_inc(&__b->buffer->record_disabled); \
508                         } else                                          \
509                                 atomic_inc(&b->record_disabled);        \
510                         WARN_ON(1);                                     \
511                 }                                                       \
512                 _____ret;                                               \
513         })
514
515 /* Up this if you want to test the TIME_EXTENTS and normalization */
516 #define DEBUG_SHIFT 0
517
518 static inline u64 rb_time_stamp(struct ring_buffer *buffer)
519 {
520         /* shift to debug/test normalization and TIME_EXTENTS */
521         return buffer->clock() << DEBUG_SHIFT;
522 }
523
524 u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
525 {
526         u64 time;
527
528         preempt_disable_notrace();
529         time = rb_time_stamp(buffer);
530         preempt_enable_no_resched_notrace();
531
532         return time;
533 }
534 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
535
536 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
537                                       int cpu, u64 *ts)
538 {
539         /* Just stupid testing the normalize function and deltas */
540         *ts >>= DEBUG_SHIFT;
541 }
542 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
543
544 /*
545  * Making the ring buffer lockless makes things tricky.
546  * Writes only happen on the CPU that they are on, and they
547  * only need to worry about interrupts. Reads, however, can
548  * happen on any CPU.
549  *
550  * The reader page is always off the ring buffer, but when the
551  * reader finishes with a page, it needs to swap its page with
552  * a new one from the buffer. The reader needs to take from
553  * the head (writes go to the tail). But if a writer is in overwrite
554  * mode and wraps, it must push the head page forward.
555  *
556  * Here lies the problem.
557  *
558  * The reader must be careful to replace only the head page, and
559  * not another one. As described at the top of the file in the
560  * ASCII art, the reader sets its old page to point to the next
561  * page after head. It then sets the page after head to point to
562  * the old reader page. But if the writer moves the head page
563  * during this operation, the reader could end up with the tail.
564  *
565  * We use cmpxchg to help prevent this race. We also do something
566  * special with the page before head. We set the LSB to 1.
567  *
568  * When the writer must push the page forward, it will clear the
569  * bit that points to the head page, move the head, and then set
570  * the bit that points to the new head page.
571  *
572  * We also don't want an interrupt coming in and moving the head
573  * page on another writer. Thus we use the second LSB to catch
574  * that too:
575  *
576  * head->list->prev->next        bit 1          bit 0
577  *                              -------        -------
578  * Normal page                     0              0
579  * Points to head page             0              1
580  * New head page                   1              0
581  *
582  * Note we can not trust the prev pointer of the head page, because:
583  *
584  * +----+       +-----+        +-----+
585  * |    |------>|  T  |---X--->|  N  |
586  * |    |<------|     |        |     |
587  * +----+       +-----+        +-----+
588  *   ^                           ^ |
589  *   |          +-----+          | |
590  *   +----------|  R  |----------+ |
591  *              |     |<-----------+
592  *              +-----+
593  *
594  * Key:  ---X-->  HEAD flag set in pointer
595  *         T      Tail page
596  *         R      Reader page
597  *         N      Next page
598  *
599  * (see __rb_reserve_next() to see where this happens)
600  *
601  *  What the above shows is that the reader just swapped out
602  *  the reader page with a page in the buffer, but before it
603  *  could make the new header point back to the new page added
604  *  it was preempted by a writer. The writer moved forward onto
605  *  the new page added by the reader and is about to move forward
606  *  again.
607  *
608  *  You can see that it is legitimate for the previous pointer of
609  *  the head (or any page) not to point back to itself. But only
610  *  temporarily.
611  */
612
613 #define RB_PAGE_NORMAL          0UL
614 #define RB_PAGE_HEAD            1UL
615 #define RB_PAGE_UPDATE          2UL
616
617
618 #define RB_FLAG_MASK            3UL
619
620 /* RB_PAGE_MOVED is not part of the mask */
621 #define RB_PAGE_MOVED           4UL
622
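/*
 * A worked example of the encoding above: struct buffer_page is
 * allocated cache line aligned, so a "next" pointer such as 0x...f240
 * always has its two low bits clear. While that pointer designates the
 * head page it is stored as 0x...f241 (RB_PAGE_HEAD), and while a writer
 * is moving the head it reads 0x...f242 (RB_PAGE_UPDATE). rb_list_head()
 * below masks off RB_FLAG_MASK to recover the real pointer, and
 * rb_is_head_page() returns the flag value, or RB_PAGE_MOVED if the
 * pointer no longer refers to the page that was passed in.
 */
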
623 /*
624  * rb_list_head - remove any bit
625  */
626 static struct list_head *rb_list_head(struct list_head *list)
627 {
628         unsigned long val = (unsigned long)list;
629
630         return (struct list_head *)(val & ~RB_FLAG_MASK);
631 }
632
633 /*
634  * rb_is_head_page - test if the given page is the head page
635  *
636  * Because the reader may move the head_page pointer, we can
637  * not trust what the head page is (it may be pointing to
638  * the reader page). But if the next page is a header page,
639  * its flags will be non zero.
640  */
641 static inline int
642 rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
643                 struct buffer_page *page, struct list_head *list)
644 {
645         unsigned long val;
646
647         val = (unsigned long)list->next;
648
649         if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
650                 return RB_PAGE_MOVED;
651
652         return val & RB_FLAG_MASK;
653 }
654
655 /*
656  * rb_is_reader_page
657  *
658  * The unique thing about the reader page is that, if the
659  * writer is ever on it, the previous pointer never points
660  * back to the reader page.
661  */
662 static int rb_is_reader_page(struct buffer_page *page)
663 {
664         struct list_head *list = page->list.prev;
665
666         return rb_list_head(list->next) != &page->list;
667 }
668
669 /*
670  * rb_set_list_to_head - set a list_head to be pointing to head.
671  */
672 static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
673                                 struct list_head *list)
674 {
675         unsigned long *ptr;
676
677         ptr = (unsigned long *)&list->next;
678         *ptr |= RB_PAGE_HEAD;
679         *ptr &= ~RB_PAGE_UPDATE;
680 }
681
682 /*
683  * rb_head_page_activate - sets up head page
684  */
685 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
686 {
687         struct buffer_page *head;
688
689         head = cpu_buffer->head_page;
690         if (!head)
691                 return;
692
693         /*
694          * Set the previous list pointer to have the HEAD flag.
695          */
696         rb_set_list_to_head(cpu_buffer, head->list.prev);
697 }
698
699 static void rb_list_head_clear(struct list_head *list)
700 {
701         unsigned long *ptr = (unsigned long *)&list->next;
702
703         *ptr &= ~RB_FLAG_MASK;
704 }
705
706 /*
707  * rb_head_page_deactivate - clears head page ptr (for free list)
708  */
709 static void
710 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
711 {
712         struct list_head *hd;
713
714         /* Go through the whole list and clear any pointers found. */
715         rb_list_head_clear(cpu_buffer->pages);
716
717         list_for_each(hd, cpu_buffer->pages)
718                 rb_list_head_clear(hd);
719 }
720
721 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
722                             struct buffer_page *head,
723                             struct buffer_page *prev,
724                             int old_flag, int new_flag)
725 {
726         struct list_head *list;
727         unsigned long val = (unsigned long)&head->list;
728         unsigned long ret;
729
730         list = &prev->list;
731
732         val &= ~RB_FLAG_MASK;
733
734         ret = cmpxchg((unsigned long *)&list->next,
735                       val | old_flag, val | new_flag);
736
737         /* check if the reader took the page */
738         if ((ret & ~RB_FLAG_MASK) != val)
739                 return RB_PAGE_MOVED;
740
741         return ret & RB_FLAG_MASK;
742 }
743
744 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
745                                    struct buffer_page *head,
746                                    struct buffer_page *prev,
747                                    int old_flag)
748 {
749         return rb_head_page_set(cpu_buffer, head, prev,
750                                 old_flag, RB_PAGE_UPDATE);
751 }
752
753 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
754                                  struct buffer_page *head,
755                                  struct buffer_page *prev,
756                                  int old_flag)
757 {
758         return rb_head_page_set(cpu_buffer, head, prev,
759                                 old_flag, RB_PAGE_HEAD);
760 }
761
762 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
763                                    struct buffer_page *head,
764                                    struct buffer_page *prev,
765                                    int old_flag)
766 {
767         return rb_head_page_set(cpu_buffer, head, prev,
768                                 old_flag, RB_PAGE_NORMAL);
769 }
770
771 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
772                                struct buffer_page **bpage)
773 {
774         struct list_head *p = rb_list_head((*bpage)->list.next);
775
776         *bpage = list_entry(p, struct buffer_page, list);
777 }
778
779 static struct buffer_page *
780 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
781 {
782         struct buffer_page *head;
783         struct buffer_page *page;
784         struct list_head *list;
785         int i;
786
787         if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
788                 return NULL;
789
790         /* sanity check */
791         list = cpu_buffer->pages;
792         if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
793                 return NULL;
794
795         page = head = cpu_buffer->head_page;
796         /*
797          * It is possible that the writer moves the header behind
798          * where we started, and we miss in one loop.
799          * A second loop should grab the header, but we'll do
800          * three loops just because I'm paranoid.
801          */
802         for (i = 0; i < 3; i++) {
803                 do {
804                         if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
805                                 cpu_buffer->head_page = page;
806                                 return page;
807                         }
808                         rb_inc_page(cpu_buffer, &page);
809                 } while (page != head);
810         }
811
812         RB_WARN_ON(cpu_buffer, 1);
813
814         return NULL;
815 }
816
817 static int rb_head_page_replace(struct buffer_page *old,
818                                 struct buffer_page *new)
819 {
820         unsigned long *ptr = (unsigned long *)&old->list.prev->next;
821         unsigned long val;
822         unsigned long ret;
823
824         val = *ptr & ~RB_FLAG_MASK;
825         val |= RB_PAGE_HEAD;
826
827         ret = cmpxchg(ptr, val, (unsigned long)&new->list);
828
829         return ret == val;
830 }
831
832 /*
833  * rb_tail_page_update - move the tail page forward
834  *
835  * Returns 1 if it moved the tail page, 0 if someone else did.
836  */
837 static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
838                                struct buffer_page *tail_page,
839                                struct buffer_page *next_page)
840 {
841         struct buffer_page *old_tail;
842         unsigned long old_entries;
843         unsigned long old_write;
844         int ret = 0;
845
846         /*
847          * The tail page now needs to be moved forward.
848          *
849          * We need to reset the tail page, but without possibly
850          * erasing data brought in by interrupts that have already
851          * moved the tail page and are currently on it.
852          *
853          * We add a counter to the write field to denote this.
854          */
855         old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
856         old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
857
858         /*
859          * Just make sure we have seen our old_write and synchronize
860          * with any interrupts that come in.
861          */
862         barrier();
863
864         /*
865          * If the tail page is still the same as what we think
866          * it is, then it is up to us to update the tail
867          * pointer.
868          */
869         if (tail_page == cpu_buffer->tail_page) {
870                 /* Zero the write counter */
871                 unsigned long val = old_write & ~RB_WRITE_MASK;
872                 unsigned long eval = old_entries & ~RB_WRITE_MASK;
873
874                 /*
875                  * This will only succeed if an interrupt did
876                  * not come in and change it, in which case we
877                  * do not want to modify it.
878                  *
879                  * We add (void) to let the compiler know that we do not care
880                  * about the return value of these functions. We use the
881                  * cmpxchg to only update if an interrupt did not already
882                  * do it for us. If the cmpxchg fails, we don't care.
883                  */
884                 (void)local_cmpxchg(&next_page->write, old_write, val);
885                 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
886
887                 /*
888                  * No need to worry about races with clearing out the commit:
889                  * it can only increment when a commit takes place, and that
890                  * only happens in the outermost nested commit.
891                  */
892                 local_set(&next_page->page->commit, 0);
893
894                 old_tail = cmpxchg(&cpu_buffer->tail_page,
895                                    tail_page, next_page);
896
897                 if (old_tail == tail_page)
898                         ret = 1;
899         }
900
901         return ret;
902 }
903
904 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
905                           struct buffer_page *bpage)
906 {
907         unsigned long val = (unsigned long)bpage;
908
909         if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
910                 return 1;
911
912         return 0;
913 }
914
915 /**
916  * rb_check_list - make sure a pointer to a list has the last bits zero
917  */
918 static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
919                          struct list_head *list)
920 {
921         if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
922                 return 1;
923         if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
924                 return 1;
925         return 0;
926 }
927
928 /**
929  * rb_check_pages - integrity check of buffer pages
930  * @cpu_buffer: CPU buffer with pages to test
931  *
932  * As a safety measure we check to make sure the data pages have not
933  * been corrupted.
934  */
935 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
936 {
937         struct list_head *head = cpu_buffer->pages;
938         struct buffer_page *bpage, *tmp;
939
940         rb_head_page_deactivate(cpu_buffer);
941
942         if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
943                 return -1;
944         if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
945                 return -1;
946
947         if (rb_check_list(cpu_buffer, head))
948                 return -1;
949
950         list_for_each_entry_safe(bpage, tmp, head, list) {
951                 if (RB_WARN_ON(cpu_buffer,
952                                bpage->list.next->prev != &bpage->list))
953                         return -1;
954                 if (RB_WARN_ON(cpu_buffer,
955                                bpage->list.prev->next != &bpage->list))
956                         return -1;
957                 if (rb_check_list(cpu_buffer, &bpage->list))
958                         return -1;
959         }
960
961         rb_head_page_activate(cpu_buffer);
962
963         return 0;
964 }
965
966 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
967                              unsigned nr_pages)
968 {
969         struct buffer_page *bpage, *tmp;
970         LIST_HEAD(pages);
971         unsigned i;
972
973         WARN_ON(!nr_pages);
974
975         for (i = 0; i < nr_pages; i++) {
976                 struct page *page;
977                 /*
978                  * The __GFP_NORETRY flag makes sure that the allocation fails
979                  * gracefully without invoking the oom-killer, so the system is
980                  * not destabilized.
981                  */
982                 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
983                                     GFP_KERNEL | __GFP_NORETRY,
984                                     cpu_to_node(cpu_buffer->cpu));
985                 if (!bpage)
986                         goto free_pages;
987
988                 rb_check_bpage(cpu_buffer, bpage);
989
990                 list_add(&bpage->list, &pages);
991
992                 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
993                                         GFP_KERNEL | __GFP_NORETRY, 0);
994                 if (!page)
995                         goto free_pages;
996                 bpage->page = page_address(page);
997                 rb_init_page(bpage->page);
998         }
999
1000         /*
1001          * The ring buffer page list is a circular list that does not
1002          * start and end with a list head. All page list items point to
1003          * other pages.
1004          */
1005         cpu_buffer->pages = pages.next;
1006         list_del(&pages);
1007
1008         rb_check_pages(cpu_buffer);
1009
1010         return 0;
1011
1012  free_pages:
1013         list_for_each_entry_safe(bpage, tmp, &pages, list) {
1014                 list_del_init(&bpage->list);
1015                 free_buffer_page(bpage);
1016         }
1017         return -ENOMEM;
1018 }
1019
1020 static struct ring_buffer_per_cpu *
1021 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
1022 {
1023         struct ring_buffer_per_cpu *cpu_buffer;
1024         struct buffer_page *bpage;
1025         struct page *page;
1026         int ret;
1027
1028         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1029                                   GFP_KERNEL, cpu_to_node(cpu));
1030         if (!cpu_buffer)
1031                 return NULL;
1032
1033         cpu_buffer->cpu = cpu;
1034         cpu_buffer->buffer = buffer;
1035         raw_spin_lock_init(&cpu_buffer->reader_lock);
1036         lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1037         cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1038
1039         bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1040                             GFP_KERNEL, cpu_to_node(cpu));
1041         if (!bpage)
1042                 goto fail_free_buffer;
1043
1044         rb_check_bpage(cpu_buffer, bpage);
1045
1046         cpu_buffer->reader_page = bpage;
1047         page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
1048         if (!page)
1049                 goto fail_free_reader;
1050         bpage->page = page_address(page);
1051         rb_init_page(bpage->page);
1052
1053         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1054
1055         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
1056         if (ret < 0)
1057                 goto fail_free_reader;
1058
1059         cpu_buffer->head_page
1060                 = list_entry(cpu_buffer->pages, struct buffer_page, list);
1061         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1062
1063         rb_head_page_activate(cpu_buffer);
1064
1065         return cpu_buffer;
1066
1067  fail_free_reader:
1068         free_buffer_page(cpu_buffer->reader_page);
1069
1070  fail_free_buffer:
1071         kfree(cpu_buffer);
1072         return NULL;
1073 }
1074
1075 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1076 {
1077         struct list_head *head = cpu_buffer->pages;
1078         struct buffer_page *bpage, *tmp;
1079
1080         free_buffer_page(cpu_buffer->reader_page);
1081
1082         rb_head_page_deactivate(cpu_buffer);
1083
1084         if (head) {
1085                 list_for_each_entry_safe(bpage, tmp, head, list) {
1086                         list_del_init(&bpage->list);
1087                         free_buffer_page(bpage);
1088                 }
1089                 bpage = list_entry(head, struct buffer_page, list);
1090                 free_buffer_page(bpage);
1091         }
1092
1093         kfree(cpu_buffer);
1094 }
1095
1096 #ifdef CONFIG_HOTPLUG_CPU
1097 static int rb_cpu_notify(struct notifier_block *self,
1098                          unsigned long action, void *hcpu);
1099 #endif
1100
1101 /**
1102  * ring_buffer_alloc - allocate a new ring_buffer
1103  * @size: the size in bytes per cpu that is needed.
1104  * @flags: attributes to set for the ring buffer.
1105  *
1106  * Currently the only flag that is available is the RB_FL_OVERWRITE
1107  * flag. This flag means that the buffer will overwrite old data
1108  * when the buffer wraps. If this flag is not set, the buffer will
1109  * drop data when the tail hits the head.
1110  */
1111 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1112                                         struct lock_class_key *key)
1113 {
1114         struct ring_buffer *buffer;
1115         int bsize;
1116         int cpu;
1117
1118         /* keep it in its own cache line */
1119         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1120                          GFP_KERNEL);
1121         if (!buffer)
1122                 return NULL;
1123
1124         if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1125                 goto fail_free_buffer;
1126
1127         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1128         buffer->flags = flags;
1129         buffer->clock = trace_clock_local;
1130         buffer->reader_lock_key = key;
1131
1132         /* need at least two pages */
1133         if (buffer->pages < 2)
1134                 buffer->pages = 2;
1135
1136         /*
1137          * In the non-hotplug cpu case, if the ring buffer is allocated
1138          * in an early initcall, it will not be notified of secondary cpus.
1139          * In that case, we need to allocate for all possible cpus.
1140          */
1141 #ifdef CONFIG_HOTPLUG_CPU
1142         get_online_cpus();
1143         cpumask_copy(buffer->cpumask, cpu_online_mask);
1144 #else
1145         cpumask_copy(buffer->cpumask, cpu_possible_mask);
1146 #endif
1147         buffer->cpus = nr_cpu_ids;
1148
1149         bsize = sizeof(void *) * nr_cpu_ids;
1150         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1151                                   GFP_KERNEL);
1152         if (!buffer->buffers)
1153                 goto fail_free_cpumask;
1154
1155         for_each_buffer_cpu(buffer, cpu) {
1156                 buffer->buffers[cpu] =
1157                         rb_allocate_cpu_buffer(buffer, cpu);
1158                 if (!buffer->buffers[cpu])
1159                         goto fail_free_buffers;
1160         }
1161
1162 #ifdef CONFIG_HOTPLUG_CPU
1163         buffer->cpu_notify.notifier_call = rb_cpu_notify;
1164         buffer->cpu_notify.priority = 0;
1165         register_cpu_notifier(&buffer->cpu_notify);
1166 #endif
1167
1168         put_online_cpus();
1169         mutex_init(&buffer->mutex);
1170
1171         return buffer;
1172
1173  fail_free_buffers:
1174         for_each_buffer_cpu(buffer, cpu) {
1175                 if (buffer->buffers[cpu])
1176                         rb_free_cpu_buffer(buffer->buffers[cpu]);
1177         }
1178         kfree(buffer->buffers);
1179
1180  fail_free_cpumask:
1181         free_cpumask_var(buffer->cpumask);
1182         put_online_cpus();
1183
1184  fail_free_buffer:
1185         kfree(buffer);
1186         return NULL;
1187 }
1188 EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
1189
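/*
 * A minimal usage sketch, assuming the wrappers declared in
 * <linux/ring_buffer.h>: the ring_buffer_alloc() macro (which supplies a
 * static lock class key and calls __ring_buffer_alloc() above), plus
 * ring_buffer_write() and ring_buffer_free(), both defined later in this
 * file. rb_sketch_setup() itself is illustrative only.
 */
static struct ring_buffer *rb_sketch_setup(void)
{
	struct ring_buffer *buffer;
	char greeting[] = "hello";

	/* one megabyte per cpu, overwrite the oldest data when full */
	buffer = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
	if (!buffer)
		return NULL;

	/* a self-contained write (reserve + copy + commit) on this cpu */
	if (ring_buffer_write(buffer, sizeof(greeting), greeting)) {
		ring_buffer_free(buffer);
		return NULL;
	}

	return buffer;
}
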
1190 /**
1191  * ring_buffer_free - free a ring buffer.
1192  * @buffer: the buffer to free.
1193  */
1194 void
1195 ring_buffer_free(struct ring_buffer *buffer)
1196 {
1197         int cpu;
1198
1199         get_online_cpus();
1200
1201 #ifdef CONFIG_HOTPLUG_CPU
1202         unregister_cpu_notifier(&buffer->cpu_notify);
1203 #endif
1204
1205         for_each_buffer_cpu(buffer, cpu)
1206                 rb_free_cpu_buffer(buffer->buffers[cpu]);
1207
1208         put_online_cpus();
1209
1210         kfree(buffer->buffers);
1211         free_cpumask_var(buffer->cpumask);
1212
1213         kfree(buffer);
1214 }
1215 EXPORT_SYMBOL_GPL(ring_buffer_free);
1216
1217 void ring_buffer_set_clock(struct ring_buffer *buffer,
1218                            u64 (*clock)(void))
1219 {
1220         buffer->clock = clock;
1221 }
1222
1223 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1224
1225 static void
1226 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
1227 {
1228         struct buffer_page *bpage;
1229         struct list_head *p;
1230         unsigned i;
1231
1232         raw_spin_lock_irq(&cpu_buffer->reader_lock);
1233         rb_head_page_deactivate(cpu_buffer);
1234
1235         for (i = 0; i < nr_pages; i++) {
1236                 if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
1237                         goto out;
1238                 p = cpu_buffer->pages->next;
1239                 bpage = list_entry(p, struct buffer_page, list);
1240                 list_del_init(&bpage->list);
1241                 free_buffer_page(bpage);
1242         }
1243         if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
1244                 goto out;
1245
1246         rb_reset_cpu(cpu_buffer);
1247         rb_check_pages(cpu_buffer);
1248
1249 out:
1250         raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1251 }
1252
1253 static void
1254 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
1255                 struct list_head *pages, unsigned nr_pages)
1256 {
1257         struct buffer_page *bpage;
1258         struct list_head *p;
1259         unsigned i;
1260
1261         raw_spin_lock_irq(&cpu_buffer->reader_lock);
1262         rb_head_page_deactivate(cpu_buffer);
1263
1264         for (i = 0; i < nr_pages; i++) {
1265                 if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
1266                         goto out;
1267                 p = pages->next;
1268                 bpage = list_entry(p, struct buffer_page, list);
1269                 list_del_init(&bpage->list);
1270                 list_add_tail(&bpage->list, cpu_buffer->pages);
1271         }
1272         rb_reset_cpu(cpu_buffer);
1273         rb_check_pages(cpu_buffer);
1274
1275 out:
1276         raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1277 }
1278
1279 /**
1280  * ring_buffer_resize - resize the ring buffer
1281  * @buffer: the buffer to resize.
1282  * @size: the new size.
1283  *
1284  * Minimum size is 2 * BUF_PAGE_SIZE.
1285  *
1286  * Returns -1 on failure.
1287  */
1288 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
1289 {
1290         struct ring_buffer_per_cpu *cpu_buffer;
1291         unsigned nr_pages, rm_pages, new_pages;
1292         struct buffer_page *bpage, *tmp;
1293         unsigned long buffer_size;
1294         LIST_HEAD(pages);
1295         int i, cpu;
1296
1297         /*
1298          * Always succeed at resizing a non-existent buffer:
1299          */
1300         if (!buffer)
1301                 return size;
1302
1303         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1304         size *= BUF_PAGE_SIZE;
1305         buffer_size = buffer->pages * BUF_PAGE_SIZE;
1306
1307         /* we need a minimum of two pages */
1308         if (size < BUF_PAGE_SIZE * 2)
1309                 size = BUF_PAGE_SIZE * 2;
1310
1311         if (size == buffer_size)
1312                 return size;
1313
1314         atomic_inc(&buffer->record_disabled);
1315
1316         /* Make sure all writers are done with this buffer. */
1317         synchronize_sched();
1318
1319         mutex_lock(&buffer->mutex);
1320         get_online_cpus();
1321
1322         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1323
1324         if (size < buffer_size) {
1325
1326                 /* easy case, just free pages */
1327                 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
1328                         goto out_fail;
1329
1330                 rm_pages = buffer->pages - nr_pages;
1331
1332                 for_each_buffer_cpu(buffer, cpu) {
1333                         cpu_buffer = buffer->buffers[cpu];
1334                         rb_remove_pages(cpu_buffer, rm_pages);
1335                 }
1336                 goto out;
1337         }
1338
1339         /*
1340          * This is a bit more difficult. We only want to add pages
1341          * when we can allocate enough for all CPUs. We do this
1342          * by allocating all the pages and storing them on a local
1343          * linked list. If we succeed in our allocation, then we
1344          * add these pages to the cpu_buffers. Otherwise we just free
1345          * them all and return -ENOMEM.
1346          */
1347         if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
1348                 goto out_fail;
1349
1350         new_pages = nr_pages - buffer->pages;
1351
1352         for_each_buffer_cpu(buffer, cpu) {
1353                 for (i = 0; i < new_pages; i++) {
1354                         struct page *page;
1355                         /*
1356                          * The __GFP_NORETRY flag makes sure that the allocation
1357                          * fails gracefully without invoking the oom-killer, so
1358                          * the system is not destabilized.
1359                          */
1360                         bpage = kzalloc_node(ALIGN(sizeof(*bpage),
1361                                                   cache_line_size()),
1362                                             GFP_KERNEL | __GFP_NORETRY,
1363                                             cpu_to_node(cpu));
1364                         if (!bpage)
1365                                 goto free_pages;
1366                         list_add(&bpage->list, &pages);
1367                         page = alloc_pages_node(cpu_to_node(cpu),
1368                                                 GFP_KERNEL | __GFP_NORETRY, 0);
1369                         if (!page)
1370                                 goto free_pages;
1371                         bpage->page = page_address(page);
1372                         rb_init_page(bpage->page);
1373                 }
1374         }
1375
1376         for_each_buffer_cpu(buffer, cpu) {
1377                 cpu_buffer = buffer->buffers[cpu];
1378                 rb_insert_pages(cpu_buffer, &pages, new_pages);
1379         }
1380
1381         if (RB_WARN_ON(buffer, !list_empty(&pages)))
1382                 goto out_fail;
1383
1384  out:
1385         buffer->pages = nr_pages;
1386         put_online_cpus();
1387         mutex_unlock(&buffer->mutex);
1388
1389         atomic_dec(&buffer->record_disabled);
1390
1391         return size;
1392
1393  free_pages:
1394         list_for_each_entry_safe(bpage, tmp, &pages, list) {
1395                 list_del_init(&bpage->list);
1396                 free_buffer_page(bpage);
1397         }
1398         put_online_cpus();
1399         mutex_unlock(&buffer->mutex);
1400         atomic_dec(&buffer->record_disabled);
1401         return -ENOMEM;
1402
1403         /*
1404          * Something went totally wrong, and we are too paranoid
1405          * to even clean up the mess.
1406          */
1407  out_fail:
1408         put_online_cpus();
1409         mutex_unlock(&buffer->mutex);
1410         atomic_dec(&buffer->record_disabled);
1411         return -1;
1412 }
1413 EXPORT_SYMBOL_GPL(ring_buffer_resize);
1414
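/*
 * A short usage note for ring_buffer_resize(): the size argument is in
 * bytes and is rounded up to whole buffer pages (BUF_PAGE_SIZE each),
 * with a floor of two pages per cpu. On success the rounded byte size is
 * returned. Shrinking frees pages right away; growing allocates all of
 * the new pages for every cpu up front and, if any allocation fails,
 * frees them again and returns -ENOMEM without changing the buffer.
 */
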
1415 void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
1416 {
1417         mutex_lock(&buffer->mutex);
1418         if (val)
1419                 buffer->flags |= RB_FL_OVERWRITE;
1420         else
1421                 buffer->flags &= ~RB_FL_OVERWRITE;
1422         mutex_unlock(&buffer->mutex);
1423 }
1424 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
1425
1426 static inline void *
1427 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
1428 {
1429         return bpage->data + index;
1430 }
1431
1432 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
1433 {
1434         return bpage->page->data + index;
1435 }
1436
1437 static inline struct ring_buffer_event *
1438 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
1439 {
1440         return __rb_page_index(cpu_buffer->reader_page,
1441                                cpu_buffer->reader_page->read);
1442 }
1443
1444 static inline struct ring_buffer_event *
1445 rb_iter_head_event(struct ring_buffer_iter *iter)
1446 {
1447         return __rb_page_index(iter->head_page, iter->head);
1448 }
1449
1450 static inline unsigned long rb_page_write(struct buffer_page *bpage)
1451 {
1452         return local_read(&bpage->write) & RB_WRITE_MASK;
1453 }
1454
1455 static inline unsigned rb_page_commit(struct buffer_page *bpage)
1456 {
1457         return local_read(&bpage->page->commit);
1458 }
1459
1460 static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1461 {
1462         return local_read(&bpage->entries) & RB_WRITE_MASK;
1463 }
1464
1465 /* Size is determined by what has been committed */
1466 static inline unsigned rb_page_size(struct buffer_page *bpage)
1467 {
1468         return rb_page_commit(bpage);
1469 }
1470
1471 static inline unsigned
1472 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
1473 {
1474         return rb_page_commit(cpu_buffer->commit_page);
1475 }
1476
1477 static inline unsigned
1478 rb_event_index(struct ring_buffer_event *event)
1479 {
1480         unsigned long addr = (unsigned long)event;
1481
1482         return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
1483 }
1484
1485 static inline int
1486 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
1487                    struct ring_buffer_event *event)
1488 {
1489         unsigned long addr = (unsigned long)event;
1490         unsigned long index;
1491
1492         index = rb_event_index(event);
1493         addr &= PAGE_MASK;
1494
1495         return cpu_buffer->commit_page->page == (void *)addr &&
1496                 rb_commit_index(cpu_buffer) == index;
1497 }
1498
1499 static void
1500 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
1501 {
1502         unsigned long max_count;
1503
1504         /*
1505          * We only race with interrupts and NMIs on this CPU.
1506          * If we own the commit event, then we can commit
1507          * all others that interrupted us, since the interruptions
1508          * are in stack format (they finish before they come
1509          * nest like a stack (they finish before they come
1510          * assign the commit to the tail.
1511          */
1512  again:
1513         max_count = cpu_buffer->buffer->pages * 100;
1514
1515         while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
1516                 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
1517                         return;
1518                 if (RB_WARN_ON(cpu_buffer,
1519                                rb_is_reader_page(cpu_buffer->tail_page)))
1520                         return;
1521                 local_set(&cpu_buffer->commit_page->page->commit,
1522                           rb_page_write(cpu_buffer->commit_page));
1523                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
1524                 cpu_buffer->write_stamp =
1525                         cpu_buffer->commit_page->page->time_stamp;
1526                 /* add barrier to keep gcc from optimizing too much */
1527                 barrier();
1528         }
1529         while (rb_commit_index(cpu_buffer) !=
1530                rb_page_write(cpu_buffer->commit_page)) {
1531
1532                 local_set(&cpu_buffer->commit_page->page->commit,
1533                           rb_page_write(cpu_buffer->commit_page));
1534                 RB_WARN_ON(cpu_buffer,
1535                            local_read(&cpu_buffer->commit_page->page->commit) &
1536                            ~RB_WRITE_MASK);
1537                 barrier();
1538         }
1539
1540         /* again, keep gcc from optimizing */
1541         barrier();
1542
1543         /*
1544          * If an interrupt came in just after the first while loop
1545          * and pushed the tail page forward, we will be left with
1546          * a dangling commit that will never go forward.
1547          */
1548         if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
1549                 goto again;
1550 }
1551
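/*
 * A concrete scenario for the loops in rb_set_commit_to_write() above:
 * task context reserves an event, is interrupted, and the interrupt
 * (plus an NMI inside it) reserves and commits events of its own before
 * returning. Because the interruptions nest like a stack, those inner
 * commits have all finished by the time the outermost commit gets here,
 * so walking commit_page forward to the tail page and publishing the
 * write index covers every nested event at once.
 */
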
1552 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1553 {
1554         cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
1555         cpu_buffer->reader_page->read = 0;
1556 }
1557
1558 static void rb_inc_iter(struct ring_buffer_iter *iter)
1559 {
1560         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1561
1562         /*
1563          * The iterator could be on the reader page (it starts there).
1564          * But the head could have moved, since the reader was
1565          * found. Check for this case and assign the iterator
1566          * to the head page instead of next.
1567          */
1568         if (iter->head_page == cpu_buffer->reader_page)
1569                 iter->head_page = rb_set_head_page(cpu_buffer);
1570         else
1571                 rb_inc_page(cpu_buffer, &iter->head_page);
1572
1573         iter->read_stamp = iter->head_page->page->time_stamp;
1574         iter->head = 0;
1575 }
1576
1577 /* Slow path, do not inline */
1578 static noinline struct ring_buffer_event *
1579 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)
1580 {
1581         event->type_len = RINGBUF_TYPE_TIME_EXTEND;
1582
1583         /* Not the first event on the page? */
1584         if (rb_event_index(event)) {
1585                 event->time_delta = delta & TS_MASK;
1586                 event->array[0] = delta >> TS_SHIFT;
1587         } else {
1588                 /* nope, just zero it */
1589                 event->time_delta = 0;
1590                 event->array[0] = 0;
1591         }
1592
1593         return skip_time_extend(event);
1594 }
1595
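/*
 * Layout note (descriptive comment only): a time extend event stores the
 * low TS_SHIFT bits of the delta in event->time_delta (delta & TS_MASK)
 * and the remaining high bits in event->array[0] (delta >> TS_SHIFT).
 * Readers reassemble it as (array[0] << TS_SHIFT) + time_delta, as done
 * in rb_update_write_stamp() and rb_update_read_stamp() below.
 */
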
1596 /**
1597  * rb_update_event - update event type and data
1598  * @cpu_buffer: the per cpu buffer that owns @event
1599  * @event: the event to update
1600  * @length: the size of the event field in the ring buffer
1601  *
1602  * Update the type and data fields of the event. The length
1603  * is the actual size that is written to the ring buffer,
1604  * and with this, we can determine what to place into the
1605  * data field.
1606  */
1607 static void
1608 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
1609                 struct ring_buffer_event *event, unsigned length,
1610                 int add_timestamp, u64 delta)
1611 {
1612         /* Only a commit updates the timestamp */
1613         if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
1614                 delta = 0;
1615
1616         /*
1617          * If we need to add a timestamp, then we
1618          * add it to the start of the reserved space.
1619          */
1620         if (unlikely(add_timestamp)) {
1621                 event = rb_add_time_stamp(event, delta);
1622                 length -= RB_LEN_TIME_EXTEND;
1623                 delta = 0;
1624         }
1625
1626         event->time_delta = delta;
1627         length -= RB_EVNT_HDR_SIZE;
1628         if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
1629                 event->type_len = 0;
1630                 event->array[0] = length;
1631         } else
1632                 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
1633 }
1634
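/*
 * Encoding note (descriptive comment only): a small data event stores its
 * length directly in the 5-bit type_len field as a count of RB_ALIGNMENT
 * sized chunks.  A larger event (or any data event when the architecture
 * forces 8 byte alignment) uses type_len == 0 and keeps the byte length
 * in array[0] instead, which is why rb_calculate_event_length() below
 * reserves room for that extra word.
 */
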
1635 /*
1636  * rb_handle_head_page - writer hit the head page
1637  *
1638  * Returns: +1 to retry page
1639  *           0 to continue
1640  *          -1 on error
1641  */
1642 static int
1643 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
1644                     struct buffer_page *tail_page,
1645                     struct buffer_page *next_page)
1646 {
1647         struct buffer_page *new_head;
1648         int entries;
1649         int type;
1650         int ret;
1651
1652         entries = rb_page_entries(next_page);
1653
1654         /*
1655          * The hard part is here. We need to move the head
1656          * forward, and protect against both readers on
1657          * other CPUs and writers coming in via interrupts.
1658          */
1659         type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
1660                                        RB_PAGE_HEAD);
1661
1662         /*
1663          * type can be one of four:
1664          *  NORMAL - an interrupt already moved it for us
1665          *  HEAD   - we are the first to get here.
1666          *  UPDATE - we are the interrupt interrupting
1667          *           a current move.
1668          *  MOVED  - a reader on another CPU moved the next
1669          *           pointer to its reader page. Give up
1670          *           and try again.
1671          */
1672
1673         switch (type) {
1674         case RB_PAGE_HEAD:
1675                 /*
1676                  * We changed the head to UPDATE, thus
1677                  * it is our responsibility to update
1678                  * the counters.
1679                  */
1680                 local_add(entries, &cpu_buffer->overrun);
1681                 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
1682
1683                 /*
1684                  * The entries will be zeroed out when we move the
1685                  * tail page.
1686                  */
1687
1688                 /* still more to do */
1689                 break;
1690
1691         case RB_PAGE_UPDATE:
1692                 /*
1693                  * This is an interrupt that interrupted the
1694                  * previous update. Still more to do.
1695                  */
1696                 break;
1697         case RB_PAGE_NORMAL:
1698                 /*
1699                  * An interrupt came in before the update
1700                  * and processed this for us.
1701                  * Nothing left to do.
1702                  */
1703                 return 1;
1704         case RB_PAGE_MOVED:
1705                 /*
1706                  * The reader is on another CPU and just did
1707                  * a swap with our next_page.
1708                  * Try again.
1709                  */
1710                 return 1;
1711         default:
1712                 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
1713                 return -1;
1714         }
1715
1716         /*
1717          * Now that we are here, the old head pointer is
1718          * set to UPDATE. This will keep the reader from
1719          * swapping the head page with the reader page.
1720          * The reader (on another CPU) will spin till
1721          * we are finished.
1722          *
1723          * We just need to protect against interrupts
1724          * doing the job. We will set the next pointer
1725          * to HEAD. After that, we set the old pointer
1726          * to NORMAL, but only if it was HEAD before.
1727          * Otherwise we are an interrupt, and only
1728          * want the outermost commit to reset it.
1729          */
1730         new_head = next_page;
1731         rb_inc_page(cpu_buffer, &new_head);
1732
1733         ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
1734                                     RB_PAGE_NORMAL);
1735
1736         /*
1737          * Valid returns are:
1738          *  HEAD   - an interrupt came in and already set it.
1739          *  NORMAL - One of two things:
1740          *            1) We really set it.
1741          *            2) A bunch of interrupts came in and moved
1742          *               the page forward again.
1743          */
1744         switch (ret) {
1745         case RB_PAGE_HEAD:
1746         case RB_PAGE_NORMAL:
1747                 /* OK */
1748                 break;
1749         default:
1750                 RB_WARN_ON(cpu_buffer, 1);
1751                 return -1;
1752         }
1753
1754         /*
1755          * It is possible that an interrupt came in,
1756          * set the head up, then more interrupts came in
1757          * and moved it again. When we get back here,
1758          * the page would have been set to NORMAL but we
1759          * just set it back to HEAD.
1760          *
1761          * How do you detect this? Well, if that happened
1762          * the tail page would have moved.
1763          */
1764         if (ret == RB_PAGE_NORMAL) {
1765                 /*
1766                  * If the tail had moved past next, then we need
1767                  * to reset the pointer.
1768                  */
1769                 if (cpu_buffer->tail_page != tail_page &&
1770                     cpu_buffer->tail_page != next_page)
1771                         rb_head_page_set_normal(cpu_buffer, new_head,
1772                                                 next_page,
1773                                                 RB_PAGE_HEAD);
1774         }
1775
1776         /*
1777          * If this was the outermost commit (the one that
1778          * changed the original pointer from HEAD to UPDATE),
1779          * then it is up to us to reset it to NORMAL.
1780          */
1781         if (type == RB_PAGE_HEAD) {
1782                 ret = rb_head_page_set_normal(cpu_buffer, next_page,
1783                                               tail_page,
1784                                               RB_PAGE_UPDATE);
1785                 if (RB_WARN_ON(cpu_buffer,
1786                                ret != RB_PAGE_UPDATE))
1787                         return -1;
1788         }
1789
1790         return 0;
1791 }
1792
1793 static unsigned rb_calculate_event_length(unsigned length)
1794 {
1795         struct ring_buffer_event event; /* Used only for sizeof array */
1796
1797         /* zero length can cause confusion */
1798         if (!length)
1799                 length = 1;
1800
1801         if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
1802                 length += sizeof(event.array[0]);
1803
1804         length += RB_EVNT_HDR_SIZE;
1805         length = ALIGN(length, RB_ARCH_ALIGNMENT);
1806
1807         return length;
1808 }
1809
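/*
 * Worked example (descriptive comment only; exact numbers depend on
 * RB_EVNT_HDR_SIZE and RB_ARCH_ALIGNMENT): with a 4 byte event header
 * and 4 byte alignment, a request for 10 bytes of data becomes
 * 10 + 4 = 14 bytes, rounded up to 16 bytes of reserved space.  A zero
 * length request is first bumped to 1 byte so that no zero sized event
 * is ever created.
 */
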
1810 static inline void
1811 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
1812               struct buffer_page *tail_page,
1813               unsigned long tail, unsigned long length)
1814 {
1815         struct ring_buffer_event *event;
1816
1817         /*
1818          * Only the event that crossed the page boundary
1819          * must fill the old tail_page with padding.
1820          */
1821         if (tail >= BUF_PAGE_SIZE) {
1822                 /*
1823                  * If the page was filled, then we still need
1824                  * to update the real_end. Reset it to zero
1825                  * and the reader will ignore it.
1826                  */
1827                 if (tail == BUF_PAGE_SIZE)
1828                         tail_page->real_end = 0;
1829
1830                 local_sub(length, &tail_page->write);
1831                 return;
1832         }
1833
1834         event = __rb_page_index(tail_page, tail);
1835         kmemcheck_annotate_bitfield(event, bitfield);
1836
1837         /* account for padding bytes */
1838         local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
1839
1840         /*
1841          * Save the original length to the meta data.
1842          * This will be used by the reader to add to the
1843          * lost event counter.
1844          */
1845         tail_page->real_end = tail;
1846
1847         /*
1848          * If this event is bigger than the minimum size, then
1849          * we need to be careful that we don't subtract the
1850          * write counter enough to allow another writer to slip
1851          * in on this page.
1852          * We put in a discarded commit instead, to make sure
1853          * that this space is not used again.
1854          *
1855          * If we are less than the minimum size, we don't need to
1856          * worry about it.
1857          */
1858         if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
1859                 /* No room for any events */
1860
1861                 /* Mark the rest of the page with padding */
1862                 rb_event_set_padding(event);
1863
1864                 /* Set the write back to the previous setting */
1865                 local_sub(length, &tail_page->write);
1866                 return;
1867         }
1868
1869         /* Put in a discarded event */
1870         event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
1871         event->type_len = RINGBUF_TYPE_PADDING;
1872         /* time delta must be non zero */
1873         event->time_delta = 1;
1874
1875         /* Set write to end of buffer */
1876         length = (tail + length) - BUF_PAGE_SIZE;
1877         local_sub(length, &tail_page->write);
1878 }
1879
1880 /*
1881  * This is the slow path, force gcc not to inline it.
1882  */
1883 static noinline struct ring_buffer_event *
1884 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
1885              unsigned long length, unsigned long tail,
1886              struct buffer_page *tail_page, u64 ts)
1887 {
1888         struct buffer_page *commit_page = cpu_buffer->commit_page;
1889         struct ring_buffer *buffer = cpu_buffer->buffer;
1890         struct buffer_page *next_page;
1891         int ret;
1892
1893         next_page = tail_page;
1894
1895         rb_inc_page(cpu_buffer, &next_page);
1896
1897         /*
1898          * If for some reason, we had an interrupt storm that made
1899          * it all the way around the buffer, bail, and warn
1900          * about it.
1901          */
1902         if (unlikely(next_page == commit_page)) {
1903                 local_inc(&cpu_buffer->commit_overrun);
1904                 goto out_reset;
1905         }
1906
1907         /*
1908          * This is where the fun begins!
1909          *
1910          * We are fighting against races between a reader that
1911          * could be on another CPU trying to swap its reader
1912          * page with the buffer head.
1913          *
1914          * We are also fighting against interrupts coming in and
1915          * moving the head or tail on us as well.
1916          *
1917          * If the next page is the head page then we have filled
1918          * the buffer, unless the commit page is still on the
1919          * reader page.
1920          */
1921         if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
1922
1923                 /*
1924                  * If the commit is not on the reader page, then
1925                  * move the header page.
1926                  */
1927                 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
1928                         /*
1929                          * If we are not in overwrite mode,
1930                          * this is easy, just stop here.
1931                          */
1932                         if (!(buffer->flags & RB_FL_OVERWRITE))
1933                                 goto out_reset;
1934
1935                         ret = rb_handle_head_page(cpu_buffer,
1936                                                   tail_page,
1937                                                   next_page);
1938                         if (ret < 0)
1939                                 goto out_reset;
1940                         if (ret)
1941                                 goto out_again;
1942                 } else {
1943                         /*
1944                          * We need to be careful here too. The
1945                          * commit page could still be on the reader
1946                          * page. We could have a small buffer, and
1947                          * have filled up the buffer with events
1948                          * from interrupts and such, and wrapped.
1949                          *
1950                          * Note, if the tail page is also on the
1951                          * reader_page, we let it move out.
1952                          */
1953                         if (unlikely((cpu_buffer->commit_page !=
1954                                       cpu_buffer->tail_page) &&
1955                                      (cpu_buffer->commit_page ==
1956                                       cpu_buffer->reader_page))) {
1957                                 local_inc(&cpu_buffer->commit_overrun);
1958                                 goto out_reset;
1959                         }
1960                 }
1961         }
1962
1963         ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
1964         if (ret) {
1965                 /*
1966                  * Nested commits always have zero deltas, so
1967                  * just reread the time stamp
1968                  */
1969                 ts = rb_time_stamp(buffer);
1970                 next_page->page->time_stamp = ts;
1971         }
1972
1973  out_again:
1974
1975         rb_reset_tail(cpu_buffer, tail_page, tail, length);
1976
1977         /* fail and let the caller try again */
1978         return ERR_PTR(-EAGAIN);
1979
1980  out_reset:
1981         /* reset write */
1982         rb_reset_tail(cpu_buffer, tail_page, tail, length);
1983
1984         return NULL;
1985 }
1986
1987 static struct ring_buffer_event *
1988 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
1989                   unsigned long length, u64 ts,
1990                   u64 delta, int add_timestamp)
1991 {
1992         struct buffer_page *tail_page;
1993         struct ring_buffer_event *event;
1994         unsigned long tail, write;
1995
1996         /*
1997          * If the time delta since the last event is too big to
1998          * hold in the time field of the event, then we append a
1999          * TIME EXTEND event ahead of the data event.
2000          */
2001         if (unlikely(add_timestamp))
2002                 length += RB_LEN_TIME_EXTEND;
2003
2004         tail_page = cpu_buffer->tail_page;
2005         write = local_add_return(length, &tail_page->write);
2006
2007         /* set write to only the index of the write */
2008         write &= RB_WRITE_MASK;
2009         tail = write - length;
2010
2011         /* See if we shot past the end of this buffer page */
2012         if (unlikely(write > BUF_PAGE_SIZE))
2013                 return rb_move_tail(cpu_buffer, length, tail,
2014                                     tail_page, ts);
2015
2016         /* We reserved something on the buffer */
2017
2018         event = __rb_page_index(tail_page, tail);
2019         kmemcheck_annotate_bitfield(event, bitfield);
2020         rb_update_event(cpu_buffer, event, length, add_timestamp, delta);
2021
2022         local_inc(&tail_page->entries);
2023
2024         /*
2025          * If this is the first commit on the page, then update
2026          * its timestamp.
2027          */
2028         if (!tail)
2029                 tail_page->page->time_stamp = ts;
2030
2031         /* account for these added bytes */
2032         local_add(length, &cpu_buffer->entries_bytes);
2033
2034         return event;
2035 }
2036
2037 static inline int
2038 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2039                   struct ring_buffer_event *event)
2040 {
2041         unsigned long new_index, old_index;
2042         struct buffer_page *bpage;
2043         unsigned long index;
2044         unsigned long addr;
2045
2046         new_index = rb_event_index(event);
2047         old_index = new_index + rb_event_ts_length(event);
2048         addr = (unsigned long)event;
2049         addr &= PAGE_MASK;
2050
2051         bpage = cpu_buffer->tail_page;
2052
2053         if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2054                 unsigned long write_mask =
2055                         local_read(&bpage->write) & ~RB_WRITE_MASK;
2056                 unsigned long event_length = rb_event_length(event);
2057                 /*
2058                  * This is on the tail page. It is possible that
2059                  * a write could come in and move the tail page
2060                  * and write to the next page. That is fine
2061                  * because we just shorten what is on this page.
2062                  */
2063                 old_index += write_mask;
2064                 new_index += write_mask;
2065                 index = local_cmpxchg(&bpage->write, old_index, new_index);
2066                 if (index == old_index) {
2067                         /* update counters */
2068                         local_sub(event_length, &cpu_buffer->entries_bytes);
2069                         return 1;
2070                 }
2071         }
2072
2073         /* could not discard */
2074         return 0;
2075 }
2076
2077 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2078 {
2079         local_inc(&cpu_buffer->committing);
2080         local_inc(&cpu_buffer->commits);
2081 }
2082
2083 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
2084 {
2085         unsigned long commits;
2086
2087         if (RB_WARN_ON(cpu_buffer,
2088                        !local_read(&cpu_buffer->committing)))
2089                 return;
2090
2091  again:
2092         commits = local_read(&cpu_buffer->commits);
2093         /* synchronize with interrupts */
2094         barrier();
2095         if (local_read(&cpu_buffer->committing) == 1)
2096                 rb_set_commit_to_write(cpu_buffer);
2097
2098         local_dec(&cpu_buffer->committing);
2099
2100         /* synchronize with interrupts */
2101         barrier();
2102
2103         /*
2104          * Need to account for interrupts coming in between the
2105          * updating of the commit page and the clearing of the
2106          * committing counter.
2107          */
2108         if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
2109             !local_read(&cpu_buffer->committing)) {
2110                 local_inc(&cpu_buffer->committing);
2111                 goto again;
2112         }
2113 }
2114
2115 static struct ring_buffer_event *
2116 rb_reserve_next_event(struct ring_buffer *buffer,
2117                       struct ring_buffer_per_cpu *cpu_buffer,
2118                       unsigned long length)
2119 {
2120         struct ring_buffer_event *event;
2121         u64 ts, delta;
2122         int nr_loops = 0;
2123         int add_timestamp;
2124         u64 diff;
2125
2126         rb_start_commit(cpu_buffer);
2127
2128 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
2129         /*
2130          * Due to the ability to swap a cpu buffer out of a buffer,
2131          * it is possible it was swapped before we committed.
2132          * (committing stops a swap). We check for it here and
2133          * if it happened, we have to fail the write.
2134          */
2135         barrier();
2136         if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
2137                 local_dec(&cpu_buffer->committing);
2138                 local_dec(&cpu_buffer->commits);
2139                 return NULL;
2140         }
2141 #endif
2142
2143         length = rb_calculate_event_length(length);
2144  again:
2145         add_timestamp = 0;
2146         delta = 0;
2147
2148         /*
2149          * We allow for interrupts to reenter here and do a trace.
2150          * If one does, it will cause this original code to loop
2151          * back here. Even with heavy interrupts happening, this
2152          * should only happen a few times in a row. If this happens
2153          * 1000 times in a row, there must be either an interrupt
2154          * storm or we have something buggy.
2155          * Bail!
2156          */
2157         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
2158                 goto out_fail;
2159
2160         ts = rb_time_stamp(cpu_buffer->buffer);
2161         diff = ts - cpu_buffer->write_stamp;
2162
2163         /* make sure this diff is calculated here */
2164         barrier();
2165
2166         /* Did the write stamp get updated already? */
2167         if (likely(ts >= cpu_buffer->write_stamp)) {
2168                 delta = diff;
2169                 if (unlikely(test_time_stamp(delta))) {
2170                         int local_clock_stable = 1;
2171 #ifdef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2172                         local_clock_stable = sched_clock_stable;
2173 #endif
2174                         WARN_ONCE(delta > (1ULL << 59),
2175                                   KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
2176                                   (unsigned long long)delta,
2177                                   (unsigned long long)ts,
2178                                   (unsigned long long)cpu_buffer->write_stamp,
2179                                   local_clock_stable ? "" :
2180                                   "If you just came from a suspend/resume,\n"
2181                                   "please switch to the trace global clock:\n"
2182                                   "  echo global > /sys/kernel/debug/tracing/trace_clock\n");
2183                         add_timestamp = 1;
2184                 }
2185         }
2186
2187         event = __rb_reserve_next(cpu_buffer, length, ts,
2188                                   delta, add_timestamp);
2189         if (unlikely(PTR_ERR(event) == -EAGAIN))
2190                 goto again;
2191
2192         if (!event)
2193                 goto out_fail;
2194
2195         return event;
2196
2197  out_fail:
2198         rb_end_commit(cpu_buffer);
2199         return NULL;
2200 }
2201
2202 #ifdef CONFIG_TRACING
2203
2204 #define TRACE_RECURSIVE_DEPTH 16
2205
2206 /* Keep this code out of the fast path cache */
2207 static noinline void trace_recursive_fail(void)
2208 {
2209         /* Disable all tracing before we do anything else */
2210         tracing_off_permanent();
2211
2212         printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:"
2213                     "HC[%lu]:SC[%lu]:NMI[%lu]\n",
2214                     trace_recursion_buffer(),
2215                     hardirq_count() >> HARDIRQ_SHIFT,
2216                     softirq_count() >> SOFTIRQ_SHIFT,
2217                     in_nmi());
2218
2219         WARN_ON_ONCE(1);
2220 }
2221
2222 static inline int trace_recursive_lock(void)
2223 {
2224         trace_recursion_inc();
2225
2226         if (likely(trace_recursion_buffer() < TRACE_RECURSIVE_DEPTH))
2227                 return 0;
2228
2229         trace_recursive_fail();
2230
2231         return -1;
2232 }
2233
2234 static inline void trace_recursive_unlock(void)
2235 {
2236         WARN_ON_ONCE(!trace_recursion_buffer());
2237
2238         trace_recursion_dec();
2239 }
2240
2241 #else
2242
2243 #define trace_recursive_lock()          (0)
2244 #define trace_recursive_unlock()        do { } while (0)
2245
2246 #endif
2247
2248 /**
2249  * ring_buffer_lock_reserve - reserve a part of the buffer
2250  * @buffer: the ring buffer to reserve from
2251  * @length: the length of the data to reserve (excluding event header)
2252  *
2253  * Returns a reserved event on the ring buffer to copy directly to.
2254  * The user of this interface will need to get the body to write into
2255  * and can use the ring_buffer_event_data() interface.
2256  *
2257  * The length is the length of the data needed, not the event length
2258  * which also includes the event header.
2259  *
2260  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
2261  * If NULL is returned, then nothing has been allocated or locked.
2262  */
2263 struct ring_buffer_event *
2264 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
2265 {
2266         struct ring_buffer_per_cpu *cpu_buffer;
2267         struct ring_buffer_event *event;
2268         int cpu;
2269
2270         if (ring_buffer_flags != RB_BUFFERS_ON)
2271                 return NULL;
2272
2273         /* If we are tracing schedule, we don't want to recurse */
2274         preempt_disable_notrace();
2275
2276         if (atomic_read(&buffer->record_disabled))
2277                 goto out_nocheck;
2278
2279         if (trace_recursive_lock())
2280                 goto out_nocheck;
2281
2282         cpu = raw_smp_processor_id();
2283
2284         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2285                 goto out;
2286
2287         cpu_buffer = buffer->buffers[cpu];
2288
2289         if (atomic_read(&cpu_buffer->record_disabled))
2290                 goto out;
2291
2292         if (length > BUF_MAX_DATA_SIZE)
2293                 goto out;
2294
2295         event = rb_reserve_next_event(buffer, cpu_buffer, length);
2296         if (!event)
2297                 goto out;
2298
2299         return event;
2300
2301  out:
2302         trace_recursive_unlock();
2303
2304  out_nocheck:
2305         preempt_enable_notrace();
2306         return NULL;
2307 }
2308 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
2309
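/*
 * Example usage (illustrative sketch only, not part of the original file):
 * the reserve/fill/commit pattern described above.  The record layout and
 * function name are hypothetical; the buffer is assumed to have been
 * created elsewhere with ring_buffer_alloc().
 */
struct rb_example_record {
        int     cpu;
        u64     value;
};

static int __maybe_unused rb_example_write(struct ring_buffer *buffer, u64 value)
{
        struct ring_buffer_event *event;
        struct rb_example_record *rec;

        event = ring_buffer_lock_reserve(buffer, sizeof(*rec));
        if (!event)
                return -EBUSY;  /* recording disabled or buffer unavailable */

        rec = ring_buffer_event_data(event);
        rec->cpu = raw_smp_processor_id();
        rec->value = value;

        /* Pairs with the reserve above and re-enables preemption. */
        return ring_buffer_unlock_commit(buffer, event);
}
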
2310 static void
2311 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2312                       struct ring_buffer_event *event)
2313 {
2314         u64 delta;
2315
2316         /*
2317          * The first event in the commit queue updates the
2318          * time stamp.
2319          */
2320         if (rb_event_is_commit(cpu_buffer, event)) {
2321                 /*
2322                  * A commit event that is first on a page
2323                  * updates the write timestamp with the page stamp
2324                  */
2325                 if (!rb_event_index(event))
2326                         cpu_buffer->write_stamp =
2327                                 cpu_buffer->commit_page->page->time_stamp;
2328                 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
2329                         delta = event->array[0];
2330                         delta <<= TS_SHIFT;
2331                         delta += event->time_delta;
2332                         cpu_buffer->write_stamp += delta;
2333                 } else
2334                         cpu_buffer->write_stamp += event->time_delta;
2335         }
2336 }
2337
2338 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
2339                       struct ring_buffer_event *event)
2340 {
2341         local_inc(&cpu_buffer->entries);
2342         rb_update_write_stamp(cpu_buffer, event);
2343         rb_end_commit(cpu_buffer);
2344 }
2345
2346 /**
2347  * ring_buffer_unlock_commit - commit a reserved event
2348  * @buffer: The buffer to commit to
2349  * @event: The event pointer to commit.
2350  *
2351  * This commits the data to the ring buffer, and releases any locks held.
2352  *
2353  * Must be paired with ring_buffer_lock_reserve.
2354  */
2355 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
2356                               struct ring_buffer_event *event)
2357 {
2358         struct ring_buffer_per_cpu *cpu_buffer;
2359         int cpu = raw_smp_processor_id();
2360
2361         cpu_buffer = buffer->buffers[cpu];
2362
2363         rb_commit(cpu_buffer, event);
2364
2365         trace_recursive_unlock();
2366
2367         preempt_enable_notrace();
2368
2369         return 0;
2370 }
2371 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
2372
2373 static inline void rb_event_discard(struct ring_buffer_event *event)
2374 {
2375         if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
2376                 event = skip_time_extend(event);
2377
2378         /* array[0] holds the actual length for the discarded event */
2379         event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
2380         event->type_len = RINGBUF_TYPE_PADDING;
2381         /* time delta must be non zero */
2382         if (!event->time_delta)
2383                 event->time_delta = 1;
2384 }
2385
2386 /*
2387  * Decrement the entries to the page that an event is on.
2388  * The event does not even need to exist, only the pointer
2389  * to the page it is on. This may only be called before the commit
2390  * takes place.
2391  */
2392 static inline void
2393 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
2394                    struct ring_buffer_event *event)
2395 {
2396         unsigned long addr = (unsigned long)event;
2397         struct buffer_page *bpage = cpu_buffer->commit_page;
2398         struct buffer_page *start;
2399
2400         addr &= PAGE_MASK;
2401
2402         /* Do the likely case first */
2403         if (likely(bpage->page == (void *)addr)) {
2404                 local_dec(&bpage->entries);
2405                 return;
2406         }
2407
2408         /*
2409          * Because the commit page may be on the reader page we
2410          * start with the next page and check the end of the loop there.
2411          */
2412         rb_inc_page(cpu_buffer, &bpage);
2413         start = bpage;
2414         do {
2415                 if (bpage->page == (void *)addr) {
2416                         local_dec(&bpage->entries);
2417                         return;
2418                 }
2419                 rb_inc_page(cpu_buffer, &bpage);
2420         } while (bpage != start);
2421
2422         /* commit not part of this buffer?? */
2423         RB_WARN_ON(cpu_buffer, 1);
2424 }
2425
2426 /**
2427  * ring_buffer_commit_discard - discard an event that has not been committed
2428  * @buffer: the ring buffer
2429  * @event: non committed event to discard
2430  *
2431  * Sometimes an event that is in the ring buffer needs to be ignored.
2432  * This function lets the user discard an event in the ring buffer
2433  * and then that event will not be read later.
2434  *
2435  * This function only works if it is called before the item has been
2436  * committed. It will try to free the event from the ring buffer
2437  * if another event has not been added behind it.
2438  *
2439  * If another event has been added behind it, it will set the event
2440  * up as discarded, and perform the commit.
2441  *
2442  * If this function is called, do not call ring_buffer_unlock_commit on
2443  * the event.
2444  */
2445 void ring_buffer_discard_commit(struct ring_buffer *buffer,
2446                                 struct ring_buffer_event *event)
2447 {
2448         struct ring_buffer_per_cpu *cpu_buffer;
2449         int cpu;
2450
2451         /* The event is discarded regardless */
2452         rb_event_discard(event);
2453
2454         cpu = smp_processor_id();
2455         cpu_buffer = buffer->buffers[cpu];
2456
2457         /*
2458          * This must only be called if the event has not been
2459          * committed yet. Thus we can assume that preemption
2460          * is still disabled.
2461          */
2462         RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
2463
2464         rb_decrement_entry(cpu_buffer, event);
2465         if (rb_try_to_discard(cpu_buffer, event))
2466                 goto out;
2467
2468         /*
2469          * The commit is still visible to the reader, so we
2470          * must still update the timestamp.
2471          */
2472         rb_update_write_stamp(cpu_buffer, event);
2473  out:
2474         rb_end_commit(cpu_buffer);
2475
2476         trace_recursive_unlock();
2477
2478         preempt_enable_notrace();
2479
2480 }
2481 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
2482
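/*
 * Example usage (illustrative sketch only): reserving space and then
 * deciding not to keep the event.  The threshold logic and function name
 * are hypothetical.
 */
static int __maybe_unused rb_example_maybe_write(struct ring_buffer *buffer,
                                                 int value, int threshold)
{
        struct ring_buffer_event *event;
        int *body;

        event = ring_buffer_lock_reserve(buffer, sizeof(*body));
        if (!event)
                return -EBUSY;

        body = ring_buffer_event_data(event);
        *body = value;

        if (value < threshold) {
                /* Not interesting after all; discard instead of committing. */
                ring_buffer_discard_commit(buffer, event);
                return 0;
        }

        return ring_buffer_unlock_commit(buffer, event);
}
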
2483 /**
2484  * ring_buffer_write - write data to the buffer without reserving
2485  * @buffer: The ring buffer to write to.
2486  * @length: The length of the data being written (excluding the event header)
2487  * @data: The data to write to the buffer.
2488  *
2489  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
2490  * one function. If you already have the data to write to the buffer, it
2491  * may be easier to simply call this function.
2492  *
2493  * Note, like ring_buffer_lock_reserve, the length is the length of the data
2494  * and not the length of the event which would hold the header.
2495  */
2496 int ring_buffer_write(struct ring_buffer *buffer,
2497                         unsigned long length,
2498                         void *data)
2499 {
2500         struct ring_buffer_per_cpu *cpu_buffer;
2501         struct ring_buffer_event *event;
2502         void *body;
2503         int ret = -EBUSY;
2504         int cpu;
2505
2506         if (ring_buffer_flags != RB_BUFFERS_ON)
2507                 return -EBUSY;
2508
2509         preempt_disable_notrace();
2510
2511         if (atomic_read(&buffer->record_disabled))
2512                 goto out;
2513
2514         cpu = raw_smp_processor_id();
2515
2516         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2517                 goto out;
2518
2519         cpu_buffer = buffer->buffers[cpu];
2520
2521         if (atomic_read(&cpu_buffer->record_disabled))
2522                 goto out;
2523
2524         if (length > BUF_MAX_DATA_SIZE)
2525                 goto out;
2526
2527         event = rb_reserve_next_event(buffer, cpu_buffer, length);
2528         if (!event)
2529                 goto out;
2530
2531         body = rb_event_data(event);
2532
2533         memcpy(body, data, length);
2534
2535         rb_commit(cpu_buffer, event);
2536
2537         ret = 0;
2538  out:
2539         preempt_enable_notrace();
2540
2541         return ret;
2542 }
2543 EXPORT_SYMBOL_GPL(ring_buffer_write);
2544
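/*
 * Example usage (illustrative sketch only): the one-shot form, equivalent
 * to a reserve/memcpy/commit sequence when the record is already built.
 * The function name is hypothetical.
 */
static int __maybe_unused rb_example_write_blob(struct ring_buffer *buffer,
                                                void *blob, unsigned long len)
{
        /* Returns 0 on success, -EBUSY if recording is off or disabled. */
        return ring_buffer_write(buffer, len, blob);
}
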
2545 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
2546 {
2547         struct buffer_page *reader = cpu_buffer->reader_page;
2548         struct buffer_page *head = rb_set_head_page(cpu_buffer);
2549         struct buffer_page *commit = cpu_buffer->commit_page;
2550
2551         /* In case of error, head will be NULL */
2552         if (unlikely(!head))
2553                 return 1;
2554
2555         return reader->read == rb_page_commit(reader) &&
2556                 (commit == reader ||
2557                  (commit == head &&
2558                   head->read == rb_page_commit(commit)));
2559 }
2560
2561 /**
2562  * ring_buffer_record_disable - stop all writes into the buffer
2563  * @buffer: The ring buffer to stop writes to.
2564  *
2565  * This prevents all writes to the buffer. Any attempt to write
2566  * to the buffer after this will fail and return NULL.
2567  *
2568  * The caller should call synchronize_sched() after this.
2569  */
2570 void ring_buffer_record_disable(struct ring_buffer *buffer)
2571 {
2572         atomic_inc(&buffer->record_disabled);
2573 }
2574 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
2575
2576 /**
2577  * ring_buffer_record_enable - enable writes to the buffer
2578  * @buffer: The ring buffer to enable writes
2579  *
2580  * Note, multiple disables will need the same number of enables
2581  * to truly enable the writing (much like preempt_disable).
2582  */
2583 void ring_buffer_record_enable(struct ring_buffer *buffer)
2584 {
2585         atomic_dec(&buffer->record_disabled);
2586 }
2587 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
2588
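/*
 * Example usage (illustrative sketch only): pausing writers while the
 * buffer is inspected.  synchronize_sched() waits for any writer that was
 * already inside its preempt-disabled reserve/commit section when the
 * disable took effect.  The function name is hypothetical.
 */
static void __maybe_unused rb_example_pause(struct ring_buffer *buffer)
{
        ring_buffer_record_disable(buffer);
        synchronize_sched();

        /* ... read or inspect the buffer here ... */

        ring_buffer_record_enable(buffer);
}
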
2589 /**
2590  * ring_buffer_record_off - stop all writes into the buffer
2591  * @buffer: The ring buffer to stop writes to.
2592  *
2593  * This prevents all writes to the buffer. Any attempt to write
2594  * to the buffer after this will fail and return NULL.
2595  *
2596  * This is different from ring_buffer_record_disable(), as
2597  * it works like an on/off switch, whereas the disable() version
2598  * must be paired with an enable().
2599  */
2600 void ring_buffer_record_off(struct ring_buffer *buffer)
2601 {
2602         unsigned int rd;
2603         unsigned int new_rd;
2604
2605         do {
2606                 rd = atomic_read(&buffer->record_disabled);
2607                 new_rd = rd | RB_BUFFER_OFF;
2608         } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
2609 }
2610 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
2611
2612 /**
2613  * ring_buffer_record_on - restart writes into the buffer
2614  * @buffer: The ring buffer to start writes to.
2615  *
2616  * This enables all writes to the buffer that was disabled by
2617  * ring_buffer_record_off().
2618  *
2619  * This is different from ring_buffer_record_enable(), as
2620  * it works like an on/off switch, whereas the enable() version
2621  * must be paired with a disable().
2622  */
2623 void ring_buffer_record_on(struct ring_buffer *buffer)
2624 {
2625         unsigned int rd;
2626         unsigned int new_rd;
2627
2628         do {
2629                 rd = atomic_read(&buffer->record_disabled);
2630                 new_rd = rd & ~RB_BUFFER_OFF;
2631         } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
2632 }
2633 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
2634
2635 /**
2636  * ring_buffer_record_is_on - return true if the ring buffer can write
2637  * @buffer: The ring buffer to see if write is enabled
2638  *
2639  * Returns true if the ring buffer is in a state where it accepts writes.
2640  */
2641 int ring_buffer_record_is_on(struct ring_buffer *buffer)
2642 {
2643         return !atomic_read(&buffer->record_disabled);
2644 }
2645
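/*
 * Example usage (illustrative sketch only): skipping work entirely when
 * writing has been switched off with ring_buffer_record_off().  The
 * function name is hypothetical.
 */
static int __maybe_unused rb_example_conditional_write(struct ring_buffer *buffer,
                                                       void *data,
                                                       unsigned long len)
{
        /* Avoid building the record at all when nothing can be written. */
        if (!ring_buffer_record_is_on(buffer))
                return 0;

        return ring_buffer_write(buffer, len, data);
}
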
2646 /**
2647  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
2648  * @buffer: The ring buffer to stop writes to.
2649  * @cpu: The CPU buffer to stop
2650  *
2651  * This prevents all writes to the buffer. Any attempt to write
2652  * to the buffer after this will fail and return NULL.
2653  *
2654  * The caller should call synchronize_sched() after this.
2655  */
2656 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
2657 {
2658         struct ring_buffer_per_cpu *cpu_buffer;
2659
2660         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2661                 return;
2662
2663         cpu_buffer = buffer->buffers[cpu];
2664         atomic_inc(&cpu_buffer->record_disabled);
2665 }
2666 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
2667
2668 /**
2669  * ring_buffer_record_enable_cpu - enable writes to the buffer
2670  * @buffer: The ring buffer to enable writes
2671  * @cpu: The CPU to enable.
2672  *
2673  * Note, multiple disables will need the same number of enables
2674  * to truly enable the writing (much like preempt_disable).
2675  */
2676 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
2677 {
2678         struct ring_buffer_per_cpu *cpu_buffer;
2679
2680         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2681                 return;
2682
2683         cpu_buffer = buffer->buffers[cpu];
2684         atomic_dec(&cpu_buffer->record_disabled);
2685 }
2686 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
2687
2688 /*
2689  * The total entries in the ring buffer is the running counter
2690  * of entries entered into the ring buffer, minus the sum of
2691  * the entries read from the ring buffer and the number of
2692  * entries that were overwritten.
2693  */
2694 static inline unsigned long
2695 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
2696 {
2697         return local_read(&cpu_buffer->entries) -
2698                 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
2699 }
2700
2701 /**
2702  * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
2703  * @buffer: The ring buffer
2704  * @cpu: The per CPU buffer to read from.
2705  */
2706 unsigned long ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu)
2707 {
2708         unsigned long flags;
2709         struct ring_buffer_per_cpu *cpu_buffer;
2710         struct buffer_page *bpage;
2711         unsigned long ret = 0;
2712
2713         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2714                 return 0;
2715
2716         cpu_buffer = buffer->buffers[cpu];
2717         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2718         /*
2719          * If the tail is on the reader_page, the oldest time stamp is on
2720          * the reader page.
2721          */
2722         if (cpu_buffer->tail_page == cpu_buffer->reader_page)
2723                 bpage = cpu_buffer->reader_page;
2724         else
2725                 bpage = rb_set_head_page(cpu_buffer);
2726         if (bpage)
2727                 ret = bpage->page->time_stamp;
2728         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2729
2730         return ret;
2731 }
2732 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
2733
2734 /**
2735  * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer
2736  * @buffer: The ring buffer
2737  * @cpu: The per CPU buffer to read from.
2738  */
2739 unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu)
2740 {
2741         struct ring_buffer_per_cpu *cpu_buffer;
2742         unsigned long ret;
2743
2744         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2745                 return 0;
2746
2747         cpu_buffer = buffer->buffers[cpu];
2748         ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
2749
2750         return ret;
2751 }
2752 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
2753
2754 /**
2755  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
2756  * @buffer: The ring buffer
2757  * @cpu: The per CPU buffer to get the entries from.
2758  */
2759 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
2760 {
2761         struct ring_buffer_per_cpu *cpu_buffer;
2762
2763         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2764                 return 0;
2765
2766         cpu_buffer = buffer->buffers[cpu];
2767
2768         return rb_num_of_entries(cpu_buffer);
2769 }
2770 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
2771
2772 /**
2773  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
2774  * @buffer: The ring buffer
2775  * @cpu: The per CPU buffer to get the number of overruns from
2776  */
2777 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
2778 {
2779         struct ring_buffer_per_cpu *cpu_buffer;
2780         unsigned long ret;
2781
2782         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2783                 return 0;
2784
2785         cpu_buffer = buffer->buffers[cpu];
2786         ret = local_read(&cpu_buffer->overrun);
2787
2788         return ret;
2789 }
2790 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
2791
2792 /**
2793  * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits
2794  * @buffer: The ring buffer
2795  * @cpu: The per CPU buffer to get the number of overruns from
2796  */
2797 unsigned long
2798 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
2799 {
2800         struct ring_buffer_per_cpu *cpu_buffer;
2801         unsigned long ret;
2802
2803         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2804                 return 0;
2805
2806         cpu_buffer = buffer->buffers[cpu];
2807         ret = local_read(&cpu_buffer->commit_overrun);
2808
2809         return ret;
2810 }
2811 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
2812
2813 /**
2814  * ring_buffer_entries - get the number of entries in a buffer
2815  * @buffer: The ring buffer
2816  *
2817  * Returns the total number of entries in the ring buffer
2818  * (all CPU entries)
2819  */
2820 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
2821 {
2822         struct ring_buffer_per_cpu *cpu_buffer;
2823         unsigned long entries = 0;
2824         int cpu;
2825
2826         /* if you care about this being correct, lock the buffer */
2827         for_each_buffer_cpu(buffer, cpu) {
2828                 cpu_buffer = buffer->buffers[cpu];
2829                 entries += rb_num_of_entries(cpu_buffer);
2830         }
2831
2832         return entries;
2833 }
2834 EXPORT_SYMBOL_GPL(ring_buffer_entries);
2835
2836 /**
2837  * ring_buffer_overruns - get the number of overruns in buffer
2838  * @buffer: The ring buffer
2839  *
2840  * Returns the total number of overruns in the ring buffer
2841  * (all CPU entries)
2842  */
2843 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
2844 {
2845         struct ring_buffer_per_cpu *cpu_buffer;
2846         unsigned long overruns = 0;
2847         int cpu;
2848
2849         /* if you care about this being correct, lock the buffer */
2850         for_each_buffer_cpu(buffer, cpu) {
2851                 cpu_buffer = buffer->buffers[cpu];
2852                 overruns += local_read(&cpu_buffer->overrun);
2853         }
2854
2855         return overruns;
2856 }
2857 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
2858
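/*
 * Example usage (illustrative sketch only): dumping the counters kept by
 * the accessors above.  The printout format and function name are
 * arbitrary.
 */
static void __maybe_unused rb_example_dump_stats(struct ring_buffer *buffer)
{
        int cpu;

        pr_info("entries: %lu  overruns: %lu\n",
                ring_buffer_entries(buffer), ring_buffer_overruns(buffer));

        for_each_online_cpu(cpu) {
                pr_info("cpu%d: %lu entries, %lu bytes, %lu overruns, %lu commit overruns\n",
                        cpu,
                        ring_buffer_entries_cpu(buffer, cpu),
                        ring_buffer_bytes_cpu(buffer, cpu),
                        ring_buffer_overrun_cpu(buffer, cpu),
                        ring_buffer_commit_overrun_cpu(buffer, cpu));
        }
}
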
2859 static void rb_iter_reset(struct ring_buffer_iter *iter)
2860 {
2861         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2862
2863         /* Iterator usage is expected to have record disabled */
2864         if (list_empty(&cpu_buffer->reader_page->list)) {
2865                 iter->head_page = rb_set_head_page(cpu_buffer);
2866                 if (unlikely(!iter->head_page))
2867                         return;
2868                 iter->head = iter->head_page->read;
2869         } else {
2870                 iter->head_page = cpu_buffer->reader_page;
2871                 iter->head = cpu_buffer->reader_page->read;
2872         }
2873         if (iter->head)
2874                 iter->read_stamp = cpu_buffer->read_stamp;
2875         else
2876                 iter->read_stamp = iter->head_page->page->time_stamp;
2877         iter->cache_reader_page = cpu_buffer->reader_page;
2878         iter->cache_read = cpu_buffer->read;
2879 }
2880
2881 /**
2882  * ring_buffer_iter_reset - reset an iterator
2883  * @iter: The iterator to reset
2884  *
2885  * Resets the iterator, so that it will start from the beginning
2886  * again.
2887  */
2888 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
2889 {
2890         struct ring_buffer_per_cpu *cpu_buffer;
2891         unsigned long flags;
2892
2893         if (!iter)
2894                 return;
2895
2896         cpu_buffer = iter->cpu_buffer;
2897
2898         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2899         rb_iter_reset(iter);
2900         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2901 }
2902 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
2903
2904 /**
2905  * ring_buffer_iter_empty - check if an iterator has no more to read
2906  * @iter: The iterator to check
2907  */
2908 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
2909 {
2910         struct ring_buffer_per_cpu *cpu_buffer;
2911
2912         cpu_buffer = iter->cpu_buffer;
2913
2914         return iter->head_page == cpu_buffer->commit_page &&
2915                 iter->head == rb_commit_index(cpu_buffer);
2916 }
2917 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
2918
2919 static void
2920 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2921                      struct ring_buffer_event *event)
2922 {
2923         u64 delta;
2924
2925         switch (event->type_len) {
2926         case RINGBUF_TYPE_PADDING:
2927                 return;
2928
2929         case RINGBUF_TYPE_TIME_EXTEND:
2930                 delta = event->array[0];
2931                 delta <<= TS_SHIFT;
2932                 delta += event->time_delta;
2933                 cpu_buffer->read_stamp += delta;
2934                 return;
2935
2936         case RINGBUF_TYPE_TIME_STAMP:
2937                 /* FIXME: not implemented */
2938                 return;
2939
2940         case RINGBUF_TYPE_DATA:
2941                 cpu_buffer->read_stamp += event->time_delta;
2942                 return;
2943
2944         default:
2945                 BUG();
2946         }
2947         return;
2948 }
2949
2950 static void
2951 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
2952                           struct ring_buffer_event *event)
2953 {
2954         u64 delta;
2955
2956         switch (event->type_len) {
2957         case RINGBUF_TYPE_PADDING:
2958                 return;
2959
2960         case RINGBUF_TYPE_TIME_EXTEND:
2961                 delta = event->array[0];
2962                 delta <<= TS_SHIFT;
2963                 delta += event->time_delta;
2964                 iter->read_stamp += delta;
2965                 return;
2966
2967         case RINGBUF_TYPE_TIME_STAMP:
2968                 /* FIXME: not implemented */
2969                 return;
2970
2971         case RINGBUF_TYPE_DATA:
2972                 iter->read_stamp += event->time_delta;
2973                 return;
2974
2975         default:
2976                 BUG();
2977         }
2978         return;
2979 }
2980
2981 static struct buffer_page *
2982 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
2983 {
2984         struct buffer_page *reader = NULL;
2985         unsigned long overwrite;
2986         unsigned long flags;
2987         int nr_loops = 0;
2988         int ret;
2989
2990         local_irq_save(flags);
2991         arch_spin_lock(&cpu_buffer->lock);
2992
2993  again:
2994         /*
2995          * This should normally only loop twice. But because the
2996          * start of the reader inserts an empty page, it causes
2997          * a case where we will loop three times. There should be no
2998          * reason to loop four times (that I know of).
2999          */
3000         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
3001                 reader = NULL;
3002                 goto out;
3003         }
3004
3005         reader = cpu_buffer->reader_page;
3006
3007         /* If there's more to read, return this page */
3008         if (cpu_buffer->reader_page->read < rb_page_size(reader))
3009                 goto out;
3010
3011         /* Never should we have an index greater than the size */
3012         if (RB_WARN_ON(cpu_buffer,
3013                        cpu_buffer->reader_page->read > rb_page_size(reader)))
3014                 goto out;
3015
3016         /* check if we caught up to the tail */
3017         reader = NULL;
3018         if (cpu_buffer->commit_page == cpu_buffer->reader_page)
3019                 goto out;
3020
3021         /*
3022          * Reset the reader page to size zero.
3023          */
3024         local_set(&cpu_buffer->reader_page->write, 0);
3025         local_set(&cpu_buffer->reader_page->entries, 0);
3026         local_set(&cpu_buffer->reader_page->page->commit, 0);
3027         cpu_buffer->reader_page->real_end = 0;
3028
3029  spin:
3030         /*
3031          * Splice the empty reader page into the list around the head.
3032          */
3033         reader = rb_set_head_page(cpu_buffer);
3034         if (!reader)
3035                 goto out;
3036         cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
3037         cpu_buffer->reader_page->list.prev = reader->list.prev;
3038
3039         /*
3040          * cpu_buffer->pages just needs to point to the buffer, it
3041          *  has no specific buffer page to point to. Let's move it out
3042          *  of our way so we don't accidentally swap it.
3043          */
3044         cpu_buffer->pages = reader->list.prev;
3045
3046         /* The reader page will be pointing to the new head */
3047         rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
3048
3049         /*
3050          * We want to make sure we read the overruns after we set up our
3051          * pointers to the next object. The writer side does a
3052          * cmpxchg to cross pages which acts as the mb on the writer
3053          * side. Note, the reader will constantly fail the swap
3054          * while the writer is updating the pointers, so this
3055          * guarantees that the overwrite recorded here is the one we
3056          * want to compare with the last_overrun.
3057          */
3058         smp_mb();
3059         overwrite = local_read(&(cpu_buffer->overrun));
3060
3061         /*
3062          * Here's the tricky part.
3063          *
3064          * We need to move the pointer past the header page.
3065          * But we can only do that if a writer is not currently
3066          * moving it. The page before the header page has the
3067          * flag bit '1' set if it is pointing to the page we want,
3068          * but if the writer is in the process of moving it
3069          * then it will be '2', or '0' if it has already moved.
3070          */
3071
3072         ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
3073
3074         /*
3075          * If we did not convert it, then we must try again.
3076          */
3077         if (!ret)
3078                 goto spin;
3079
3080         /*
3081          * Yeah! We succeeded in replacing the page.
3082          *
3083          * Now make the new head point back to the reader page.
3084          */
3085         rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
3086         rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
3087
3088         /* Finally update the reader page to the new head */
3089         cpu_buffer->reader_page = reader;
3090         rb_reset_reader_page(cpu_buffer);
3091
3092         if (overwrite != cpu_buffer->last_overrun) {
3093                 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
3094                 cpu_buffer->last_overrun = overwrite;
3095         }
3096
3097         goto again;
3098
3099  out:
3100         arch_spin_unlock(&cpu_buffer->lock);
3101         local_irq_restore(flags);
3102
3103         return reader;
3104 }
3105
3106 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
3107 {
3108         struct ring_buffer_event *event;
3109         struct buffer_page *reader;
3110         unsigned length;
3111
3112         reader = rb_get_reader_page(cpu_buffer);
3113
3114         /* This function should not be called when buffer is empty */
3115         if (RB_WARN_ON(cpu_buffer, !reader))
3116                 return;
3117
3118         event = rb_reader_event(cpu_buffer);
3119
3120         if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
3121                 cpu_buffer->read++;
3122
3123         rb_update_read_stamp(cpu_buffer, event);
3124
3125         length = rb_event_length(event);
3126         cpu_buffer->reader_page->read += length;
3127 }
3128
3129 static void rb_advance_iter(struct ring_buffer_iter *iter)
3130 {
3131         struct ring_buffer_per_cpu *cpu_buffer;
3132         struct ring_buffer_event *event;
3133         unsigned length;
3134
3135         cpu_buffer = iter->cpu_buffer;
3136
3137         /*
3138          * Check if we are at the end of the buffer.
3139          */
3140         if (iter->head >= rb_page_size(iter->head_page)) {
3141                 /* discarded commits can make the page empty */
3142                 if (iter->head_page == cpu_buffer->commit_page)
3143                         return;
3144                 rb_inc_iter(iter);
3145                 return;
3146         }
3147
3148         event = rb_iter_head_event(iter);
3149
3150         length = rb_event_length(event);
3151
3152         /*
3153          * This should not be called to advance the header if we are
3154          * at the tail of the buffer.
3155          */
3156         if (RB_WARN_ON(cpu_buffer,
3157                        (iter->head_page == cpu_buffer->commit_page) &&
3158                        (iter->head + length > rb_commit_index(cpu_buffer))))
3159                 return;
3160
3161         rb_update_iter_read_stamp(iter, event);
3162
3163         iter->head += length;
3164
3165         /* check for end of page padding */
3166         if ((iter->head >= rb_page_size(iter->head_page)) &&
3167             (iter->head_page != cpu_buffer->commit_page))
3168                 rb_advance_iter(iter);
3169 }
3170
3171 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
3172 {
3173         return cpu_buffer->lost_events;
3174 }
3175
3176 static struct ring_buffer_event *
3177 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
3178                unsigned long *lost_events)
3179 {
3180         struct ring_buffer_event *event;
3181         struct buffer_page *reader;
3182         int nr_loops = 0;
3183
3184  again:
3185         /*
3186          * We repeat when a time extend is encountered.
3187          * Since the time extend is always attached to a data event,
3188          * we should never loop more than once.
3189          * (We never hit the following condition more than twice).
3190          */
3191         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
3192                 return NULL;
3193
3194         reader = rb_get_reader_page(cpu_buffer);
3195         if (!reader)
3196                 return NULL;
3197
3198         event = rb_reader_event(cpu_buffer);
3199
3200         switch (event->type_len) {
3201         case RINGBUF_TYPE_PADDING:
3202                 if (rb_null_event(event))
3203                         RB_WARN_ON(cpu_buffer, 1);
3204                 /*
3205                  * Because the writer could be discarding every
3206                  * event it creates (which would probably be bad),
3207                  * if we were to go back to "again" then we may never
3208                  * catch up, and will trigger the warn on, or lock up
3209                  * the box. Return the padding, and we will release
3210                  * the current locks, and try again.
3211                  */
3212                 return event;
3213
3214         case RINGBUF_TYPE_TIME_EXTEND:
3215                 /* Internal data, OK to advance */
3216                 rb_advance_reader(cpu_buffer);
3217                 goto again;
3218
3219         case RINGBUF_TYPE_TIME_STAMP:
3220                 /* FIXME: not implemented */
3221                 rb_advance_reader(cpu_buffer);
3222                 goto again;
3223
3224         case RINGBUF_TYPE_DATA:
3225                 if (ts) {
3226                         *ts = cpu_buffer->read_stamp + event->time_delta;
3227                         ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
3228                                                          cpu_buffer->cpu, ts);
3229                 }
3230                 if (lost_events)
3231                         *lost_events = rb_lost_events(cpu_buffer);
3232                 return event;
3233
3234         default:
3235                 BUG();
3236         }
3237
3238         return NULL;
3239 }
3240 EXPORT_SYMBOL_GPL(ring_buffer_peek);
3241
3242 static struct ring_buffer_event *
3243 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
3244 {
3245         struct ring_buffer *buffer;
3246         struct ring_buffer_per_cpu *cpu_buffer;
3247         struct ring_buffer_event *event;
3248         int nr_loops = 0;
3249
3250         cpu_buffer = iter->cpu_buffer;
3251         buffer = cpu_buffer->buffer;
3252
3253         /*
3254          * Check if someone performed a consuming read to
3255          * the buffer. A consuming read invalidates the iterator
3256          * and we need to reset the iterator in this case.
3257          */
3258         if (unlikely(iter->cache_read != cpu_buffer->read ||
3259                      iter->cache_reader_page != cpu_buffer->reader_page))
3260                 rb_iter_reset(iter);
3261
3262  again:
3263         if (ring_buffer_iter_empty(iter))
3264                 return NULL;
3265
3266         /*
3267          * We repeat when a time extend is encountered.
3268          * Since the time extend is always attached to a data event,
3269          * we should never loop more than once.
3270          * (We never hit the following condition more than twice).
3271          */
3272         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
3273                 return NULL;
3274
3275         if (rb_per_cpu_empty(cpu_buffer))
3276                 return NULL;
3277
3278         if (iter->head >= local_read(&iter->head_page->page->commit)) {
3279                 rb_inc_iter(iter);
3280                 goto again;
3281         }
3282
3283         event = rb_iter_head_event(iter);
3284
3285         switch (event->type_len) {
3286         case RINGBUF_TYPE_PADDING:
3287                 if (rb_null_event(event)) {
3288                         rb_inc_iter(iter);
3289                         goto again;
3290                 }
3291                 rb_advance_iter(iter);
3292                 return event;
3293
3294         case RINGBUF_TYPE_TIME_EXTEND:
3295                 /* Internal data, OK to advance */
3296                 rb_advance_iter(iter);
3297                 goto again;
3298
3299         case RINGBUF_TYPE_TIME_STAMP:
3300                 /* FIXME: not implemented */
3301                 rb_advance_iter(iter);
3302                 goto again;
3303
3304         case RINGBUF_TYPE_DATA:
3305                 if (ts) {
3306                         *ts = iter->read_stamp + event->time_delta;
3307                         ring_buffer_normalize_time_stamp(buffer,
3308                                                          cpu_buffer->cpu, ts);
3309                 }
3310                 return event;
3311
3312         default:
3313                 BUG();
3314         }
3315
3316         return NULL;
3317 }
3318 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
3319
3320 static inline int rb_ok_to_lock(void)
3321 {
3322         /*
3323          * If an NMI die dump is writing out the content of the ring
3324          * buffer, do not grab locks. We also permanently disable the
3325          * ring buffer. A one time deal is all you get from reading
3326          * the ring buffer from an NMI.
3327          */
3328         if (likely(!in_nmi()))
3329                 return 1;
3330
3331         tracing_off_permanent();
3332         return 0;
3333 }
3334
3335 /**
3336  * ring_buffer_peek - peek at the next event to be read
3337  * @buffer: The ring buffer to read
3338  * @cpu: The cpu to peek at
3339  * @ts: The timestamp counter of this event.
3340  * @lost_events: a variable to store if events were lost (may be NULL)
3341  *
3342  * This will return the event that will be read next, but does
3343  * not consume the data.
3344  */
3345 struct ring_buffer_event *
3346 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
3347                  unsigned long *lost_events)
3348 {
3349         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
3350         struct ring_buffer_event *event;
3351         unsigned long flags;
3352         int dolock;
3353
3354         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3355                 return NULL;
3356
3357         dolock = rb_ok_to_lock();
3358  again:
3359         local_irq_save(flags);
3360         if (dolock)
3361                 raw_spin_lock(&cpu_buffer->reader_lock);
3362         event = rb_buffer_peek(cpu_buffer, ts, lost_events);
3363         if (event && event->type_len == RINGBUF_TYPE_PADDING)
3364                 rb_advance_reader(cpu_buffer);
3365         if (dolock)
3366                 raw_spin_unlock(&cpu_buffer->reader_lock);
3367         local_irq_restore(flags);
3368
3369         if (event && event->type_len == RINGBUF_TYPE_PADDING)
3370                 goto again;
3371
3372         return event;
3373 }
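
/*
 * For example, a minimal sketch of peeking at the next event on a CPU
 * before deciding whether to consume it; wants_event() is only a
 * placeholder for the caller's own test, and buffer/cpu are whatever
 * the caller already has:
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *	unsigned long lost;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, &lost);
 *	if (event && wants_event(event, ts))
 *		ring_buffer_consume(buffer, cpu, &ts, &lost);
 */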
3374
3375 /**
3376  * ring_buffer_iter_peek - peek at the next event to be read
3377  * @iter: The ring buffer iterator
3378  * @ts: The timestamp counter of this event.
3379  *
3380  * This will return the event that will be read next, but does
3381  * not increment the iterator.
3382  */
3383 struct ring_buffer_event *
3384 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
3385 {
3386         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3387         struct ring_buffer_event *event;
3388         unsigned long flags;
3389
3390  again:
3391         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3392         event = rb_iter_peek(iter, ts);
3393         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3394
3395         if (event && event->type_len == RINGBUF_TYPE_PADDING)
3396                 goto again;
3397
3398         return event;
3399 }
3400
3401 /**
3402  * ring_buffer_consume - return an event and consume it
3403  * @buffer: The ring buffer to get the next event from
3404  * @cpu: the cpu to read the buffer from
3405  * @ts: a variable to store the timestamp (may be NULL)
3406  * @lost_events: a variable to store if events were lost (may be NULL)
3407  *
3408  * Returns the next event in the ring buffer, and that event is consumed.
3409  * This means that sequential reads will keep returning a different event,
3410  * and eventually empty the ring buffer if the producer is slower.
3411  */
3412 struct ring_buffer_event *
3413 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
3414                     unsigned long *lost_events)
3415 {
3416         struct ring_buffer_per_cpu *cpu_buffer;
3417         struct ring_buffer_event *event = NULL;
3418         unsigned long flags;
3419         int dolock;
3420
3421         dolock = rb_ok_to_lock();
3422
3423  again:
3424         /* might be called in atomic */
3425         preempt_disable();
3426
3427         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3428                 goto out;
3429
3430         cpu_buffer = buffer->buffers[cpu];
3431         local_irq_save(flags);
3432         if (dolock)
3433                 raw_spin_lock(&cpu_buffer->reader_lock);
3434
3435         event = rb_buffer_peek(cpu_buffer, ts, lost_events);
3436         if (event) {
3437                 cpu_buffer->lost_events = 0;
3438                 rb_advance_reader(cpu_buffer);
3439         }
3440
3441         if (dolock)
3442                 raw_spin_unlock(&cpu_buffer->reader_lock);
3443         local_irq_restore(flags);
3444
3445  out:
3446         preempt_enable();
3447
3448         if (event && event->type_len == RINGBUF_TYPE_PADDING)
3449                 goto again;
3450
3451         return event;
3452 }
3453 EXPORT_SYMBOL_GPL(ring_buffer_consume);
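
/*
 * For example, a minimal sketch of a consuming read loop that drains one
 * CPU buffer; handle_event() is only a placeholder for the caller's own
 * processing:
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *	unsigned long lost;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		handle_event(ring_buffer_event_data(event), ts, lost);
 */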
3454
3455 /**
3456  * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
3457  * @buffer: The ring buffer to read from
3458  * @cpu: The cpu buffer to iterate over
3459  *
3460  * This performs the initial preparations necessary to iterate
3461  * through the buffer.  Memory is allocated, buffer recording
3462  * is disabled, and the iterator pointer is returned to the caller.
3463  *
3464  * Disabling buffer recording prevents the reading from being
3465  * corrupted. This is not a consuming read, so a producer is not
3466  * expected.
3467  *
3468  * After a sequence of ring_buffer_read_prepare calls, the user is
3469  * expected to make at least one call to ring_buffer_read_prepare_sync.
3470  * Afterwards, ring_buffer_read_start is invoked to get things going
3471  * for real.
3472  *
3473  * This overall must be paired with ring_buffer_read_finish.
3474  */
3475 struct ring_buffer_iter *
3476 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)
3477 {
3478         struct ring_buffer_per_cpu *cpu_buffer;
3479         struct ring_buffer_iter *iter;
3480
3481         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3482                 return NULL;
3483
3484         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
3485         if (!iter)
3486                 return NULL;
3487
3488         cpu_buffer = buffer->buffers[cpu];
3489
3490         iter->cpu_buffer = cpu_buffer;
3491
3492         atomic_inc(&cpu_buffer->record_disabled);
3493
3494         return iter;
3495 }
3496 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
3497
3498 /**
3499  * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
3500  *
3501  * All previously invoked ring_buffer_read_prepare calls to prepare
3502  * iterators will be synchronized.  Afterwards, ring_buffer_read_start
3503  * calls on those iterators are allowed.
3504  */
3505 void
3506 ring_buffer_read_prepare_sync(void)
3507 {
3508         synchronize_sched();
3509 }
3510 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
3511
3512 /**
3513  * ring_buffer_read_start - start a non consuming read of the buffer
3514  * @iter: The iterator returned by ring_buffer_read_prepare
3515  *
3516  * This finalizes the startup of an iteration through the buffer.
3517  * The iterator comes from a call to ring_buffer_read_prepare and
3518  * an intervening ring_buffer_read_prepare_sync must have been
3519  * performed.
3520  *
3521  * Must be paired with ring_buffer_read_finish.
3522  */
3523 void
3524 ring_buffer_read_start(struct ring_buffer_iter *iter)
3525 {
3526         struct ring_buffer_per_cpu *cpu_buffer;
3527         unsigned long flags;
3528
3529         if (!iter)
3530                 return;
3531
3532         cpu_buffer = iter->cpu_buffer;
3533
3534         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3535         arch_spin_lock(&cpu_buffer->lock);
3536         rb_iter_reset(iter);
3537         arch_spin_unlock(&cpu_buffer->lock);
3538         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3539 }
3540 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
3541
3542 /**
3543  * ring_buffer_read_finish - finish reading the iterator of the buffer
3544  * @iter: The iterator retrieved by ring_buffer_read_prepare
3545  *
3546  * This re-enables the recording to the buffer, and frees the
3547  * iterator.
3548  */
3549 void
3550 ring_buffer_read_finish(struct ring_buffer_iter *iter)
3551 {
3552         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3553
3554         atomic_dec(&cpu_buffer->record_disabled);
3555         kfree(iter);
3556 }
3557 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
3558
3559 /**
3560  * ring_buffer_read - read the next item in the ring buffer by the iterator
3561  * @iter: The ring buffer iterator
3562  * @ts: The time stamp of the event read.
3563  *
3564  * This reads the next event in the ring buffer and increments the iterator.
3565  */
3566 struct ring_buffer_event *
3567 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
3568 {
3569         struct ring_buffer_event *event;
3570         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3571         unsigned long flags;
3572
3573         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3574  again:
3575         event = rb_iter_peek(iter, ts);
3576         if (!event)
3577                 goto out;
3578
3579         if (event->type_len == RINGBUF_TYPE_PADDING)
3580                 goto again;
3581
3582         rb_advance_iter(iter);
3583  out:
3584         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3585
3586         return event;
3587 }
3588 EXPORT_SYMBOL_GPL(ring_buffer_read);
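
/*
 * For example, a minimal sketch of a full non-consuming pass over one
 * CPU buffer, tying together ring_buffer_read_prepare(),
 * ring_buffer_read_prepare_sync(), ring_buffer_read_start(),
 * ring_buffer_read() and ring_buffer_read_finish(); process_event() is
 * only a placeholder for the caller's own handling:
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu);
 *	if (!iter)
 *		return;
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		process_event(event, ts);
 *	ring_buffer_read_finish(iter);
 */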
3589
3590 /**
3591  * ring_buffer_size - return the size of the ring buffer (in bytes)
3592  * @buffer: The ring buffer.
3593  */
3594 unsigned long ring_buffer_size(struct ring_buffer *buffer)
3595 {
3596         return BUF_PAGE_SIZE * buffer->pages;
3597 }
3598 EXPORT_SYMBOL_GPL(ring_buffer_size);
3599
3600 static void
3601 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
3602 {
3603         rb_head_page_deactivate(cpu_buffer);
3604
3605         cpu_buffer->head_page
3606                 = list_entry(cpu_buffer->pages, struct buffer_page, list);
3607         local_set(&cpu_buffer->head_page->write, 0);
3608         local_set(&cpu_buffer->head_page->entries, 0);
3609         local_set(&cpu_buffer->head_page->page->commit, 0);
3610
3611         cpu_buffer->head_page->read = 0;
3612
3613         cpu_buffer->tail_page = cpu_buffer->head_page;
3614         cpu_buffer->commit_page = cpu_buffer->head_page;
3615
3616         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
3617         local_set(&cpu_buffer->reader_page->write, 0);
3618         local_set(&cpu_buffer->reader_page->entries, 0);
3619         local_set(&cpu_buffer->reader_page->page->commit, 0);
3620         cpu_buffer->reader_page->read = 0;
3621
3622         local_set(&cpu_buffer->commit_overrun, 0);
3623         local_set(&cpu_buffer->entries_bytes, 0);
3624         local_set(&cpu_buffer->overrun, 0);
3625         local_set(&cpu_buffer->entries, 0);
3626         local_set(&cpu_buffer->committing, 0);
3627         local_set(&cpu_buffer->commits, 0);
3628         cpu_buffer->read = 0;
3629         cpu_buffer->read_bytes = 0;
3630
3631         cpu_buffer->write_stamp = 0;
3632         cpu_buffer->read_stamp = 0;
3633
3634         cpu_buffer->lost_events = 0;
3635         cpu_buffer->last_overrun = 0;
3636
3637         rb_head_page_activate(cpu_buffer);
3638 }
3639
3640 /**
3641  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
3642  * @buffer: The ring buffer to reset a per cpu buffer of
3643  * @cpu: The CPU buffer to be reset
3644  */
3645 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
3646 {
3647         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
3648         unsigned long flags;
3649
3650         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3651                 return;
3652
3653         atomic_inc(&cpu_buffer->record_disabled);
3654
3655         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3656
3657         if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
3658                 goto out;
3659
3660         arch_spin_lock(&cpu_buffer->lock);
3661
3662         rb_reset_cpu(cpu_buffer);
3663
3664         arch_spin_unlock(&cpu_buffer->lock);
3665
3666  out:
3667         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3668
3669         atomic_dec(&cpu_buffer->record_disabled);
3670 }
3671 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
3672
3673 /**
3674  * ring_buffer_reset - reset a ring buffer
3675  * @buffer: The ring buffer to reset all cpu buffers
3676  */
3677 void ring_buffer_reset(struct ring_buffer *buffer)
3678 {
3679         int cpu;
3680
3681         for_each_buffer_cpu(buffer, cpu)
3682                 ring_buffer_reset_cpu(buffer, cpu);
3683 }
3684 EXPORT_SYMBOL_GPL(ring_buffer_reset);
3685
3686 /**
3687  * ring_buffer_empty - is the ring buffer empty?
3688  * @buffer: The ring buffer to test
3689  */
3690 int ring_buffer_empty(struct ring_buffer *buffer)
3691 {
3692         struct ring_buffer_per_cpu *cpu_buffer;
3693         unsigned long flags;
3694         int dolock;
3695         int cpu;
3696         int ret;
3697
3698         dolock = rb_ok_to_lock();
3699
3700         /* yes this is racy, but if you don't like the race, lock the buffer */
3701         for_each_buffer_cpu(buffer, cpu) {
3702                 cpu_buffer = buffer->buffers[cpu];
3703                 local_irq_save(flags);
3704                 if (dolock)
3705                         raw_spin_lock(&cpu_buffer->reader_lock);
3706                 ret = rb_per_cpu_empty(cpu_buffer);
3707                 if (dolock)
3708                         raw_spin_unlock(&cpu_buffer->reader_lock);
3709                 local_irq_restore(flags);
3710
3711                 if (!ret)
3712                         return 0;
3713         }
3714
3715         return 1;
3716 }
3717 EXPORT_SYMBOL_GPL(ring_buffer_empty);
3718
3719 /**
3720  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
3721  * @buffer: The ring buffer
3722  * @cpu: The CPU buffer to test
3723  */
3724 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
3725 {
3726         struct ring_buffer_per_cpu *cpu_buffer;
3727         unsigned long flags;
3728         int dolock;
3729         int ret;
3730
3731         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3732                 return 1;
3733
3734         dolock = rb_ok_to_lock();
3735
3736         cpu_buffer = buffer->buffers[cpu];
3737         local_irq_save(flags);
3738         if (dolock)
3739                 raw_spin_lock(&cpu_buffer->reader_lock);
3740         ret = rb_per_cpu_empty(cpu_buffer);
3741         if (dolock)
3742                 raw_spin_unlock(&cpu_buffer->reader_lock);
3743         local_irq_restore(flags);
3744
3745         return ret;
3746 }
3747 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
3748
3749 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
3750 /**
3751  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
3752  * @buffer_a: One buffer to swap with
3753  * @buffer_b: The other buffer to swap with
3754  *
3755  * This function is useful for tracers that want to take a "snapshot"
3756  * of a CPU buffer and have another backup buffer lying around.
3757  * It is expected that the tracer handles the cpu buffer not being
3758  * used at the moment.
3759  */
3760 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
3761                          struct ring_buffer *buffer_b, int cpu)
3762 {
3763         struct ring_buffer_per_cpu *cpu_buffer_a;
3764         struct ring_buffer_per_cpu *cpu_buffer_b;
3765         int ret = -EINVAL;
3766
3767         if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
3768             !cpumask_test_cpu(cpu, buffer_b->cpumask))
3769                 goto out;
3770
3771         /* At least make sure the two buffers are somewhat the same */
3772         if (buffer_a->pages != buffer_b->pages)
3773                 goto out;
3774
3775         ret = -EAGAIN;
3776
3777         if (ring_buffer_flags != RB_BUFFERS_ON)
3778                 goto out;
3779
3780         if (atomic_read(&buffer_a->record_disabled))
3781                 goto out;
3782
3783         if (atomic_read(&buffer_b->record_disabled))
3784                 goto out;
3785
3786         cpu_buffer_a = buffer_a->buffers[cpu];
3787         cpu_buffer_b = buffer_b->buffers[cpu];
3788
3789         if (atomic_read(&cpu_buffer_a->record_disabled))
3790                 goto out;
3791
3792         if (atomic_read(&cpu_buffer_b->record_disabled))
3793                 goto out;
3794
3795         /*
3796          * We can't do a synchronize_sched here because this
3797          * function can be called in atomic context.
3798          * Normally this will be called from the same CPU as cpu.
3799          * If not it's up to the caller to protect this.
3800          */
3801         atomic_inc(&cpu_buffer_a->record_disabled);
3802         atomic_inc(&cpu_buffer_b->record_disabled);
3803
3804         ret = -EBUSY;
3805         if (local_read(&cpu_buffer_a->committing))
3806                 goto out_dec;
3807         if (local_read(&cpu_buffer_b->committing))
3808                 goto out_dec;
3809
3810         buffer_a->buffers[cpu] = cpu_buffer_b;
3811         buffer_b->buffers[cpu] = cpu_buffer_a;
3812
3813         cpu_buffer_b->buffer = buffer_a;
3814         cpu_buffer_a->buffer = buffer_b;
3815
3816         ret = 0;
3817
3818 out_dec:
3819         atomic_dec(&cpu_buffer_a->record_disabled);
3820         atomic_dec(&cpu_buffer_b->record_disabled);
3821 out:
3822         return ret;
3823 }
3824 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
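
/*
 * For example, a minimal sketch of the "snapshot" use described above,
 * assuming the tracer already owns a spare buffer of the same size;
 * snapshot_buffer, live_buffer and read_snapshot() are placeholders:
 *
 *	if (ring_buffer_swap_cpu(snapshot_buffer, live_buffer, cpu) == 0)
 *		read_snapshot(snapshot_buffer, cpu);
 *
 * On success, the events recorded on @cpu before the swap can be read
 * from snapshot_buffer (e.g. with ring_buffer_consume()) while new
 * events keep flowing into live_buffer. A return of -EAGAIN or -EBUSY
 * means the buffers were not in a state to be swapped.
 */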
3825 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
3826
3827 /**
3828  * ring_buffer_alloc_read_page - allocate a page to read from buffer
3829  * @buffer: the buffer to allocate for.
3830  *
3831  * This function is used in conjunction with ring_buffer_read_page.
3832  * When reading a full page from the ring buffer, these functions
3833  * can be used to speed up the process. The calling function should
3834  * allocate a few pages first with this function. Then when it
3835  * needs to get pages from the ring buffer, it passes the result
3836  * of this function into ring_buffer_read_page, which will swap
3837  * the page that was allocated with the read page of the buffer.
3838  *
3839  * Returns:
3840  *  The page allocated, or NULL on error.
3841  */
3842 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
3843 {
3844         struct buffer_data_page *bpage;
3845         struct page *page;
3846
3847         page = alloc_pages_node(cpu_to_node(cpu),
3848                                 GFP_KERNEL | __GFP_NORETRY, 0);
3849         if (!page)
3850                 return NULL;
3851
3852         bpage = page_address(page);
3853
3854         rb_init_page(bpage);
3855
3856         return bpage;
3857 }
3858 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
3859
3860 /**
3861  * ring_buffer_free_read_page - free an allocated read page
3862  * @buffer: the buffer the page was allocated for
3863  * @data: the page to free
3864  *
3865  * Free a page allocated from ring_buffer_alloc_read_page.
3866  */
3867 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
3868 {
3869         free_page((unsigned long)data);
3870 }
3871 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
3872
3873 /**
3874  * ring_buffer_read_page - extract a page from the ring buffer
3875  * @buffer: buffer to extract from
3876  * @data_page: the page to use allocated from ring_buffer_alloc_read_page
3877  * @len: amount to extract
3878  * @cpu: the cpu of the buffer to extract
3879  * @full: should the extraction only happen when the page is full.
3880  *
3881  * This function will pull out a page from the ring buffer and consume it.
3882  * @data_page must be the address of the variable that was returned
3883  * from ring_buffer_alloc_read_page. This is because the page might be used
3884  * to swap with a page in the ring buffer.
3885  *
3886  * for example:
3887  *      rpage = ring_buffer_alloc_read_page(buffer, cpu);
3888  *      if (!rpage)
3889  *              return error;
3890  *      ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
3891  *      if (ret >= 0)
3892  *              process_page(rpage, ret);
3893  *
3894  * When @full is set, the function will not succeed unless
3895  * the writer is off the reader page.
3896  *
3897  * Note: it is up to the calling functions to handle sleeps and wakeups.
3898  *  The ring buffer can be used anywhere in the kernel and can not
3899  *  blindly call wake_up. The layer that uses the ring buffer must be
3900  *  responsible for that.
3901  *
3902  * Returns:
3903  *  >=0 if data has been transferred, returns the offset of consumed data.
3904  *  <0 if no data has been transferred.
3905  */
3906 int ring_buffer_read_page(struct ring_buffer *buffer,
3907                           void **data_page, size_t len, int cpu, int full)
3908 {
3909         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
3910         struct ring_buffer_event *event;
3911         struct buffer_data_page *bpage;
3912         struct buffer_page *reader;
3913         unsigned long missed_events;
3914         unsigned long flags;
3915         unsigned int commit;
3916         unsigned int read;
3917         u64 save_timestamp;
3918         int ret = -1;
3919
3920         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3921                 goto out;
3922
3923         /*
3924          * If len is not big enough to hold the page header, then
3925          * we can not copy anything.
3926          */
3927         if (len <= BUF_PAGE_HDR_SIZE)
3928                 goto out;
3929
3930         len -= BUF_PAGE_HDR_SIZE;
3931
3932         if (!data_page)
3933                 goto out;
3934
3935         bpage = *data_page;
3936         if (!bpage)
3937                 goto out;
3938
3939         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3940
3941         reader = rb_get_reader_page(cpu_buffer);
3942         if (!reader)
3943                 goto out_unlock;
3944
3945         event = rb_reader_event(cpu_buffer);
3946
3947         read = reader->read;
3948         commit = rb_page_commit(reader);
3949
3950         /* Check if any events were dropped */
3951         missed_events = cpu_buffer->lost_events;
3952
3953         /*
3954          * If this page has been partially read or
3955          * if len is not big enough to read the rest of the page or
3956          * a writer is still on the page, then
3957          * we must copy the data from the page to the buffer.
3958          * Otherwise, we can simply swap the page with the one passed in.
3959          */
3960         if (read || (len < (commit - read)) ||
3961             cpu_buffer->reader_page == cpu_buffer->commit_page) {
3962                 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
3963                 unsigned int rpos = read;
3964                 unsigned int pos = 0;
3965                 unsigned int size;
3966
3967                 if (full)
3968                         goto out_unlock;
3969
3970                 if (len > (commit - read))
3971                         len = (commit - read);
3972
3973                 /* Always keep the time extend and data together */
3974                 size = rb_event_ts_length(event);
3975
3976                 if (len < size)
3977                         goto out_unlock;
3978
3979                 /* save the current timestamp, since the user will need it */
3980                 save_timestamp = cpu_buffer->read_stamp;
3981
3982                 /* Need to copy one event at a time */
3983                 do {
3984                         /* We need the size of one event, because
3985                          * rb_advance_reader only advances by one event,
3986                          * whereas rb_event_ts_length may include the size of
3987                          * one or two events.
3988                          * We have already ensured there's enough space if this
3989                          * is a time extend. */
3990                         size = rb_event_length(event);
3991                         memcpy(bpage->data + pos, rpage->data + rpos, size);
3992
3993                         len -= size;
3994
3995                         rb_advance_reader(cpu_buffer);
3996                         rpos = reader->read;
3997                         pos += size;
3998
3999                         if (rpos >= commit)
4000                                 break;
4001
4002                         event = rb_reader_event(cpu_buffer);
4003                         /* Always keep the time extend and data together */
4004                         size = rb_event_ts_length(event);
4005                 } while (len >= size);
4006
4007                 /* update bpage */
4008                 local_set(&bpage->commit, pos);
4009                 bpage->time_stamp = save_timestamp;
4010
4011                 /* we copied everything to the beginning */
4012                 read = 0;
4013         } else {
4014                 /* update the entry counter */
4015                 cpu_buffer->read += rb_page_entries(reader);
4016                 cpu_buffer->read_bytes += BUF_PAGE_SIZE;
4017
4018                 /* swap the pages */
4019                 rb_init_page(bpage);
4020                 bpage = reader->page;
4021                 reader->page = *data_page;
4022                 local_set(&reader->write, 0);
4023                 local_set(&reader->entries, 0);
4024                 reader->read = 0;
4025                 *data_page = bpage;
4026
4027                 /*
4028                  * Use the real_end for the data size,
4029                  * This gives us a chance to store the lost events
4030                  * on the page.
4031                  */
4032                 if (reader->real_end)
4033                         local_set(&bpage->commit, reader->real_end);
4034         }
4035         ret = read;
4036
4037         cpu_buffer->lost_events = 0;
4038
4039         commit = local_read(&bpage->commit);
4040         /*
4041          * Set a flag in the commit field if we lost events
4042          */
4043         if (missed_events) {
4044                 /* If there is room at the end of the page to save the
4045                  * missed events, then record it there.
4046                  */
4047                 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
4048                         memcpy(&bpage->data[commit], &missed_events,
4049                                sizeof(missed_events));
4050                         local_add(RB_MISSED_STORED, &bpage->commit);
4051                         commit += sizeof(missed_events);
4052                 }
4053                 local_add(RB_MISSED_EVENTS, &bpage->commit);
4054         }
4055
4056         /*
4057          * This page may be off to user land. Zero it out here.
4058          */
4059         if (commit < BUF_PAGE_SIZE)
4060                 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
4061
4062  out_unlock:
4063         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4064
4065  out:
4066         return ret;
4067 }
4068 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
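
/*
 * For example, a fuller sketch of the alloc/read/free cycle shown in the
 * kernel-doc above; process_page() is only a placeholder, and PAGE_SIZE
 * is used for @len since the read page is a single page:
 *
 *	void *rpage;
 *	int ret;
 *
 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
 *	if (!rpage)
 *		return -ENOMEM;
 *	ret = ring_buffer_read_page(buffer, &rpage, PAGE_SIZE, cpu, 0);
 *	if (ret >= 0)
 *		process_page(rpage, ret);
 *	ring_buffer_free_read_page(buffer, rpage);
 */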
4069
4070 #ifdef CONFIG_HOTPLUG_CPU
4071 static int rb_cpu_notify(struct notifier_block *self,
4072                          unsigned long action, void *hcpu)
4073 {
4074         struct ring_buffer *buffer =
4075                 container_of(self, struct ring_buffer, cpu_notify);
4076         long cpu = (long)hcpu;
4077
4078         switch (action) {
4079         case CPU_UP_PREPARE:
4080         case CPU_UP_PREPARE_FROZEN:
4081                 if (cpumask_test_cpu(cpu, buffer->cpumask))
4082                         return NOTIFY_OK;
4083
4084                 buffer->buffers[cpu] =
4085                         rb_allocate_cpu_buffer(buffer, cpu);
4086                 if (!buffer->buffers[cpu]) {
4087                         WARN(1, "failed to allocate ring buffer on CPU %ld\n",
4088                              cpu);
4089                         return NOTIFY_OK;
4090                 }
4091                 smp_wmb();
4092                 cpumask_set_cpu(cpu, buffer->cpumask);
4093                 break;
4094         case CPU_DOWN_PREPARE:
4095         case CPU_DOWN_PREPARE_FROZEN:
4096                 /*
4097                  * Do nothing.
4098                  *  If we were to free the buffer, then the user would
4099                  *  lose any trace that was in the buffer.
4100                  */
4101                 break;
4102         default:
4103                 break;
4104         }
4105         return NOTIFY_OK;
4106 }
4107 #endif