1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Generic ring buffer 4 * 5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 6 */ 7 #include <linux/trace_recursion.h> 8 #include <linux/trace_events.h> 9 #include <linux/ring_buffer.h> 10 #include <linux/trace_clock.h> 11 #include <linux/sched/clock.h> 12 #include <linux/cacheflush.h> 13 #include <linux/trace_seq.h> 14 #include <linux/spinlock.h> 15 #include <linux/irq_work.h> 16 #include <linux/security.h> 17 #include <linux/uaccess.h> 18 #include <linux/hardirq.h> 19 #include <linux/kthread.h> /* for self test */ 20 #include <linux/module.h> 21 #include <linux/percpu.h> 22 #include <linux/mutex.h> 23 #include <linux/delay.h> 24 #include <linux/slab.h> 25 #include <linux/init.h> 26 #include <linux/hash.h> 27 #include <linux/list.h> 28 #include <linux/cpu.h> 29 #include <linux/oom.h> 30 #include <linux/mm.h> 31 32 #include <asm/local64.h> 33 #include <asm/local.h> 34 #include <asm/setup.h> 35 36 #include "trace.h" 37 38 /* 39 * The "absolute" timestamp in the buffer is only 59 bits. 40 * If a clock has the 5 MSBs set, it needs to be saved and 41 * reinserted. 42 */ 43 #define TS_MSB (0xf8ULL << 56) 44 #define ABS_TS_MASK (~TS_MSB) 45 46 static void update_pages_handler(struct work_struct *work); 47 48 #define RING_BUFFER_META_MAGIC 0xBADFEED 49 50 struct ring_buffer_meta { 51 int magic; 52 int struct_sizes; 53 unsigned long total_size; 54 unsigned long buffers_offset; 55 }; 56 57 struct ring_buffer_cpu_meta { 58 unsigned long first_buffer; 59 unsigned long head_buffer; 60 unsigned long commit_buffer; 61 __u32 subbuf_size; 62 __u32 nr_subbufs; 63 int buffers[]; 64 }; 65 66 /* 67 * The ring buffer header is special. We must manually up keep it. 68 */ 69 int ring_buffer_print_entry_header(struct trace_seq *s) 70 { 71 trace_seq_puts(s, "# compressed entry header\n"); 72 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 73 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 74 trace_seq_puts(s, "\tarray : 32 bits\n"); 75 trace_seq_putc(s, '\n'); 76 trace_seq_printf(s, "\tpadding : type == %d\n", 77 RINGBUF_TYPE_PADDING); 78 trace_seq_printf(s, "\ttime_extend : type == %d\n", 79 RINGBUF_TYPE_TIME_EXTEND); 80 trace_seq_printf(s, "\ttime_stamp : type == %d\n", 81 RINGBUF_TYPE_TIME_STAMP); 82 trace_seq_printf(s, "\tdata max type_len == %d\n", 83 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 84 85 return !trace_seq_has_overflowed(s); 86 } 87 88 /* 89 * The ring buffer is made up of a list of pages. A separate list of pages is 90 * allocated for each CPU. A writer may only write to a buffer that is 91 * associated with the CPU it is currently executing on. A reader may read 92 * from any per cpu buffer. 93 * 94 * The reader is special. For each per cpu buffer, the reader has its own 95 * reader page. When a reader has read the entire reader page, this reader 96 * page is swapped with another page in the ring buffer. 97 * 98 * Now, as long as the writer is off the reader page, the reader can do what 99 * ever it wants with that page. The writer will never write to that page 100 * again (as long as it is out of the ring buffer). 101 * 102 * Here's some silly ASCII art. 103 * 104 * +------+ 105 * |reader| RING BUFFER 106 * |page | 107 * +------+ +---+ +---+ +---+ 108 * | |-->| |-->| | 109 * +---+ +---+ +---+ 110 * ^ | 111 * | | 112 * +---------------+ 113 * 114 * 115 * +------+ 116 * |reader| RING BUFFER 117 * |page |------------------v 118 * +------+ +---+ +---+ +---+ 119 * | |-->| |-->| | 120 * +---+ +---+ +---+ 121 * ^ | 122 * | | 123 * +---------------+ 124 * 125 * 126 * +------+ 127 * |reader| RING BUFFER 128 * |page |------------------v 129 * +------+ +---+ +---+ +---+ 130 * ^ | |-->| |-->| | 131 * | +---+ +---+ +---+ 132 * | | 133 * | | 134 * +------------------------------+ 135 * 136 * 137 * +------+ 138 * |buffer| RING BUFFER 139 * |page |------------------v 140 * +------+ +---+ +---+ +---+ 141 * ^ | | | |-->| | 142 * | New +---+ +---+ +---+ 143 * | Reader------^ | 144 * | page | 145 * +------------------------------+ 146 * 147 * 148 * After we make this swap, the reader can hand this page off to the splice 149 * code and be done with it. It can even allocate a new page if it needs to 150 * and swap that into the ring buffer. 151 * 152 * We will be using cmpxchg soon to make all this lockless. 153 * 154 */ 155 156 /* Used for individual buffers (after the counter) */ 157 #define RB_BUFFER_OFF (1 << 20) 158 159 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 160 161 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 162 #define RB_ALIGNMENT 4U 163 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 164 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 165 166 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS 167 # define RB_FORCE_8BYTE_ALIGNMENT 0 168 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT 169 #else 170 # define RB_FORCE_8BYTE_ALIGNMENT 1 171 # define RB_ARCH_ALIGNMENT 8U 172 #endif 173 174 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT) 175 176 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 177 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 178 179 enum { 180 RB_LEN_TIME_EXTEND = 8, 181 RB_LEN_TIME_STAMP = 8, 182 }; 183 184 #define skip_time_extend(event) \ 185 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 186 187 #define extended_time(event) \ 188 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 189 190 static inline bool rb_null_event(struct ring_buffer_event *event) 191 { 192 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 193 } 194 195 static void rb_event_set_padding(struct ring_buffer_event *event) 196 { 197 /* padding has a NULL time_delta */ 198 event->type_len = RINGBUF_TYPE_PADDING; 199 event->time_delta = 0; 200 } 201 202 static unsigned 203 rb_event_data_length(struct ring_buffer_event *event) 204 { 205 unsigned length; 206 207 if (event->type_len) 208 length = event->type_len * RB_ALIGNMENT; 209 else 210 length = event->array[0]; 211 return length + RB_EVNT_HDR_SIZE; 212 } 213 214 /* 215 * Return the length of the given event. Will return 216 * the length of the time extend if the event is a 217 * time extend. 218 */ 219 static inline unsigned 220 rb_event_length(struct ring_buffer_event *event) 221 { 222 switch (event->type_len) { 223 case RINGBUF_TYPE_PADDING: 224 if (rb_null_event(event)) 225 /* undefined */ 226 return -1; 227 return event->array[0] + RB_EVNT_HDR_SIZE; 228 229 case RINGBUF_TYPE_TIME_EXTEND: 230 return RB_LEN_TIME_EXTEND; 231 232 case RINGBUF_TYPE_TIME_STAMP: 233 return RB_LEN_TIME_STAMP; 234 235 case RINGBUF_TYPE_DATA: 236 return rb_event_data_length(event); 237 default: 238 WARN_ON_ONCE(1); 239 } 240 /* not hit */ 241 return 0; 242 } 243 244 /* 245 * Return total length of time extend and data, 246 * or just the event length for all other events. 247 */ 248 static inline unsigned 249 rb_event_ts_length(struct ring_buffer_event *event) 250 { 251 unsigned len = 0; 252 253 if (extended_time(event)) { 254 /* time extends include the data event after it */ 255 len = RB_LEN_TIME_EXTEND; 256 event = skip_time_extend(event); 257 } 258 return len + rb_event_length(event); 259 } 260 261 /** 262 * ring_buffer_event_length - return the length of the event 263 * @event: the event to get the length of 264 * 265 * Returns the size of the data load of a data event. 266 * If the event is something other than a data event, it 267 * returns the size of the event itself. With the exception 268 * of a TIME EXTEND, where it still returns the size of the 269 * data load of the data event after it. 270 */ 271 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 272 { 273 unsigned length; 274 275 if (extended_time(event)) 276 event = skip_time_extend(event); 277 278 length = rb_event_length(event); 279 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 280 return length; 281 length -= RB_EVNT_HDR_SIZE; 282 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 283 length -= sizeof(event->array[0]); 284 return length; 285 } 286 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 287 288 /* inline for ring buffer fast paths */ 289 static __always_inline void * 290 rb_event_data(struct ring_buffer_event *event) 291 { 292 if (extended_time(event)) 293 event = skip_time_extend(event); 294 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 295 /* If length is in len field, then array[0] has the data */ 296 if (event->type_len) 297 return (void *)&event->array[0]; 298 /* Otherwise length is in array[0] and array[1] has the data */ 299 return (void *)&event->array[1]; 300 } 301 302 /** 303 * ring_buffer_event_data - return the data of the event 304 * @event: the event to get the data from 305 */ 306 void *ring_buffer_event_data(struct ring_buffer_event *event) 307 { 308 return rb_event_data(event); 309 } 310 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 311 312 #define for_each_buffer_cpu(buffer, cpu) \ 313 for_each_cpu(cpu, buffer->cpumask) 314 315 #define for_each_online_buffer_cpu(buffer, cpu) \ 316 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 317 318 #define TS_SHIFT 27 319 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 320 #define TS_DELTA_TEST (~TS_MASK) 321 322 static u64 rb_event_time_stamp(struct ring_buffer_event *event) 323 { 324 u64 ts; 325 326 ts = event->array[0]; 327 ts <<= TS_SHIFT; 328 ts += event->time_delta; 329 330 return ts; 331 } 332 333 /* Flag when events were overwritten */ 334 #define RB_MISSED_EVENTS (1 << 31) 335 /* Missed count stored at end */ 336 #define RB_MISSED_STORED (1 << 30) 337 338 #define RB_MISSED_MASK (3 << 30) 339 340 struct buffer_data_page { 341 u64 time_stamp; /* page time stamp */ 342 local_t commit; /* write committed index */ 343 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 344 }; 345 346 struct buffer_data_read_page { 347 unsigned order; /* order of the page */ 348 struct buffer_data_page *data; /* actual data, stored in this page */ 349 }; 350 351 /* 352 * Note, the buffer_page list must be first. The buffer pages 353 * are allocated in cache lines, which means that each buffer 354 * page will be at the beginning of a cache line, and thus 355 * the least significant bits will be zero. We use this to 356 * add flags in the list struct pointers, to make the ring buffer 357 * lockless. 358 */ 359 struct buffer_page { 360 struct list_head list; /* list of buffer pages */ 361 local_t write; /* index for next write */ 362 unsigned read; /* index for next read */ 363 local_t entries; /* entries on this page */ 364 unsigned long real_end; /* real end of data */ 365 unsigned order; /* order of the page */ 366 u32 id:30; /* ID for external mapping */ 367 u32 range:1; /* Mapped via a range */ 368 struct buffer_data_page *page; /* Actual data page */ 369 }; 370 371 /* 372 * The buffer page counters, write and entries, must be reset 373 * atomically when crossing page boundaries. To synchronize this 374 * update, two counters are inserted into the number. One is 375 * the actual counter for the write position or count on the page. 376 * 377 * The other is a counter of updaters. Before an update happens 378 * the update partition of the counter is incremented. This will 379 * allow the updater to update the counter atomically. 380 * 381 * The counter is 20 bits, and the state data is 12. 382 */ 383 #define RB_WRITE_MASK 0xfffff 384 #define RB_WRITE_INTCNT (1 << 20) 385 386 static void rb_init_page(struct buffer_data_page *bpage) 387 { 388 local_set(&bpage->commit, 0); 389 } 390 391 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage) 392 { 393 return local_read(&bpage->page->commit); 394 } 395 396 static void free_buffer_page(struct buffer_page *bpage) 397 { 398 /* Range pages are not to be freed */ 399 if (!bpage->range) 400 free_pages((unsigned long)bpage->page, bpage->order); 401 kfree(bpage); 402 } 403 404 /* 405 * We need to fit the time_stamp delta into 27 bits. 406 */ 407 static inline bool test_time_stamp(u64 delta) 408 { 409 return !!(delta & TS_DELTA_TEST); 410 } 411 412 struct rb_irq_work { 413 struct irq_work work; 414 wait_queue_head_t waiters; 415 wait_queue_head_t full_waiters; 416 atomic_t seq; 417 bool waiters_pending; 418 bool full_waiters_pending; 419 bool wakeup_full; 420 }; 421 422 /* 423 * Structure to hold event state and handle nested events. 424 */ 425 struct rb_event_info { 426 u64 ts; 427 u64 delta; 428 u64 before; 429 u64 after; 430 unsigned long length; 431 struct buffer_page *tail_page; 432 int add_timestamp; 433 }; 434 435 /* 436 * Used for the add_timestamp 437 * NONE 438 * EXTEND - wants a time extend 439 * ABSOLUTE - the buffer requests all events to have absolute time stamps 440 * FORCE - force a full time stamp. 441 */ 442 enum { 443 RB_ADD_STAMP_NONE = 0, 444 RB_ADD_STAMP_EXTEND = BIT(1), 445 RB_ADD_STAMP_ABSOLUTE = BIT(2), 446 RB_ADD_STAMP_FORCE = BIT(3) 447 }; 448 /* 449 * Used for which event context the event is in. 450 * TRANSITION = 0 451 * NMI = 1 452 * IRQ = 2 453 * SOFTIRQ = 3 454 * NORMAL = 4 455 * 456 * See trace_recursive_lock() comment below for more details. 457 */ 458 enum { 459 RB_CTX_TRANSITION, 460 RB_CTX_NMI, 461 RB_CTX_IRQ, 462 RB_CTX_SOFTIRQ, 463 RB_CTX_NORMAL, 464 RB_CTX_MAX 465 }; 466 467 struct rb_time_struct { 468 local64_t time; 469 }; 470 typedef struct rb_time_struct rb_time_t; 471 472 #define MAX_NEST 5 473 474 /* 475 * head_page == tail_page && head == tail then buffer is empty. 476 */ 477 struct ring_buffer_per_cpu { 478 int cpu; 479 atomic_t record_disabled; 480 atomic_t resize_disabled; 481 struct trace_buffer *buffer; 482 raw_spinlock_t reader_lock; /* serialize readers */ 483 arch_spinlock_t lock; 484 struct lock_class_key lock_key; 485 struct buffer_data_page *free_page; 486 unsigned long nr_pages; 487 unsigned int current_context; 488 struct list_head *pages; 489 /* pages generation counter, incremented when the list changes */ 490 unsigned long cnt; 491 struct buffer_page *head_page; /* read from head */ 492 struct buffer_page *tail_page; /* write to tail */ 493 struct buffer_page *commit_page; /* committed pages */ 494 struct buffer_page *reader_page; 495 unsigned long lost_events; 496 unsigned long last_overrun; 497 unsigned long nest; 498 local_t entries_bytes; 499 local_t entries; 500 local_t overrun; 501 local_t commit_overrun; 502 local_t dropped_events; 503 local_t committing; 504 local_t commits; 505 local_t pages_touched; 506 local_t pages_lost; 507 local_t pages_read; 508 long last_pages_touch; 509 size_t shortest_full; 510 unsigned long read; 511 unsigned long read_bytes; 512 rb_time_t write_stamp; 513 rb_time_t before_stamp; 514 u64 event_stamp[MAX_NEST]; 515 u64 read_stamp; 516 /* pages removed since last reset */ 517 unsigned long pages_removed; 518 519 unsigned int mapped; 520 unsigned int user_mapped; /* user space mapping */ 521 struct mutex mapping_lock; 522 unsigned long *subbuf_ids; /* ID to subbuf VA */ 523 struct trace_buffer_meta *meta_page; 524 struct ring_buffer_cpu_meta *ring_meta; 525 526 /* ring buffer pages to update, > 0 to add, < 0 to remove */ 527 long nr_pages_to_update; 528 struct list_head new_pages; /* new pages to add */ 529 struct work_struct update_pages_work; 530 struct completion update_done; 531 532 struct rb_irq_work irq_work; 533 }; 534 535 struct trace_buffer { 536 unsigned flags; 537 int cpus; 538 atomic_t record_disabled; 539 atomic_t resizing; 540 cpumask_var_t cpumask; 541 542 struct lock_class_key *reader_lock_key; 543 544 struct mutex mutex; 545 546 struct ring_buffer_per_cpu **buffers; 547 548 struct hlist_node node; 549 u64 (*clock)(void); 550 551 struct rb_irq_work irq_work; 552 bool time_stamp_abs; 553 554 unsigned long range_addr_start; 555 unsigned long range_addr_end; 556 557 struct ring_buffer_meta *meta; 558 559 unsigned int subbuf_size; 560 unsigned int subbuf_order; 561 unsigned int max_data_size; 562 }; 563 564 struct ring_buffer_iter { 565 struct ring_buffer_per_cpu *cpu_buffer; 566 unsigned long head; 567 unsigned long next_event; 568 struct buffer_page *head_page; 569 struct buffer_page *cache_reader_page; 570 unsigned long cache_read; 571 unsigned long cache_pages_removed; 572 u64 read_stamp; 573 u64 page_stamp; 574 struct ring_buffer_event *event; 575 size_t event_size; 576 int missed_events; 577 }; 578 579 int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s) 580 { 581 struct buffer_data_page field; 582 583 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 584 "offset:0;\tsize:%u;\tsigned:%u;\n", 585 (unsigned int)sizeof(field.time_stamp), 586 (unsigned int)is_signed_type(u64)); 587 588 trace_seq_printf(s, "\tfield: local_t commit;\t" 589 "offset:%u;\tsize:%u;\tsigned:%u;\n", 590 (unsigned int)offsetof(typeof(field), commit), 591 (unsigned int)sizeof(field.commit), 592 (unsigned int)is_signed_type(long)); 593 594 trace_seq_printf(s, "\tfield: int overwrite;\t" 595 "offset:%u;\tsize:%u;\tsigned:%u;\n", 596 (unsigned int)offsetof(typeof(field), commit), 597 1, 598 (unsigned int)is_signed_type(long)); 599 600 trace_seq_printf(s, "\tfield: char data;\t" 601 "offset:%u;\tsize:%u;\tsigned:%u;\n", 602 (unsigned int)offsetof(typeof(field), data), 603 (unsigned int)buffer->subbuf_size, 604 (unsigned int)is_signed_type(char)); 605 606 return !trace_seq_has_overflowed(s); 607 } 608 609 static inline void rb_time_read(rb_time_t *t, u64 *ret) 610 { 611 *ret = local64_read(&t->time); 612 } 613 static void rb_time_set(rb_time_t *t, u64 val) 614 { 615 local64_set(&t->time, val); 616 } 617 618 /* 619 * Enable this to make sure that the event passed to 620 * ring_buffer_event_time_stamp() is not committed and also 621 * is on the buffer that it passed in. 622 */ 623 //#define RB_VERIFY_EVENT 624 #ifdef RB_VERIFY_EVENT 625 static struct list_head *rb_list_head(struct list_head *list); 626 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 627 void *event) 628 { 629 struct buffer_page *page = cpu_buffer->commit_page; 630 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); 631 struct list_head *next; 632 long commit, write; 633 unsigned long addr = (unsigned long)event; 634 bool done = false; 635 int stop = 0; 636 637 /* Make sure the event exists and is not committed yet */ 638 do { 639 if (page == tail_page || WARN_ON_ONCE(stop++ > 100)) 640 done = true; 641 commit = local_read(&page->page->commit); 642 write = local_read(&page->write); 643 if (addr >= (unsigned long)&page->page->data[commit] && 644 addr < (unsigned long)&page->page->data[write]) 645 return; 646 647 next = rb_list_head(page->list.next); 648 page = list_entry(next, struct buffer_page, list); 649 } while (!done); 650 WARN_ON_ONCE(1); 651 } 652 #else 653 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 654 void *event) 655 { 656 } 657 #endif 658 659 /* 660 * The absolute time stamp drops the 5 MSBs and some clocks may 661 * require them. The rb_fix_abs_ts() will take a previous full 662 * time stamp, and add the 5 MSB of that time stamp on to the 663 * saved absolute time stamp. Then they are compared in case of 664 * the unlikely event that the latest time stamp incremented 665 * the 5 MSB. 666 */ 667 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) 668 { 669 if (save_ts & TS_MSB) { 670 abs |= save_ts & TS_MSB; 671 /* Check for overflow */ 672 if (unlikely(abs < save_ts)) 673 abs += 1ULL << 59; 674 } 675 return abs; 676 } 677 678 static inline u64 rb_time_stamp(struct trace_buffer *buffer); 679 680 /** 681 * ring_buffer_event_time_stamp - return the event's current time stamp 682 * @buffer: The buffer that the event is on 683 * @event: the event to get the time stamp of 684 * 685 * Note, this must be called after @event is reserved, and before it is 686 * committed to the ring buffer. And must be called from the same 687 * context where the event was reserved (normal, softirq, irq, etc). 688 * 689 * Returns the time stamp associated with the current event. 690 * If the event has an extended time stamp, then that is used as 691 * the time stamp to return. 692 * In the highly unlikely case that the event was nested more than 693 * the max nesting, then the write_stamp of the buffer is returned, 694 * otherwise current time is returned, but that really neither of 695 * the last two cases should ever happen. 696 */ 697 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, 698 struct ring_buffer_event *event) 699 { 700 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; 701 unsigned int nest; 702 u64 ts; 703 704 /* If the event includes an absolute time, then just use that */ 705 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 706 ts = rb_event_time_stamp(event); 707 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); 708 } 709 710 nest = local_read(&cpu_buffer->committing); 711 verify_event(cpu_buffer, event); 712 if (WARN_ON_ONCE(!nest)) 713 goto fail; 714 715 /* Read the current saved nesting level time stamp */ 716 if (likely(--nest < MAX_NEST)) 717 return cpu_buffer->event_stamp[nest]; 718 719 /* Shouldn't happen, warn if it does */ 720 WARN_ONCE(1, "nest (%d) greater than max", nest); 721 722 fail: 723 rb_time_read(&cpu_buffer->write_stamp, &ts); 724 725 return ts; 726 } 727 728 /** 729 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer 730 * @buffer: The ring_buffer to get the number of pages from 731 * @cpu: The cpu of the ring_buffer to get the number of pages from 732 * 733 * Returns the number of pages that have content in the ring buffer. 734 */ 735 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 736 { 737 size_t read; 738 size_t lost; 739 size_t cnt; 740 741 read = local_read(&buffer->buffers[cpu]->pages_read); 742 lost = local_read(&buffer->buffers[cpu]->pages_lost); 743 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 744 745 if (WARN_ON_ONCE(cnt < lost)) 746 return 0; 747 748 cnt -= lost; 749 750 /* The reader can read an empty page, but not more than that */ 751 if (cnt < read) { 752 WARN_ON_ONCE(read > cnt + 1); 753 return 0; 754 } 755 756 return cnt - read; 757 } 758 759 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full) 760 { 761 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 762 size_t nr_pages; 763 size_t dirty; 764 765 nr_pages = cpu_buffer->nr_pages; 766 if (!nr_pages || !full) 767 return true; 768 769 /* 770 * Add one as dirty will never equal nr_pages, as the sub-buffer 771 * that the writer is on is not counted as dirty. 772 * This is needed if "buffer_percent" is set to 100. 773 */ 774 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1; 775 776 return (dirty * 100) >= (full * nr_pages); 777 } 778 779 /* 780 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 781 * 782 * Schedules a delayed work to wake up any task that is blocked on the 783 * ring buffer waiters queue. 784 */ 785 static void rb_wake_up_waiters(struct irq_work *work) 786 { 787 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 788 789 /* For waiters waiting for the first wake up */ 790 (void)atomic_fetch_inc_release(&rbwork->seq); 791 792 wake_up_all(&rbwork->waiters); 793 if (rbwork->full_waiters_pending || rbwork->wakeup_full) { 794 /* Only cpu_buffer sets the above flags */ 795 struct ring_buffer_per_cpu *cpu_buffer = 796 container_of(rbwork, struct ring_buffer_per_cpu, irq_work); 797 798 /* Called from interrupt context */ 799 raw_spin_lock(&cpu_buffer->reader_lock); 800 rbwork->wakeup_full = false; 801 rbwork->full_waiters_pending = false; 802 803 /* Waking up all waiters, they will reset the shortest full */ 804 cpu_buffer->shortest_full = 0; 805 raw_spin_unlock(&cpu_buffer->reader_lock); 806 807 wake_up_all(&rbwork->full_waiters); 808 } 809 } 810 811 /** 812 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer 813 * @buffer: The ring buffer to wake waiters on 814 * @cpu: The CPU buffer to wake waiters on 815 * 816 * In the case of a file that represents a ring buffer is closing, 817 * it is prudent to wake up any waiters that are on this. 818 */ 819 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu) 820 { 821 struct ring_buffer_per_cpu *cpu_buffer; 822 struct rb_irq_work *rbwork; 823 824 if (!buffer) 825 return; 826 827 if (cpu == RING_BUFFER_ALL_CPUS) { 828 829 /* Wake up individual ones too. One level recursion */ 830 for_each_buffer_cpu(buffer, cpu) 831 ring_buffer_wake_waiters(buffer, cpu); 832 833 rbwork = &buffer->irq_work; 834 } else { 835 if (WARN_ON_ONCE(!buffer->buffers)) 836 return; 837 if (WARN_ON_ONCE(cpu >= nr_cpu_ids)) 838 return; 839 840 cpu_buffer = buffer->buffers[cpu]; 841 /* The CPU buffer may not have been initialized yet */ 842 if (!cpu_buffer) 843 return; 844 rbwork = &cpu_buffer->irq_work; 845 } 846 847 /* This can be called in any context */ 848 irq_work_queue(&rbwork->work); 849 } 850 851 static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full) 852 { 853 struct ring_buffer_per_cpu *cpu_buffer; 854 bool ret = false; 855 856 /* Reads of all CPUs always waits for any data */ 857 if (cpu == RING_BUFFER_ALL_CPUS) 858 return !ring_buffer_empty(buffer); 859 860 cpu_buffer = buffer->buffers[cpu]; 861 862 if (!ring_buffer_empty_cpu(buffer, cpu)) { 863 unsigned long flags; 864 bool pagebusy; 865 866 if (!full) 867 return true; 868 869 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 870 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 871 ret = !pagebusy && full_hit(buffer, cpu, full); 872 873 if (!ret && (!cpu_buffer->shortest_full || 874 cpu_buffer->shortest_full > full)) { 875 cpu_buffer->shortest_full = full; 876 } 877 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 878 } 879 return ret; 880 } 881 882 static inline bool 883 rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer, 884 int cpu, int full, ring_buffer_cond_fn cond, void *data) 885 { 886 if (rb_watermark_hit(buffer, cpu, full)) 887 return true; 888 889 if (cond(data)) 890 return true; 891 892 /* 893 * The events can happen in critical sections where 894 * checking a work queue can cause deadlocks. 895 * After adding a task to the queue, this flag is set 896 * only to notify events to try to wake up the queue 897 * using irq_work. 898 * 899 * We don't clear it even if the buffer is no longer 900 * empty. The flag only causes the next event to run 901 * irq_work to do the work queue wake up. The worse 902 * that can happen if we race with !trace_empty() is that 903 * an event will cause an irq_work to try to wake up 904 * an empty queue. 905 * 906 * There's no reason to protect this flag either, as 907 * the work queue and irq_work logic will do the necessary 908 * synchronization for the wake ups. The only thing 909 * that is necessary is that the wake up happens after 910 * a task has been queued. It's OK for spurious wake ups. 911 */ 912 if (full) 913 rbwork->full_waiters_pending = true; 914 else 915 rbwork->waiters_pending = true; 916 917 return false; 918 } 919 920 struct rb_wait_data { 921 struct rb_irq_work *irq_work; 922 int seq; 923 }; 924 925 /* 926 * The default wait condition for ring_buffer_wait() is to just to exit the 927 * wait loop the first time it is woken up. 928 */ 929 static bool rb_wait_once(void *data) 930 { 931 struct rb_wait_data *rdata = data; 932 struct rb_irq_work *rbwork = rdata->irq_work; 933 934 return atomic_read_acquire(&rbwork->seq) != rdata->seq; 935 } 936 937 /** 938 * ring_buffer_wait - wait for input to the ring buffer 939 * @buffer: buffer to wait on 940 * @cpu: the cpu buffer to wait on 941 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 942 * @cond: condition function to break out of wait (NULL to run once) 943 * @data: the data to pass to @cond. 944 * 945 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 946 * as data is added to any of the @buffer's cpu buffers. Otherwise 947 * it will wait for data to be added to a specific cpu buffer. 948 */ 949 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full, 950 ring_buffer_cond_fn cond, void *data) 951 { 952 struct ring_buffer_per_cpu *cpu_buffer; 953 struct wait_queue_head *waitq; 954 struct rb_irq_work *rbwork; 955 struct rb_wait_data rdata; 956 int ret = 0; 957 958 /* 959 * Depending on what the caller is waiting for, either any 960 * data in any cpu buffer, or a specific buffer, put the 961 * caller on the appropriate wait queue. 962 */ 963 if (cpu == RING_BUFFER_ALL_CPUS) { 964 rbwork = &buffer->irq_work; 965 /* Full only makes sense on per cpu reads */ 966 full = 0; 967 } else { 968 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 969 return -ENODEV; 970 cpu_buffer = buffer->buffers[cpu]; 971 rbwork = &cpu_buffer->irq_work; 972 } 973 974 if (full) 975 waitq = &rbwork->full_waiters; 976 else 977 waitq = &rbwork->waiters; 978 979 /* Set up to exit loop as soon as it is woken */ 980 if (!cond) { 981 cond = rb_wait_once; 982 rdata.irq_work = rbwork; 983 rdata.seq = atomic_read_acquire(&rbwork->seq); 984 data = &rdata; 985 } 986 987 ret = wait_event_interruptible((*waitq), 988 rb_wait_cond(rbwork, buffer, cpu, full, cond, data)); 989 990 return ret; 991 } 992 993 /** 994 * ring_buffer_poll_wait - poll on buffer input 995 * @buffer: buffer to wait on 996 * @cpu: the cpu buffer to wait on 997 * @filp: the file descriptor 998 * @poll_table: The poll descriptor 999 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 1000 * 1001 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 1002 * as data is added to any of the @buffer's cpu buffers. Otherwise 1003 * it will wait for data to be added to a specific cpu buffer. 1004 * 1005 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 1006 * zero otherwise. 1007 */ 1008 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 1009 struct file *filp, poll_table *poll_table, int full) 1010 { 1011 struct ring_buffer_per_cpu *cpu_buffer; 1012 struct rb_irq_work *rbwork; 1013 1014 if (cpu == RING_BUFFER_ALL_CPUS) { 1015 rbwork = &buffer->irq_work; 1016 full = 0; 1017 } else { 1018 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1019 return EPOLLERR; 1020 1021 cpu_buffer = buffer->buffers[cpu]; 1022 rbwork = &cpu_buffer->irq_work; 1023 } 1024 1025 if (full) { 1026 poll_wait(filp, &rbwork->full_waiters, poll_table); 1027 1028 if (rb_watermark_hit(buffer, cpu, full)) 1029 return EPOLLIN | EPOLLRDNORM; 1030 /* 1031 * Only allow full_waiters_pending update to be seen after 1032 * the shortest_full is set (in rb_watermark_hit). If the 1033 * writer sees the full_waiters_pending flag set, it will 1034 * compare the amount in the ring buffer to shortest_full. 1035 * If the amount in the ring buffer is greater than the 1036 * shortest_full percent, it will call the irq_work handler 1037 * to wake up this list. The irq_handler will reset shortest_full 1038 * back to zero. That's done under the reader_lock, but 1039 * the below smp_mb() makes sure that the update to 1040 * full_waiters_pending doesn't leak up into the above. 1041 */ 1042 smp_mb(); 1043 rbwork->full_waiters_pending = true; 1044 return 0; 1045 } 1046 1047 poll_wait(filp, &rbwork->waiters, poll_table); 1048 rbwork->waiters_pending = true; 1049 1050 /* 1051 * There's a tight race between setting the waiters_pending and 1052 * checking if the ring buffer is empty. Once the waiters_pending bit 1053 * is set, the next event will wake the task up, but we can get stuck 1054 * if there's only a single event in. 1055 * 1056 * FIXME: Ideally, we need a memory barrier on the writer side as well, 1057 * but adding a memory barrier to all events will cause too much of a 1058 * performance hit in the fast path. We only need a memory barrier when 1059 * the buffer goes from empty to having content. But as this race is 1060 * extremely small, and it's not a problem if another event comes in, we 1061 * will fix it later. 1062 */ 1063 smp_mb(); 1064 1065 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 1066 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 1067 return EPOLLIN | EPOLLRDNORM; 1068 return 0; 1069 } 1070 1071 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 1072 #define RB_WARN_ON(b, cond) \ 1073 ({ \ 1074 int _____ret = unlikely(cond); \ 1075 if (_____ret) { \ 1076 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 1077 struct ring_buffer_per_cpu *__b = \ 1078 (void *)b; \ 1079 atomic_inc(&__b->buffer->record_disabled); \ 1080 } else \ 1081 atomic_inc(&b->record_disabled); \ 1082 WARN_ON(1); \ 1083 } \ 1084 _____ret; \ 1085 }) 1086 1087 /* Up this if you want to test the TIME_EXTENTS and normalization */ 1088 #define DEBUG_SHIFT 0 1089 1090 static inline u64 rb_time_stamp(struct trace_buffer *buffer) 1091 { 1092 u64 ts; 1093 1094 /* Skip retpolines :-( */ 1095 if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local)) 1096 ts = trace_clock_local(); 1097 else 1098 ts = buffer->clock(); 1099 1100 /* shift to debug/test normalization and TIME_EXTENTS */ 1101 return ts << DEBUG_SHIFT; 1102 } 1103 1104 u64 ring_buffer_time_stamp(struct trace_buffer *buffer) 1105 { 1106 u64 time; 1107 1108 preempt_disable_notrace(); 1109 time = rb_time_stamp(buffer); 1110 preempt_enable_notrace(); 1111 1112 return time; 1113 } 1114 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 1115 1116 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer, 1117 int cpu, u64 *ts) 1118 { 1119 /* Just stupid testing the normalize function and deltas */ 1120 *ts >>= DEBUG_SHIFT; 1121 } 1122 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 1123 1124 /* 1125 * Making the ring buffer lockless makes things tricky. 1126 * Although writes only happen on the CPU that they are on, 1127 * and they only need to worry about interrupts. Reads can 1128 * happen on any CPU. 1129 * 1130 * The reader page is always off the ring buffer, but when the 1131 * reader finishes with a page, it needs to swap its page with 1132 * a new one from the buffer. The reader needs to take from 1133 * the head (writes go to the tail). But if a writer is in overwrite 1134 * mode and wraps, it must push the head page forward. 1135 * 1136 * Here lies the problem. 1137 * 1138 * The reader must be careful to replace only the head page, and 1139 * not another one. As described at the top of the file in the 1140 * ASCII art, the reader sets its old page to point to the next 1141 * page after head. It then sets the page after head to point to 1142 * the old reader page. But if the writer moves the head page 1143 * during this operation, the reader could end up with the tail. 1144 * 1145 * We use cmpxchg to help prevent this race. We also do something 1146 * special with the page before head. We set the LSB to 1. 1147 * 1148 * When the writer must push the page forward, it will clear the 1149 * bit that points to the head page, move the head, and then set 1150 * the bit that points to the new head page. 1151 * 1152 * We also don't want an interrupt coming in and moving the head 1153 * page on another writer. Thus we use the second LSB to catch 1154 * that too. Thus: 1155 * 1156 * head->list->prev->next bit 1 bit 0 1157 * ------- ------- 1158 * Normal page 0 0 1159 * Points to head page 0 1 1160 * New head page 1 0 1161 * 1162 * Note we can not trust the prev pointer of the head page, because: 1163 * 1164 * +----+ +-----+ +-----+ 1165 * | |------>| T |---X--->| N | 1166 * | |<------| | | | 1167 * +----+ +-----+ +-----+ 1168 * ^ ^ | 1169 * | +-----+ | | 1170 * +----------| R |----------+ | 1171 * | |<-----------+ 1172 * +-----+ 1173 * 1174 * Key: ---X--> HEAD flag set in pointer 1175 * T Tail page 1176 * R Reader page 1177 * N Next page 1178 * 1179 * (see __rb_reserve_next() to see where this happens) 1180 * 1181 * What the above shows is that the reader just swapped out 1182 * the reader page with a page in the buffer, but before it 1183 * could make the new header point back to the new page added 1184 * it was preempted by a writer. The writer moved forward onto 1185 * the new page added by the reader and is about to move forward 1186 * again. 1187 * 1188 * You can see, it is legitimate for the previous pointer of 1189 * the head (or any page) not to point back to itself. But only 1190 * temporarily. 1191 */ 1192 1193 #define RB_PAGE_NORMAL 0UL 1194 #define RB_PAGE_HEAD 1UL 1195 #define RB_PAGE_UPDATE 2UL 1196 1197 1198 #define RB_FLAG_MASK 3UL 1199 1200 /* PAGE_MOVED is not part of the mask */ 1201 #define RB_PAGE_MOVED 4UL 1202 1203 /* 1204 * rb_list_head - remove any bit 1205 */ 1206 static struct list_head *rb_list_head(struct list_head *list) 1207 { 1208 unsigned long val = (unsigned long)list; 1209 1210 return (struct list_head *)(val & ~RB_FLAG_MASK); 1211 } 1212 1213 /* 1214 * rb_is_head_page - test if the given page is the head page 1215 * 1216 * Because the reader may move the head_page pointer, we can 1217 * not trust what the head page is (it may be pointing to 1218 * the reader page). But if the next page is a header page, 1219 * its flags will be non zero. 1220 */ 1221 static inline int 1222 rb_is_head_page(struct buffer_page *page, struct list_head *list) 1223 { 1224 unsigned long val; 1225 1226 val = (unsigned long)list->next; 1227 1228 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) 1229 return RB_PAGE_MOVED; 1230 1231 return val & RB_FLAG_MASK; 1232 } 1233 1234 /* 1235 * rb_is_reader_page 1236 * 1237 * The unique thing about the reader page, is that, if the 1238 * writer is ever on it, the previous pointer never points 1239 * back to the reader page. 1240 */ 1241 static bool rb_is_reader_page(struct buffer_page *page) 1242 { 1243 struct list_head *list = page->list.prev; 1244 1245 return rb_list_head(list->next) != &page->list; 1246 } 1247 1248 /* 1249 * rb_set_list_to_head - set a list_head to be pointing to head. 1250 */ 1251 static void rb_set_list_to_head(struct list_head *list) 1252 { 1253 unsigned long *ptr; 1254 1255 ptr = (unsigned long *)&list->next; 1256 *ptr |= RB_PAGE_HEAD; 1257 *ptr &= ~RB_PAGE_UPDATE; 1258 } 1259 1260 /* 1261 * rb_head_page_activate - sets up head page 1262 */ 1263 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 1264 { 1265 struct buffer_page *head; 1266 1267 head = cpu_buffer->head_page; 1268 if (!head) 1269 return; 1270 1271 /* 1272 * Set the previous list pointer to have the HEAD flag. 1273 */ 1274 rb_set_list_to_head(head->list.prev); 1275 1276 if (cpu_buffer->ring_meta) { 1277 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 1278 meta->head_buffer = (unsigned long)head->page; 1279 } 1280 } 1281 1282 static void rb_list_head_clear(struct list_head *list) 1283 { 1284 unsigned long *ptr = (unsigned long *)&list->next; 1285 1286 *ptr &= ~RB_FLAG_MASK; 1287 } 1288 1289 /* 1290 * rb_head_page_deactivate - clears head page ptr (for free list) 1291 */ 1292 static void 1293 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 1294 { 1295 struct list_head *hd; 1296 1297 /* Go through the whole list and clear any pointers found. */ 1298 rb_list_head_clear(cpu_buffer->pages); 1299 1300 list_for_each(hd, cpu_buffer->pages) 1301 rb_list_head_clear(hd); 1302 } 1303 1304 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 1305 struct buffer_page *head, 1306 struct buffer_page *prev, 1307 int old_flag, int new_flag) 1308 { 1309 struct list_head *list; 1310 unsigned long val = (unsigned long)&head->list; 1311 unsigned long ret; 1312 1313 list = &prev->list; 1314 1315 val &= ~RB_FLAG_MASK; 1316 1317 ret = cmpxchg((unsigned long *)&list->next, 1318 val | old_flag, val | new_flag); 1319 1320 /* check if the reader took the page */ 1321 if ((ret & ~RB_FLAG_MASK) != val) 1322 return RB_PAGE_MOVED; 1323 1324 return ret & RB_FLAG_MASK; 1325 } 1326 1327 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 1328 struct buffer_page *head, 1329 struct buffer_page *prev, 1330 int old_flag) 1331 { 1332 return rb_head_page_set(cpu_buffer, head, prev, 1333 old_flag, RB_PAGE_UPDATE); 1334 } 1335 1336 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 1337 struct buffer_page *head, 1338 struct buffer_page *prev, 1339 int old_flag) 1340 { 1341 return rb_head_page_set(cpu_buffer, head, prev, 1342 old_flag, RB_PAGE_HEAD); 1343 } 1344 1345 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 1346 struct buffer_page *head, 1347 struct buffer_page *prev, 1348 int old_flag) 1349 { 1350 return rb_head_page_set(cpu_buffer, head, prev, 1351 old_flag, RB_PAGE_NORMAL); 1352 } 1353 1354 static inline void rb_inc_page(struct buffer_page **bpage) 1355 { 1356 struct list_head *p = rb_list_head((*bpage)->list.next); 1357 1358 *bpage = list_entry(p, struct buffer_page, list); 1359 } 1360 1361 static struct buffer_page * 1362 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 1363 { 1364 struct buffer_page *head; 1365 struct buffer_page *page; 1366 struct list_head *list; 1367 int i; 1368 1369 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 1370 return NULL; 1371 1372 /* sanity check */ 1373 list = cpu_buffer->pages; 1374 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 1375 return NULL; 1376 1377 page = head = cpu_buffer->head_page; 1378 /* 1379 * It is possible that the writer moves the header behind 1380 * where we started, and we miss in one loop. 1381 * A second loop should grab the header, but we'll do 1382 * three loops just because I'm paranoid. 1383 */ 1384 for (i = 0; i < 3; i++) { 1385 do { 1386 if (rb_is_head_page(page, page->list.prev)) { 1387 cpu_buffer->head_page = page; 1388 return page; 1389 } 1390 rb_inc_page(&page); 1391 } while (page != head); 1392 } 1393 1394 RB_WARN_ON(cpu_buffer, 1); 1395 1396 return NULL; 1397 } 1398 1399 static bool rb_head_page_replace(struct buffer_page *old, 1400 struct buffer_page *new) 1401 { 1402 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 1403 unsigned long val; 1404 1405 val = *ptr & ~RB_FLAG_MASK; 1406 val |= RB_PAGE_HEAD; 1407 1408 return try_cmpxchg(ptr, &val, (unsigned long)&new->list); 1409 } 1410 1411 /* 1412 * rb_tail_page_update - move the tail page forward 1413 */ 1414 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 1415 struct buffer_page *tail_page, 1416 struct buffer_page *next_page) 1417 { 1418 unsigned long old_entries; 1419 unsigned long old_write; 1420 1421 /* 1422 * The tail page now needs to be moved forward. 1423 * 1424 * We need to reset the tail page, but without messing 1425 * with possible erasing of data brought in by interrupts 1426 * that have moved the tail page and are currently on it. 1427 * 1428 * We add a counter to the write field to denote this. 1429 */ 1430 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 1431 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 1432 1433 /* 1434 * Just make sure we have seen our old_write and synchronize 1435 * with any interrupts that come in. 1436 */ 1437 barrier(); 1438 1439 /* 1440 * If the tail page is still the same as what we think 1441 * it is, then it is up to us to update the tail 1442 * pointer. 1443 */ 1444 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) { 1445 /* Zero the write counter */ 1446 unsigned long val = old_write & ~RB_WRITE_MASK; 1447 unsigned long eval = old_entries & ~RB_WRITE_MASK; 1448 1449 /* 1450 * This will only succeed if an interrupt did 1451 * not come in and change it. In which case, we 1452 * do not want to modify it. 1453 * 1454 * We add (void) to let the compiler know that we do not care 1455 * about the return value of these functions. We use the 1456 * cmpxchg to only update if an interrupt did not already 1457 * do it for us. If the cmpxchg fails, we don't care. 1458 */ 1459 (void)local_cmpxchg(&next_page->write, old_write, val); 1460 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 1461 1462 /* 1463 * No need to worry about races with clearing out the commit. 1464 * it only can increment when a commit takes place. But that 1465 * only happens in the outer most nested commit. 1466 */ 1467 local_set(&next_page->page->commit, 0); 1468 1469 /* Either we update tail_page or an interrupt does */ 1470 if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page)) 1471 local_inc(&cpu_buffer->pages_touched); 1472 } 1473 } 1474 1475 static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 1476 struct buffer_page *bpage) 1477 { 1478 unsigned long val = (unsigned long)bpage; 1479 1480 RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK); 1481 } 1482 1483 static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer, 1484 struct list_head *list) 1485 { 1486 if (RB_WARN_ON(cpu_buffer, 1487 rb_list_head(rb_list_head(list->next)->prev) != list)) 1488 return false; 1489 1490 if (RB_WARN_ON(cpu_buffer, 1491 rb_list_head(rb_list_head(list->prev)->next) != list)) 1492 return false; 1493 1494 return true; 1495 } 1496 1497 /** 1498 * rb_check_pages - integrity check of buffer pages 1499 * @cpu_buffer: CPU buffer with pages to test 1500 * 1501 * As a safety measure we check to make sure the data pages have not 1502 * been corrupted. 1503 */ 1504 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1505 { 1506 struct list_head *head, *tmp; 1507 unsigned long buffer_cnt; 1508 unsigned long flags; 1509 int nr_loops = 0; 1510 1511 /* 1512 * Walk the linked list underpinning the ring buffer and validate all 1513 * its next and prev links. 1514 * 1515 * The check acquires the reader_lock to avoid concurrent processing 1516 * with code that could be modifying the list. However, the lock cannot 1517 * be held for the entire duration of the walk, as this would make the 1518 * time when interrupts are disabled non-deterministic, dependent on the 1519 * ring buffer size. Therefore, the code releases and re-acquires the 1520 * lock after checking each page. The ring_buffer_per_cpu.cnt variable 1521 * is then used to detect if the list was modified while the lock was 1522 * not held, in which case the check needs to be restarted. 1523 * 1524 * The code attempts to perform the check at most three times before 1525 * giving up. This is acceptable because this is only a self-validation 1526 * to detect problems early on. In practice, the list modification 1527 * operations are fairly spaced, and so this check typically succeeds at 1528 * most on the second try. 1529 */ 1530 again: 1531 if (++nr_loops > 3) 1532 return; 1533 1534 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1535 head = rb_list_head(cpu_buffer->pages); 1536 if (!rb_check_links(cpu_buffer, head)) 1537 goto out_locked; 1538 buffer_cnt = cpu_buffer->cnt; 1539 tmp = head; 1540 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1541 1542 while (true) { 1543 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1544 1545 if (buffer_cnt != cpu_buffer->cnt) { 1546 /* The list was updated, try again. */ 1547 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1548 goto again; 1549 } 1550 1551 tmp = rb_list_head(tmp->next); 1552 if (tmp == head) 1553 /* The iteration circled back, all is done. */ 1554 goto out_locked; 1555 1556 if (!rb_check_links(cpu_buffer, tmp)) 1557 goto out_locked; 1558 1559 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1560 } 1561 1562 out_locked: 1563 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1564 } 1565 1566 /* 1567 * Take an address, add the meta data size as well as the array of 1568 * array subbuffer indexes, then align it to a subbuffer size. 1569 * 1570 * This is used to help find the next per cpu subbuffer within a mapped range. 1571 */ 1572 static unsigned long 1573 rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs) 1574 { 1575 addr += sizeof(struct ring_buffer_cpu_meta) + 1576 sizeof(int) * nr_subbufs; 1577 return ALIGN(addr, subbuf_size); 1578 } 1579 1580 /* 1581 * Return the ring_buffer_meta for a given @cpu. 1582 */ 1583 static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu) 1584 { 1585 int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 1586 struct ring_buffer_cpu_meta *meta; 1587 struct ring_buffer_meta *bmeta; 1588 unsigned long ptr; 1589 int nr_subbufs; 1590 1591 bmeta = buffer->meta; 1592 if (!bmeta) 1593 return NULL; 1594 1595 ptr = (unsigned long)bmeta + bmeta->buffers_offset; 1596 meta = (struct ring_buffer_cpu_meta *)ptr; 1597 1598 /* When nr_pages passed in is zero, the first meta has already been initialized */ 1599 if (!nr_pages) { 1600 nr_subbufs = meta->nr_subbufs; 1601 } else { 1602 /* Include the reader page */ 1603 nr_subbufs = nr_pages + 1; 1604 } 1605 1606 /* 1607 * The first chunk may not be subbuffer aligned, where as 1608 * the rest of the chunks are. 1609 */ 1610 if (cpu) { 1611 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1612 ptr += subbuf_size * nr_subbufs; 1613 1614 /* We can use multiplication to find chunks greater than 1 */ 1615 if (cpu > 1) { 1616 unsigned long size; 1617 unsigned long p; 1618 1619 /* Save the beginning of this CPU chunk */ 1620 p = ptr; 1621 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1622 ptr += subbuf_size * nr_subbufs; 1623 1624 /* Now all chunks after this are the same size */ 1625 size = ptr - p; 1626 ptr += size * (cpu - 2); 1627 } 1628 } 1629 return (void *)ptr; 1630 } 1631 1632 /* Return the start of subbufs given the meta pointer */ 1633 static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta) 1634 { 1635 int subbuf_size = meta->subbuf_size; 1636 unsigned long ptr; 1637 1638 ptr = (unsigned long)meta; 1639 ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs); 1640 1641 return (void *)ptr; 1642 } 1643 1644 /* 1645 * Return a specific sub-buffer for a given @cpu defined by @idx. 1646 */ 1647 static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx) 1648 { 1649 struct ring_buffer_cpu_meta *meta; 1650 unsigned long ptr; 1651 int subbuf_size; 1652 1653 meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu); 1654 if (!meta) 1655 return NULL; 1656 1657 if (WARN_ON_ONCE(idx >= meta->nr_subbufs)) 1658 return NULL; 1659 1660 subbuf_size = meta->subbuf_size; 1661 1662 /* Map this buffer to the order that's in meta->buffers[] */ 1663 idx = meta->buffers[idx]; 1664 1665 ptr = (unsigned long)rb_subbufs_from_meta(meta); 1666 1667 ptr += subbuf_size * idx; 1668 if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end) 1669 return NULL; 1670 1671 return (void *)ptr; 1672 } 1673 1674 /* 1675 * See if the existing memory contains a valid meta section. 1676 * if so, use that, otherwise initialize it. 1677 */ 1678 static bool rb_meta_init(struct trace_buffer *buffer, int scratch_size) 1679 { 1680 unsigned long ptr = buffer->range_addr_start; 1681 struct ring_buffer_meta *bmeta; 1682 unsigned long total_size; 1683 int struct_sizes; 1684 1685 bmeta = (struct ring_buffer_meta *)ptr; 1686 buffer->meta = bmeta; 1687 1688 total_size = buffer->range_addr_end - buffer->range_addr_start; 1689 1690 struct_sizes = sizeof(struct ring_buffer_cpu_meta); 1691 struct_sizes |= sizeof(*bmeta) << 16; 1692 1693 /* The first buffer will start word size after the meta page */ 1694 ptr += sizeof(*bmeta); 1695 ptr = ALIGN(ptr, sizeof(long)); 1696 ptr += scratch_size; 1697 1698 if (bmeta->magic != RING_BUFFER_META_MAGIC) { 1699 pr_info("Ring buffer boot meta mismatch of magic\n"); 1700 goto init; 1701 } 1702 1703 if (bmeta->struct_sizes != struct_sizes) { 1704 pr_info("Ring buffer boot meta mismatch of struct size\n"); 1705 goto init; 1706 } 1707 1708 if (bmeta->total_size != total_size) { 1709 pr_info("Ring buffer boot meta mismatch of total size\n"); 1710 goto init; 1711 } 1712 1713 if (bmeta->buffers_offset > bmeta->total_size) { 1714 pr_info("Ring buffer boot meta mismatch of offset outside of total size\n"); 1715 goto init; 1716 } 1717 1718 if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) { 1719 pr_info("Ring buffer boot meta mismatch of first buffer offset\n"); 1720 goto init; 1721 } 1722 1723 return true; 1724 1725 init: 1726 bmeta->magic = RING_BUFFER_META_MAGIC; 1727 bmeta->struct_sizes = struct_sizes; 1728 bmeta->total_size = total_size; 1729 bmeta->buffers_offset = (void *)ptr - (void *)bmeta; 1730 1731 /* Zero out the scatch pad */ 1732 memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta)); 1733 1734 return false; 1735 } 1736 1737 /* 1738 * See if the existing memory contains valid ring buffer data. 1739 * As the previous kernel must be the same as this kernel, all 1740 * the calculations (size of buffers and number of buffers) 1741 * must be the same. 1742 */ 1743 static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu, 1744 struct trace_buffer *buffer, int nr_pages, 1745 unsigned long *subbuf_mask) 1746 { 1747 int subbuf_size = PAGE_SIZE; 1748 struct buffer_data_page *subbuf; 1749 unsigned long buffers_start; 1750 unsigned long buffers_end; 1751 int i; 1752 1753 if (!subbuf_mask) 1754 return false; 1755 1756 buffers_start = meta->first_buffer; 1757 buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs); 1758 1759 /* Is the head and commit buffers within the range of buffers? */ 1760 if (meta->head_buffer < buffers_start || 1761 meta->head_buffer >= buffers_end) { 1762 pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu); 1763 return false; 1764 } 1765 1766 if (meta->commit_buffer < buffers_start || 1767 meta->commit_buffer >= buffers_end) { 1768 pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu); 1769 return false; 1770 } 1771 1772 subbuf = rb_subbufs_from_meta(meta); 1773 1774 bitmap_clear(subbuf_mask, 0, meta->nr_subbufs); 1775 1776 /* Is the meta buffers and the subbufs themselves have correct data? */ 1777 for (i = 0; i < meta->nr_subbufs; i++) { 1778 if (meta->buffers[i] < 0 || 1779 meta->buffers[i] >= meta->nr_subbufs) { 1780 pr_info("Ring buffer boot meta [%d] array out of range\n", cpu); 1781 return false; 1782 } 1783 1784 if ((unsigned)local_read(&subbuf->commit) > subbuf_size) { 1785 pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu); 1786 return false; 1787 } 1788 1789 if (test_bit(meta->buffers[i], subbuf_mask)) { 1790 pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu); 1791 return false; 1792 } 1793 1794 set_bit(meta->buffers[i], subbuf_mask); 1795 subbuf = (void *)subbuf + subbuf_size; 1796 } 1797 1798 return true; 1799 } 1800 1801 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf); 1802 1803 static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu, 1804 unsigned long long *timestamp, u64 *delta_ptr) 1805 { 1806 struct ring_buffer_event *event; 1807 u64 ts, delta; 1808 int events = 0; 1809 int e; 1810 1811 *delta_ptr = 0; 1812 *timestamp = 0; 1813 1814 ts = dpage->time_stamp; 1815 1816 for (e = 0; e < tail; e += rb_event_length(event)) { 1817 1818 event = (struct ring_buffer_event *)(dpage->data + e); 1819 1820 switch (event->type_len) { 1821 1822 case RINGBUF_TYPE_TIME_EXTEND: 1823 delta = rb_event_time_stamp(event); 1824 ts += delta; 1825 break; 1826 1827 case RINGBUF_TYPE_TIME_STAMP: 1828 delta = rb_event_time_stamp(event); 1829 delta = rb_fix_abs_ts(delta, ts); 1830 if (delta < ts) { 1831 *delta_ptr = delta; 1832 *timestamp = ts; 1833 return -1; 1834 } 1835 ts = delta; 1836 break; 1837 1838 case RINGBUF_TYPE_PADDING: 1839 if (event->time_delta == 1) 1840 break; 1841 fallthrough; 1842 case RINGBUF_TYPE_DATA: 1843 events++; 1844 ts += event->time_delta; 1845 break; 1846 1847 default: 1848 return -1; 1849 } 1850 } 1851 *timestamp = ts; 1852 return events; 1853 } 1854 1855 static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu) 1856 { 1857 unsigned long long ts; 1858 u64 delta; 1859 int tail; 1860 1861 tail = local_read(&dpage->commit); 1862 return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta); 1863 } 1864 1865 /* If the meta data has been validated, now validate the events */ 1866 static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer) 1867 { 1868 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 1869 struct buffer_page *head_page; 1870 unsigned long entry_bytes = 0; 1871 unsigned long entries = 0; 1872 int ret; 1873 int i; 1874 1875 if (!meta || !meta->head_buffer) 1876 return; 1877 1878 /* Do the reader page first */ 1879 ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu); 1880 if (ret < 0) { 1881 pr_info("Ring buffer reader page is invalid\n"); 1882 goto invalid; 1883 } 1884 entries += ret; 1885 entry_bytes += local_read(&cpu_buffer->reader_page->page->commit); 1886 local_set(&cpu_buffer->reader_page->entries, ret); 1887 1888 head_page = cpu_buffer->head_page; 1889 1890 /* If the commit_buffer is the reader page, update the commit page */ 1891 if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) { 1892 cpu_buffer->commit_page = cpu_buffer->reader_page; 1893 /* Nothing more to do, the only page is the reader page */ 1894 goto done; 1895 } 1896 1897 /* Iterate until finding the commit page */ 1898 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 1899 1900 /* Reader page has already been done */ 1901 if (head_page == cpu_buffer->reader_page) 1902 continue; 1903 1904 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 1905 if (ret < 0) { 1906 pr_info("Ring buffer meta [%d] invalid buffer page\n", 1907 cpu_buffer->cpu); 1908 goto invalid; 1909 } 1910 1911 /* If the buffer has content, update pages_touched */ 1912 if (ret) 1913 local_inc(&cpu_buffer->pages_touched); 1914 1915 entries += ret; 1916 entry_bytes += local_read(&head_page->page->commit); 1917 local_set(&cpu_buffer->head_page->entries, ret); 1918 1919 if (head_page == cpu_buffer->commit_page) 1920 break; 1921 } 1922 1923 if (head_page != cpu_buffer->commit_page) { 1924 pr_info("Ring buffer meta [%d] commit page not found\n", 1925 cpu_buffer->cpu); 1926 goto invalid; 1927 } 1928 done: 1929 local_set(&cpu_buffer->entries, entries); 1930 local_set(&cpu_buffer->entries_bytes, entry_bytes); 1931 1932 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu); 1933 return; 1934 1935 invalid: 1936 /* The content of the buffers are invalid, reset the meta data */ 1937 meta->head_buffer = 0; 1938 meta->commit_buffer = 0; 1939 1940 /* Reset the reader page */ 1941 local_set(&cpu_buffer->reader_page->entries, 0); 1942 local_set(&cpu_buffer->reader_page->page->commit, 0); 1943 1944 /* Reset all the subbuffers */ 1945 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) { 1946 local_set(&head_page->entries, 0); 1947 local_set(&head_page->page->commit, 0); 1948 } 1949 } 1950 1951 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size) 1952 { 1953 struct ring_buffer_cpu_meta *meta; 1954 unsigned long *subbuf_mask; 1955 unsigned long delta; 1956 void *subbuf; 1957 bool valid = false; 1958 int cpu; 1959 int i; 1960 1961 /* Create a mask to test the subbuf array */ 1962 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); 1963 /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */ 1964 1965 if (rb_meta_init(buffer, scratch_size)) 1966 valid = true; 1967 1968 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 1969 void *next_meta; 1970 1971 meta = rb_range_meta(buffer, nr_pages, cpu); 1972 1973 if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 1974 /* Make the mappings match the current address */ 1975 subbuf = rb_subbufs_from_meta(meta); 1976 delta = (unsigned long)subbuf - meta->first_buffer; 1977 meta->first_buffer += delta; 1978 meta->head_buffer += delta; 1979 meta->commit_buffer += delta; 1980 continue; 1981 } 1982 1983 if (cpu < nr_cpu_ids - 1) 1984 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); 1985 else 1986 next_meta = (void *)buffer->range_addr_end; 1987 1988 memset(meta, 0, next_meta - (void *)meta); 1989 1990 meta->nr_subbufs = nr_pages + 1; 1991 meta->subbuf_size = PAGE_SIZE; 1992 1993 subbuf = rb_subbufs_from_meta(meta); 1994 1995 meta->first_buffer = (unsigned long)subbuf; 1996 1997 /* 1998 * The buffers[] array holds the order of the sub-buffers 1999 * that are after the meta data. The sub-buffers may 2000 * be swapped out when read and inserted into a different 2001 * location of the ring buffer. Although their addresses 2002 * remain the same, the buffers[] array contains the 2003 * index into the sub-buffers holding their actual order. 2004 */ 2005 for (i = 0; i < meta->nr_subbufs; i++) { 2006 meta->buffers[i] = i; 2007 rb_init_page(subbuf); 2008 subbuf += meta->subbuf_size; 2009 } 2010 } 2011 bitmap_free(subbuf_mask); 2012 } 2013 2014 static void *rbm_start(struct seq_file *m, loff_t *pos) 2015 { 2016 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2017 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2018 unsigned long val; 2019 2020 if (!meta) 2021 return NULL; 2022 2023 if (*pos > meta->nr_subbufs) 2024 return NULL; 2025 2026 val = *pos; 2027 val++; 2028 2029 return (void *)val; 2030 } 2031 2032 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 2033 { 2034 (*pos)++; 2035 2036 return rbm_start(m, pos); 2037 } 2038 2039 static int rbm_show(struct seq_file *m, void *v) 2040 { 2041 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2042 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2043 unsigned long val = (unsigned long)v; 2044 2045 if (val == 1) { 2046 seq_printf(m, "head_buffer: %d\n", 2047 rb_meta_subbuf_idx(meta, (void *)meta->head_buffer)); 2048 seq_printf(m, "commit_buffer: %d\n", 2049 rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer)); 2050 seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size); 2051 seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs); 2052 return 0; 2053 } 2054 2055 val -= 2; 2056 seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]); 2057 2058 return 0; 2059 } 2060 2061 static void rbm_stop(struct seq_file *m, void *p) 2062 { 2063 } 2064 2065 static const struct seq_operations rb_meta_seq_ops = { 2066 .start = rbm_start, 2067 .next = rbm_next, 2068 .show = rbm_show, 2069 .stop = rbm_stop, 2070 }; 2071 2072 int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu) 2073 { 2074 struct seq_file *m; 2075 int ret; 2076 2077 ret = seq_open(file, &rb_meta_seq_ops); 2078 if (ret) 2079 return ret; 2080 2081 m = file->private_data; 2082 m->private = buffer->buffers[cpu]; 2083 2084 return 0; 2085 } 2086 2087 /* Map the buffer_pages to the previous head and commit pages */ 2088 static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer, 2089 struct buffer_page *bpage) 2090 { 2091 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2092 2093 if (meta->head_buffer == (unsigned long)bpage->page) 2094 cpu_buffer->head_page = bpage; 2095 2096 if (meta->commit_buffer == (unsigned long)bpage->page) { 2097 cpu_buffer->commit_page = bpage; 2098 cpu_buffer->tail_page = bpage; 2099 } 2100 } 2101 2102 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2103 long nr_pages, struct list_head *pages) 2104 { 2105 struct trace_buffer *buffer = cpu_buffer->buffer; 2106 struct ring_buffer_cpu_meta *meta = NULL; 2107 struct buffer_page *bpage, *tmp; 2108 bool user_thread = current->mm != NULL; 2109 gfp_t mflags; 2110 long i; 2111 2112 /* 2113 * Check if the available memory is there first. 2114 * Note, si_mem_available() only gives us a rough estimate of available 2115 * memory. It may not be accurate. But we don't care, we just want 2116 * to prevent doing any allocation when it is obvious that it is 2117 * not going to succeed. 2118 */ 2119 i = si_mem_available(); 2120 if (i < nr_pages) 2121 return -ENOMEM; 2122 2123 /* 2124 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 2125 * gracefully without invoking oom-killer and the system is not 2126 * destabilized. 2127 */ 2128 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 2129 2130 /* 2131 * If a user thread allocates too much, and si_mem_available() 2132 * reports there's enough memory, even though there is not. 2133 * Make sure the OOM killer kills this thread. This can happen 2134 * even with RETRY_MAYFAIL because another task may be doing 2135 * an allocation after this task has taken all memory. 2136 * This is the task the OOM killer needs to take out during this 2137 * loop, even if it was triggered by an allocation somewhere else. 2138 */ 2139 if (user_thread) 2140 set_current_oom_origin(); 2141 2142 if (buffer->range_addr_start) 2143 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2144 2145 for (i = 0; i < nr_pages; i++) { 2146 struct page *page; 2147 2148 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2149 mflags, cpu_to_node(cpu_buffer->cpu)); 2150 if (!bpage) 2151 goto free_pages; 2152 2153 rb_check_bpage(cpu_buffer, bpage); 2154 2155 /* 2156 * Append the pages as for mapped buffers we want to keep 2157 * the order 2158 */ 2159 list_add_tail(&bpage->list, pages); 2160 2161 if (meta) { 2162 /* A range was given. Use that for the buffer page */ 2163 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2164 if (!bpage->page) 2165 goto free_pages; 2166 /* If this is valid from a previous boot */ 2167 if (meta->head_buffer) 2168 rb_meta_buffer_update(cpu_buffer, bpage); 2169 bpage->range = 1; 2170 bpage->id = i + 1; 2171 } else { 2172 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), 2173 mflags | __GFP_COMP | __GFP_ZERO, 2174 cpu_buffer->buffer->subbuf_order); 2175 if (!page) 2176 goto free_pages; 2177 bpage->page = page_address(page); 2178 rb_init_page(bpage->page); 2179 } 2180 bpage->order = cpu_buffer->buffer->subbuf_order; 2181 2182 if (user_thread && fatal_signal_pending(current)) 2183 goto free_pages; 2184 } 2185 if (user_thread) 2186 clear_current_oom_origin(); 2187 2188 return 0; 2189 2190 free_pages: 2191 list_for_each_entry_safe(bpage, tmp, pages, list) { 2192 list_del_init(&bpage->list); 2193 free_buffer_page(bpage); 2194 } 2195 if (user_thread) 2196 clear_current_oom_origin(); 2197 2198 return -ENOMEM; 2199 } 2200 2201 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2202 unsigned long nr_pages) 2203 { 2204 LIST_HEAD(pages); 2205 2206 WARN_ON(!nr_pages); 2207 2208 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2209 return -ENOMEM; 2210 2211 /* 2212 * The ring buffer page list is a circular list that does not 2213 * start and end with a list head. All page list items point to 2214 * other pages. 2215 */ 2216 cpu_buffer->pages = pages.next; 2217 list_del(&pages); 2218 2219 cpu_buffer->nr_pages = nr_pages; 2220 2221 rb_check_pages(cpu_buffer); 2222 2223 return 0; 2224 } 2225 2226 static struct ring_buffer_per_cpu * 2227 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2228 { 2229 struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = NULL; 2230 struct ring_buffer_cpu_meta *meta; 2231 struct buffer_page *bpage; 2232 struct page *page; 2233 int ret; 2234 2235 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 2236 GFP_KERNEL, cpu_to_node(cpu)); 2237 if (!cpu_buffer) 2238 return NULL; 2239 2240 cpu_buffer->cpu = cpu; 2241 cpu_buffer->buffer = buffer; 2242 raw_spin_lock_init(&cpu_buffer->reader_lock); 2243 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2244 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2245 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2246 init_completion(&cpu_buffer->update_done); 2247 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2248 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2249 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2250 mutex_init(&cpu_buffer->mapping_lock); 2251 2252 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2253 GFP_KERNEL, cpu_to_node(cpu)); 2254 if (!bpage) 2255 return NULL; 2256 2257 rb_check_bpage(cpu_buffer, bpage); 2258 2259 cpu_buffer->reader_page = bpage; 2260 2261 if (buffer->range_addr_start) { 2262 /* 2263 * Range mapped buffers have the same restrictions as memory 2264 * mapped ones do. 2265 */ 2266 cpu_buffer->mapped = 1; 2267 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2268 bpage->page = rb_range_buffer(cpu_buffer, 0); 2269 if (!bpage->page) 2270 goto fail_free_reader; 2271 if (cpu_buffer->ring_meta->head_buffer) 2272 rb_meta_buffer_update(cpu_buffer, bpage); 2273 bpage->range = 1; 2274 } else { 2275 page = alloc_pages_node(cpu_to_node(cpu), 2276 GFP_KERNEL | __GFP_COMP | __GFP_ZERO, 2277 cpu_buffer->buffer->subbuf_order); 2278 if (!page) 2279 goto fail_free_reader; 2280 bpage->page = page_address(page); 2281 rb_init_page(bpage->page); 2282 } 2283 2284 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2285 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2286 2287 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2288 if (ret < 0) 2289 goto fail_free_reader; 2290 2291 rb_meta_validate_events(cpu_buffer); 2292 2293 /* If the boot meta was valid then this has already been updated */ 2294 meta = cpu_buffer->ring_meta; 2295 if (!meta || !meta->head_buffer || 2296 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2297 if (meta && meta->head_buffer && 2298 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2299 pr_warn("Ring buffer meta buffers not all mapped\n"); 2300 if (!cpu_buffer->head_page) 2301 pr_warn(" Missing head_page\n"); 2302 if (!cpu_buffer->commit_page) 2303 pr_warn(" Missing commit_page\n"); 2304 if (!cpu_buffer->tail_page) 2305 pr_warn(" Missing tail_page\n"); 2306 } 2307 2308 cpu_buffer->head_page 2309 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2310 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2311 2312 rb_head_page_activate(cpu_buffer); 2313 2314 if (cpu_buffer->ring_meta) 2315 meta->commit_buffer = meta->head_buffer; 2316 } else { 2317 /* The valid meta buffer still needs to activate the head page */ 2318 rb_head_page_activate(cpu_buffer); 2319 } 2320 2321 return_ptr(cpu_buffer); 2322 2323 fail_free_reader: 2324 free_buffer_page(cpu_buffer->reader_page); 2325 2326 return NULL; 2327 } 2328 2329 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2330 { 2331 struct list_head *head = cpu_buffer->pages; 2332 struct buffer_page *bpage, *tmp; 2333 2334 irq_work_sync(&cpu_buffer->irq_work.work); 2335 2336 free_buffer_page(cpu_buffer->reader_page); 2337 2338 if (head) { 2339 rb_head_page_deactivate(cpu_buffer); 2340 2341 list_for_each_entry_safe(bpage, tmp, head, list) { 2342 list_del_init(&bpage->list); 2343 free_buffer_page(bpage); 2344 } 2345 bpage = list_entry(head, struct buffer_page, list); 2346 free_buffer_page(bpage); 2347 } 2348 2349 free_page((unsigned long)cpu_buffer->free_page); 2350 2351 kfree(cpu_buffer); 2352 } 2353 2354 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2355 int order, unsigned long start, 2356 unsigned long end, 2357 unsigned long scratch_size, 2358 struct lock_class_key *key) 2359 { 2360 struct trace_buffer *buffer __free(kfree) = NULL; 2361 long nr_pages; 2362 int subbuf_size; 2363 int bsize; 2364 int cpu; 2365 int ret; 2366 2367 /* keep it in its own cache line */ 2368 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2369 GFP_KERNEL); 2370 if (!buffer) 2371 return NULL; 2372 2373 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2374 return NULL; 2375 2376 buffer->subbuf_order = order; 2377 subbuf_size = (PAGE_SIZE << order); 2378 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2379 2380 /* Max payload is buffer page size - header (8bytes) */ 2381 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2382 2383 buffer->flags = flags; 2384 buffer->clock = trace_clock_local; 2385 buffer->reader_lock_key = key; 2386 2387 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2388 init_waitqueue_head(&buffer->irq_work.waiters); 2389 2390 buffer->cpus = nr_cpu_ids; 2391 2392 bsize = sizeof(void *) * nr_cpu_ids; 2393 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2394 GFP_KERNEL); 2395 if (!buffer->buffers) 2396 goto fail_free_cpumask; 2397 2398 /* If start/end are specified, then that overrides size */ 2399 if (start && end) { 2400 unsigned long buffers_start; 2401 unsigned long ptr; 2402 int n; 2403 2404 /* Make sure that start is word aligned */ 2405 start = ALIGN(start, sizeof(long)); 2406 2407 /* scratch_size needs to be aligned too */ 2408 scratch_size = ALIGN(scratch_size, sizeof(long)); 2409 2410 /* Subtract the buffer meta data and word aligned */ 2411 buffers_start = start + sizeof(struct ring_buffer_cpu_meta); 2412 buffers_start = ALIGN(buffers_start, sizeof(long)); 2413 buffers_start += scratch_size; 2414 2415 /* Calculate the size for the per CPU data */ 2416 size = end - buffers_start; 2417 size = size / nr_cpu_ids; 2418 2419 /* 2420 * The number of sub-buffers (nr_pages) is determined by the 2421 * total size allocated minus the meta data size. 2422 * Then that is divided by the number of per CPU buffers 2423 * needed, plus account for the integer array index that 2424 * will be appended to the meta data. 2425 */ 2426 nr_pages = (size - sizeof(struct ring_buffer_cpu_meta)) / 2427 (subbuf_size + sizeof(int)); 2428 /* Need at least two pages plus the reader page */ 2429 if (nr_pages < 3) 2430 goto fail_free_buffers; 2431 2432 again: 2433 /* Make sure that the size fits aligned */ 2434 for (n = 0, ptr = buffers_start; n < nr_cpu_ids; n++) { 2435 ptr += sizeof(struct ring_buffer_cpu_meta) + 2436 sizeof(int) * nr_pages; 2437 ptr = ALIGN(ptr, subbuf_size); 2438 ptr += subbuf_size * nr_pages; 2439 } 2440 if (ptr > end) { 2441 if (nr_pages <= 3) 2442 goto fail_free_buffers; 2443 nr_pages--; 2444 goto again; 2445 } 2446 2447 /* nr_pages should not count the reader page */ 2448 nr_pages--; 2449 buffer->range_addr_start = start; 2450 buffer->range_addr_end = end; 2451 2452 rb_range_meta_init(buffer, nr_pages, scratch_size); 2453 } else { 2454 2455 /* need at least two pages */ 2456 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2457 if (nr_pages < 2) 2458 nr_pages = 2; 2459 } 2460 2461 cpu = raw_smp_processor_id(); 2462 cpumask_set_cpu(cpu, buffer->cpumask); 2463 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2464 if (!buffer->buffers[cpu]) 2465 goto fail_free_buffers; 2466 2467 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2468 if (ret < 0) 2469 goto fail_free_buffers; 2470 2471 mutex_init(&buffer->mutex); 2472 2473 return_ptr(buffer); 2474 2475 fail_free_buffers: 2476 for_each_buffer_cpu(buffer, cpu) { 2477 if (buffer->buffers[cpu]) 2478 rb_free_cpu_buffer(buffer->buffers[cpu]); 2479 } 2480 kfree(buffer->buffers); 2481 2482 fail_free_cpumask: 2483 free_cpumask_var(buffer->cpumask); 2484 2485 return NULL; 2486 } 2487 2488 /** 2489 * __ring_buffer_alloc - allocate a new ring_buffer 2490 * @size: the size in bytes per cpu that is needed. 2491 * @flags: attributes to set for the ring buffer. 2492 * @key: ring buffer reader_lock_key. 2493 * 2494 * Currently the only flag that is available is the RB_FL_OVERWRITE 2495 * flag. This flag means that the buffer will overwrite old data 2496 * when the buffer wraps. If this flag is not set, the buffer will 2497 * drop data when the tail hits the head. 2498 */ 2499 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2500 struct lock_class_key *key) 2501 { 2502 /* Default buffer page size - one system page */ 2503 return alloc_buffer(size, flags, 0, 0, 0, 0, key); 2504 2505 } 2506 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2507 2508 /** 2509 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2510 * @size: the size in bytes per cpu that is needed. 2511 * @flags: attributes to set for the ring buffer. 2512 * @order: sub-buffer order 2513 * @start: start of allocated range 2514 * @range_size: size of allocated range 2515 * @scratch_size: size of scratch area (for preallocated memory buffers) 2516 * @key: ring buffer reader_lock_key. 2517 * 2518 * Currently the only flag that is available is the RB_FL_OVERWRITE 2519 * flag. This flag means that the buffer will overwrite old data 2520 * when the buffer wraps. If this flag is not set, the buffer will 2521 * drop data when the tail hits the head. 2522 */ 2523 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2524 int order, unsigned long start, 2525 unsigned long range_size, 2526 unsigned long scratch_size, 2527 struct lock_class_key *key) 2528 { 2529 return alloc_buffer(size, flags, order, start, start + range_size, 2530 scratch_size, key); 2531 } 2532 2533 void *ring_buffer_meta_scratch(struct trace_buffer *buffer, unsigned int *size) 2534 { 2535 struct ring_buffer_meta *meta; 2536 void *ptr; 2537 2538 if (!buffer || !buffer->meta) 2539 return NULL; 2540 2541 meta = buffer->meta; 2542 2543 ptr = (void *)ALIGN((unsigned long)meta + sizeof(*meta), sizeof(long)); 2544 2545 if (size) 2546 *size = (void *)meta + meta->buffers_offset - ptr; 2547 2548 return ptr; 2549 } 2550 2551 /** 2552 * ring_buffer_free - free a ring buffer. 2553 * @buffer: the buffer to free. 2554 */ 2555 void 2556 ring_buffer_free(struct trace_buffer *buffer) 2557 { 2558 int cpu; 2559 2560 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2561 2562 irq_work_sync(&buffer->irq_work.work); 2563 2564 for_each_buffer_cpu(buffer, cpu) 2565 rb_free_cpu_buffer(buffer->buffers[cpu]); 2566 2567 kfree(buffer->buffers); 2568 free_cpumask_var(buffer->cpumask); 2569 2570 kfree(buffer); 2571 } 2572 EXPORT_SYMBOL_GPL(ring_buffer_free); 2573 2574 void ring_buffer_set_clock(struct trace_buffer *buffer, 2575 u64 (*clock)(void)) 2576 { 2577 buffer->clock = clock; 2578 } 2579 2580 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2581 { 2582 buffer->time_stamp_abs = abs; 2583 } 2584 2585 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2586 { 2587 return buffer->time_stamp_abs; 2588 } 2589 2590 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2591 { 2592 return local_read(&bpage->entries) & RB_WRITE_MASK; 2593 } 2594 2595 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2596 { 2597 return local_read(&bpage->write) & RB_WRITE_MASK; 2598 } 2599 2600 static bool 2601 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 2602 { 2603 struct list_head *tail_page, *to_remove, *next_page; 2604 struct buffer_page *to_remove_page, *tmp_iter_page; 2605 struct buffer_page *last_page, *first_page; 2606 unsigned long nr_removed; 2607 unsigned long head_bit; 2608 int page_entries; 2609 2610 head_bit = 0; 2611 2612 raw_spin_lock_irq(&cpu_buffer->reader_lock); 2613 atomic_inc(&cpu_buffer->record_disabled); 2614 /* 2615 * We don't race with the readers since we have acquired the reader 2616 * lock. We also don't race with writers after disabling recording. 2617 * This makes it easy to figure out the first and the last page to be 2618 * removed from the list. We unlink all the pages in between including 2619 * the first and last pages. This is done in a busy loop so that we 2620 * lose the least number of traces. 2621 * The pages are freed after we restart recording and unlock readers. 2622 */ 2623 tail_page = &cpu_buffer->tail_page->list; 2624 2625 /* 2626 * tail page might be on reader page, we remove the next page 2627 * from the ring buffer 2628 */ 2629 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2630 tail_page = rb_list_head(tail_page->next); 2631 to_remove = tail_page; 2632 2633 /* start of pages to remove */ 2634 first_page = list_entry(rb_list_head(to_remove->next), 2635 struct buffer_page, list); 2636 2637 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 2638 to_remove = rb_list_head(to_remove)->next; 2639 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 2640 } 2641 /* Read iterators need to reset themselves when some pages removed */ 2642 cpu_buffer->pages_removed += nr_removed; 2643 2644 next_page = rb_list_head(to_remove)->next; 2645 2646 /* 2647 * Now we remove all pages between tail_page and next_page. 2648 * Make sure that we have head_bit value preserved for the 2649 * next page 2650 */ 2651 tail_page->next = (struct list_head *)((unsigned long)next_page | 2652 head_bit); 2653 next_page = rb_list_head(next_page); 2654 next_page->prev = tail_page; 2655 2656 /* make sure pages points to a valid page in the ring buffer */ 2657 cpu_buffer->pages = next_page; 2658 cpu_buffer->cnt++; 2659 2660 /* update head page */ 2661 if (head_bit) 2662 cpu_buffer->head_page = list_entry(next_page, 2663 struct buffer_page, list); 2664 2665 /* pages are removed, resume tracing and then free the pages */ 2666 atomic_dec(&cpu_buffer->record_disabled); 2667 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 2668 2669 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 2670 2671 /* last buffer page to remove */ 2672 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 2673 list); 2674 tmp_iter_page = first_page; 2675 2676 do { 2677 cond_resched(); 2678 2679 to_remove_page = tmp_iter_page; 2680 rb_inc_page(&tmp_iter_page); 2681 2682 /* update the counters */ 2683 page_entries = rb_page_entries(to_remove_page); 2684 if (page_entries) { 2685 /* 2686 * If something was added to this page, it was full 2687 * since it is not the tail page. So we deduct the 2688 * bytes consumed in ring buffer from here. 2689 * Increment overrun to account for the lost events. 2690 */ 2691 local_add(page_entries, &cpu_buffer->overrun); 2692 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 2693 local_inc(&cpu_buffer->pages_lost); 2694 } 2695 2696 /* 2697 * We have already removed references to this list item, just 2698 * free up the buffer_page and its page 2699 */ 2700 free_buffer_page(to_remove_page); 2701 nr_removed--; 2702 2703 } while (to_remove_page != last_page); 2704 2705 RB_WARN_ON(cpu_buffer, nr_removed); 2706 2707 return nr_removed == 0; 2708 } 2709 2710 static bool 2711 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2712 { 2713 struct list_head *pages = &cpu_buffer->new_pages; 2714 unsigned long flags; 2715 bool success; 2716 int retries; 2717 2718 /* Can be called at early boot up, where interrupts must not been enabled */ 2719 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2720 /* 2721 * We are holding the reader lock, so the reader page won't be swapped 2722 * in the ring buffer. Now we are racing with the writer trying to 2723 * move head page and the tail page. 2724 * We are going to adapt the reader page update process where: 2725 * 1. We first splice the start and end of list of new pages between 2726 * the head page and its previous page. 2727 * 2. We cmpxchg the prev_page->next to point from head page to the 2728 * start of new pages list. 2729 * 3. Finally, we update the head->prev to the end of new list. 2730 * 2731 * We will try this process 10 times, to make sure that we don't keep 2732 * spinning. 2733 */ 2734 retries = 10; 2735 success = false; 2736 while (retries--) { 2737 struct list_head *head_page, *prev_page; 2738 struct list_head *last_page, *first_page; 2739 struct list_head *head_page_with_bit; 2740 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2741 2742 if (!hpage) 2743 break; 2744 head_page = &hpage->list; 2745 prev_page = head_page->prev; 2746 2747 first_page = pages->next; 2748 last_page = pages->prev; 2749 2750 head_page_with_bit = (struct list_head *) 2751 ((unsigned long)head_page | RB_PAGE_HEAD); 2752 2753 last_page->next = head_page_with_bit; 2754 first_page->prev = prev_page; 2755 2756 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2757 if (try_cmpxchg(&prev_page->next, 2758 &head_page_with_bit, first_page)) { 2759 /* 2760 * yay, we replaced the page pointer to our new list, 2761 * now, we just have to update to head page's prev 2762 * pointer to point to end of list 2763 */ 2764 head_page->prev = last_page; 2765 cpu_buffer->cnt++; 2766 success = true; 2767 break; 2768 } 2769 } 2770 2771 if (success) 2772 INIT_LIST_HEAD(pages); 2773 /* 2774 * If we weren't successful in adding in new pages, warn and stop 2775 * tracing 2776 */ 2777 RB_WARN_ON(cpu_buffer, !success); 2778 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2779 2780 /* free pages if they weren't inserted */ 2781 if (!success) { 2782 struct buffer_page *bpage, *tmp; 2783 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2784 list) { 2785 list_del_init(&bpage->list); 2786 free_buffer_page(bpage); 2787 } 2788 } 2789 return success; 2790 } 2791 2792 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2793 { 2794 bool success; 2795 2796 if (cpu_buffer->nr_pages_to_update > 0) 2797 success = rb_insert_pages(cpu_buffer); 2798 else 2799 success = rb_remove_pages(cpu_buffer, 2800 -cpu_buffer->nr_pages_to_update); 2801 2802 if (success) 2803 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2804 } 2805 2806 static void update_pages_handler(struct work_struct *work) 2807 { 2808 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2809 struct ring_buffer_per_cpu, update_pages_work); 2810 rb_update_pages(cpu_buffer); 2811 complete(&cpu_buffer->update_done); 2812 } 2813 2814 /** 2815 * ring_buffer_resize - resize the ring buffer 2816 * @buffer: the buffer to resize. 2817 * @size: the new size. 2818 * @cpu_id: the cpu buffer to resize 2819 * 2820 * Minimum size is 2 * buffer->subbuf_size. 2821 * 2822 * Returns 0 on success and < 0 on failure. 2823 */ 2824 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2825 int cpu_id) 2826 { 2827 struct ring_buffer_per_cpu *cpu_buffer; 2828 unsigned long nr_pages; 2829 int cpu, err; 2830 2831 /* 2832 * Always succeed at resizing a non-existent buffer: 2833 */ 2834 if (!buffer) 2835 return 0; 2836 2837 /* Make sure the requested buffer exists */ 2838 if (cpu_id != RING_BUFFER_ALL_CPUS && 2839 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2840 return 0; 2841 2842 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2843 2844 /* we need a minimum of two pages */ 2845 if (nr_pages < 2) 2846 nr_pages = 2; 2847 2848 /* 2849 * Keep CPUs from coming online while resizing to synchronize 2850 * with new per CPU buffers being created. 2851 */ 2852 guard(cpus_read_lock)(); 2853 2854 /* prevent another thread from changing buffer sizes */ 2855 mutex_lock(&buffer->mutex); 2856 atomic_inc(&buffer->resizing); 2857 2858 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2859 /* 2860 * Don't succeed if resizing is disabled, as a reader might be 2861 * manipulating the ring buffer and is expecting a sane state while 2862 * this is true. 2863 */ 2864 for_each_buffer_cpu(buffer, cpu) { 2865 cpu_buffer = buffer->buffers[cpu]; 2866 if (atomic_read(&cpu_buffer->resize_disabled)) { 2867 err = -EBUSY; 2868 goto out_err_unlock; 2869 } 2870 } 2871 2872 /* calculate the pages to update */ 2873 for_each_buffer_cpu(buffer, cpu) { 2874 cpu_buffer = buffer->buffers[cpu]; 2875 2876 cpu_buffer->nr_pages_to_update = nr_pages - 2877 cpu_buffer->nr_pages; 2878 /* 2879 * nothing more to do for removing pages or no update 2880 */ 2881 if (cpu_buffer->nr_pages_to_update <= 0) 2882 continue; 2883 /* 2884 * to add pages, make sure all new pages can be 2885 * allocated without receiving ENOMEM 2886 */ 2887 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2888 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2889 &cpu_buffer->new_pages)) { 2890 /* not enough memory for new pages */ 2891 err = -ENOMEM; 2892 goto out_err; 2893 } 2894 2895 cond_resched(); 2896 } 2897 2898 /* 2899 * Fire off all the required work handlers 2900 * We can't schedule on offline CPUs, but it's not necessary 2901 * since we can change their buffer sizes without any race. 2902 */ 2903 for_each_buffer_cpu(buffer, cpu) { 2904 cpu_buffer = buffer->buffers[cpu]; 2905 if (!cpu_buffer->nr_pages_to_update) 2906 continue; 2907 2908 /* Can't run something on an offline CPU. */ 2909 if (!cpu_online(cpu)) { 2910 rb_update_pages(cpu_buffer); 2911 cpu_buffer->nr_pages_to_update = 0; 2912 } else { 2913 /* Run directly if possible. */ 2914 migrate_disable(); 2915 if (cpu != smp_processor_id()) { 2916 migrate_enable(); 2917 schedule_work_on(cpu, 2918 &cpu_buffer->update_pages_work); 2919 } else { 2920 update_pages_handler(&cpu_buffer->update_pages_work); 2921 migrate_enable(); 2922 } 2923 } 2924 } 2925 2926 /* wait for all the updates to complete */ 2927 for_each_buffer_cpu(buffer, cpu) { 2928 cpu_buffer = buffer->buffers[cpu]; 2929 if (!cpu_buffer->nr_pages_to_update) 2930 continue; 2931 2932 if (cpu_online(cpu)) 2933 wait_for_completion(&cpu_buffer->update_done); 2934 cpu_buffer->nr_pages_to_update = 0; 2935 } 2936 2937 } else { 2938 cpu_buffer = buffer->buffers[cpu_id]; 2939 2940 if (nr_pages == cpu_buffer->nr_pages) 2941 goto out; 2942 2943 /* 2944 * Don't succeed if resizing is disabled, as a reader might be 2945 * manipulating the ring buffer and is expecting a sane state while 2946 * this is true. 2947 */ 2948 if (atomic_read(&cpu_buffer->resize_disabled)) { 2949 err = -EBUSY; 2950 goto out_err_unlock; 2951 } 2952 2953 cpu_buffer->nr_pages_to_update = nr_pages - 2954 cpu_buffer->nr_pages; 2955 2956 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2957 if (cpu_buffer->nr_pages_to_update > 0 && 2958 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2959 &cpu_buffer->new_pages)) { 2960 err = -ENOMEM; 2961 goto out_err; 2962 } 2963 2964 /* Can't run something on an offline CPU. */ 2965 if (!cpu_online(cpu_id)) 2966 rb_update_pages(cpu_buffer); 2967 else { 2968 /* Run directly if possible. */ 2969 migrate_disable(); 2970 if (cpu_id == smp_processor_id()) { 2971 rb_update_pages(cpu_buffer); 2972 migrate_enable(); 2973 } else { 2974 migrate_enable(); 2975 schedule_work_on(cpu_id, 2976 &cpu_buffer->update_pages_work); 2977 wait_for_completion(&cpu_buffer->update_done); 2978 } 2979 } 2980 2981 cpu_buffer->nr_pages_to_update = 0; 2982 } 2983 2984 out: 2985 /* 2986 * The ring buffer resize can happen with the ring buffer 2987 * enabled, so that the update disturbs the tracing as little 2988 * as possible. But if the buffer is disabled, we do not need 2989 * to worry about that, and we can take the time to verify 2990 * that the buffer is not corrupt. 2991 */ 2992 if (atomic_read(&buffer->record_disabled)) { 2993 atomic_inc(&buffer->record_disabled); 2994 /* 2995 * Even though the buffer was disabled, we must make sure 2996 * that it is truly disabled before calling rb_check_pages. 2997 * There could have been a race between checking 2998 * record_disable and incrementing it. 2999 */ 3000 synchronize_rcu(); 3001 for_each_buffer_cpu(buffer, cpu) { 3002 cpu_buffer = buffer->buffers[cpu]; 3003 rb_check_pages(cpu_buffer); 3004 } 3005 atomic_dec(&buffer->record_disabled); 3006 } 3007 3008 atomic_dec(&buffer->resizing); 3009 mutex_unlock(&buffer->mutex); 3010 return 0; 3011 3012 out_err: 3013 for_each_buffer_cpu(buffer, cpu) { 3014 struct buffer_page *bpage, *tmp; 3015 3016 cpu_buffer = buffer->buffers[cpu]; 3017 cpu_buffer->nr_pages_to_update = 0; 3018 3019 if (list_empty(&cpu_buffer->new_pages)) 3020 continue; 3021 3022 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 3023 list) { 3024 list_del_init(&bpage->list); 3025 free_buffer_page(bpage); 3026 } 3027 } 3028 out_err_unlock: 3029 atomic_dec(&buffer->resizing); 3030 mutex_unlock(&buffer->mutex); 3031 return err; 3032 } 3033 EXPORT_SYMBOL_GPL(ring_buffer_resize); 3034 3035 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 3036 { 3037 mutex_lock(&buffer->mutex); 3038 if (val) 3039 buffer->flags |= RB_FL_OVERWRITE; 3040 else 3041 buffer->flags &= ~RB_FL_OVERWRITE; 3042 mutex_unlock(&buffer->mutex); 3043 } 3044 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 3045 3046 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 3047 { 3048 return bpage->page->data + index; 3049 } 3050 3051 static __always_inline struct ring_buffer_event * 3052 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 3053 { 3054 return __rb_page_index(cpu_buffer->reader_page, 3055 cpu_buffer->reader_page->read); 3056 } 3057 3058 static struct ring_buffer_event * 3059 rb_iter_head_event(struct ring_buffer_iter *iter) 3060 { 3061 struct ring_buffer_event *event; 3062 struct buffer_page *iter_head_page = iter->head_page; 3063 unsigned long commit; 3064 unsigned length; 3065 3066 if (iter->head != iter->next_event) 3067 return iter->event; 3068 3069 /* 3070 * When the writer goes across pages, it issues a cmpxchg which 3071 * is a mb(), which will synchronize with the rmb here. 3072 * (see rb_tail_page_update() and __rb_reserve_next()) 3073 */ 3074 commit = rb_page_commit(iter_head_page); 3075 smp_rmb(); 3076 3077 /* An event needs to be at least 8 bytes in size */ 3078 if (iter->head > commit - 8) 3079 goto reset; 3080 3081 event = __rb_page_index(iter_head_page, iter->head); 3082 length = rb_event_length(event); 3083 3084 /* 3085 * READ_ONCE() doesn't work on functions and we don't want the 3086 * compiler doing any crazy optimizations with length. 3087 */ 3088 barrier(); 3089 3090 if ((iter->head + length) > commit || length > iter->event_size) 3091 /* Writer corrupted the read? */ 3092 goto reset; 3093 3094 memcpy(iter->event, event, length); 3095 /* 3096 * If the page stamp is still the same after this rmb() then the 3097 * event was safely copied without the writer entering the page. 3098 */ 3099 smp_rmb(); 3100 3101 /* Make sure the page didn't change since we read this */ 3102 if (iter->page_stamp != iter_head_page->page->time_stamp || 3103 commit > rb_page_commit(iter_head_page)) 3104 goto reset; 3105 3106 iter->next_event = iter->head + length; 3107 return iter->event; 3108 reset: 3109 /* Reset to the beginning */ 3110 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3111 iter->head = 0; 3112 iter->next_event = 0; 3113 iter->missed_events = 1; 3114 return NULL; 3115 } 3116 3117 /* Size is determined by what has been committed */ 3118 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3119 { 3120 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3121 } 3122 3123 static __always_inline unsigned 3124 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3125 { 3126 return rb_page_commit(cpu_buffer->commit_page); 3127 } 3128 3129 static __always_inline unsigned 3130 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3131 { 3132 unsigned long addr = (unsigned long)event; 3133 3134 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3135 3136 return addr - BUF_PAGE_HDR_SIZE; 3137 } 3138 3139 static void rb_inc_iter(struct ring_buffer_iter *iter) 3140 { 3141 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3142 3143 /* 3144 * The iterator could be on the reader page (it starts there). 3145 * But the head could have moved, since the reader was 3146 * found. Check for this case and assign the iterator 3147 * to the head page instead of next. 3148 */ 3149 if (iter->head_page == cpu_buffer->reader_page) 3150 iter->head_page = rb_set_head_page(cpu_buffer); 3151 else 3152 rb_inc_page(&iter->head_page); 3153 3154 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3155 iter->head = 0; 3156 iter->next_event = 0; 3157 } 3158 3159 /* Return the index into the sub-buffers for a given sub-buffer */ 3160 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf) 3161 { 3162 void *subbuf_array; 3163 3164 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3165 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3166 return (subbuf - subbuf_array) / meta->subbuf_size; 3167 } 3168 3169 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3170 struct buffer_page *next_page) 3171 { 3172 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3173 unsigned long old_head = (unsigned long)next_page->page; 3174 unsigned long new_head; 3175 3176 rb_inc_page(&next_page); 3177 new_head = (unsigned long)next_page->page; 3178 3179 /* 3180 * Only move it forward once, if something else came in and 3181 * moved it forward, then we don't want to touch it. 3182 */ 3183 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3184 } 3185 3186 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3187 struct buffer_page *reader) 3188 { 3189 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3190 void *old_reader = cpu_buffer->reader_page->page; 3191 void *new_reader = reader->page; 3192 int id; 3193 3194 id = reader->id; 3195 cpu_buffer->reader_page->id = id; 3196 reader->id = 0; 3197 3198 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3199 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3200 3201 /* The head pointer is the one after the reader */ 3202 rb_update_meta_head(cpu_buffer, reader); 3203 } 3204 3205 /* 3206 * rb_handle_head_page - writer hit the head page 3207 * 3208 * Returns: +1 to retry page 3209 * 0 to continue 3210 * -1 on error 3211 */ 3212 static int 3213 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3214 struct buffer_page *tail_page, 3215 struct buffer_page *next_page) 3216 { 3217 struct buffer_page *new_head; 3218 int entries; 3219 int type; 3220 int ret; 3221 3222 entries = rb_page_entries(next_page); 3223 3224 /* 3225 * The hard part is here. We need to move the head 3226 * forward, and protect against both readers on 3227 * other CPUs and writers coming in via interrupts. 3228 */ 3229 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3230 RB_PAGE_HEAD); 3231 3232 /* 3233 * type can be one of four: 3234 * NORMAL - an interrupt already moved it for us 3235 * HEAD - we are the first to get here. 3236 * UPDATE - we are the interrupt interrupting 3237 * a current move. 3238 * MOVED - a reader on another CPU moved the next 3239 * pointer to its reader page. Give up 3240 * and try again. 3241 */ 3242 3243 switch (type) { 3244 case RB_PAGE_HEAD: 3245 /* 3246 * We changed the head to UPDATE, thus 3247 * it is our responsibility to update 3248 * the counters. 3249 */ 3250 local_add(entries, &cpu_buffer->overrun); 3251 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3252 local_inc(&cpu_buffer->pages_lost); 3253 3254 if (cpu_buffer->ring_meta) 3255 rb_update_meta_head(cpu_buffer, next_page); 3256 /* 3257 * The entries will be zeroed out when we move the 3258 * tail page. 3259 */ 3260 3261 /* still more to do */ 3262 break; 3263 3264 case RB_PAGE_UPDATE: 3265 /* 3266 * This is an interrupt that interrupt the 3267 * previous update. Still more to do. 3268 */ 3269 break; 3270 case RB_PAGE_NORMAL: 3271 /* 3272 * An interrupt came in before the update 3273 * and processed this for us. 3274 * Nothing left to do. 3275 */ 3276 return 1; 3277 case RB_PAGE_MOVED: 3278 /* 3279 * The reader is on another CPU and just did 3280 * a swap with our next_page. 3281 * Try again. 3282 */ 3283 return 1; 3284 default: 3285 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3286 return -1; 3287 } 3288 3289 /* 3290 * Now that we are here, the old head pointer is 3291 * set to UPDATE. This will keep the reader from 3292 * swapping the head page with the reader page. 3293 * The reader (on another CPU) will spin till 3294 * we are finished. 3295 * 3296 * We just need to protect against interrupts 3297 * doing the job. We will set the next pointer 3298 * to HEAD. After that, we set the old pointer 3299 * to NORMAL, but only if it was HEAD before. 3300 * otherwise we are an interrupt, and only 3301 * want the outer most commit to reset it. 3302 */ 3303 new_head = next_page; 3304 rb_inc_page(&new_head); 3305 3306 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3307 RB_PAGE_NORMAL); 3308 3309 /* 3310 * Valid returns are: 3311 * HEAD - an interrupt came in and already set it. 3312 * NORMAL - One of two things: 3313 * 1) We really set it. 3314 * 2) A bunch of interrupts came in and moved 3315 * the page forward again. 3316 */ 3317 switch (ret) { 3318 case RB_PAGE_HEAD: 3319 case RB_PAGE_NORMAL: 3320 /* OK */ 3321 break; 3322 default: 3323 RB_WARN_ON(cpu_buffer, 1); 3324 return -1; 3325 } 3326 3327 /* 3328 * It is possible that an interrupt came in, 3329 * set the head up, then more interrupts came in 3330 * and moved it again. When we get back here, 3331 * the page would have been set to NORMAL but we 3332 * just set it back to HEAD. 3333 * 3334 * How do you detect this? Well, if that happened 3335 * the tail page would have moved. 3336 */ 3337 if (ret == RB_PAGE_NORMAL) { 3338 struct buffer_page *buffer_tail_page; 3339 3340 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 3341 /* 3342 * If the tail had moved passed next, then we need 3343 * to reset the pointer. 3344 */ 3345 if (buffer_tail_page != tail_page && 3346 buffer_tail_page != next_page) 3347 rb_head_page_set_normal(cpu_buffer, new_head, 3348 next_page, 3349 RB_PAGE_HEAD); 3350 } 3351 3352 /* 3353 * If this was the outer most commit (the one that 3354 * changed the original pointer from HEAD to UPDATE), 3355 * then it is up to us to reset it to NORMAL. 3356 */ 3357 if (type == RB_PAGE_HEAD) { 3358 ret = rb_head_page_set_normal(cpu_buffer, next_page, 3359 tail_page, 3360 RB_PAGE_UPDATE); 3361 if (RB_WARN_ON(cpu_buffer, 3362 ret != RB_PAGE_UPDATE)) 3363 return -1; 3364 } 3365 3366 return 0; 3367 } 3368 3369 static inline void 3370 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 3371 unsigned long tail, struct rb_event_info *info) 3372 { 3373 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 3374 struct buffer_page *tail_page = info->tail_page; 3375 struct ring_buffer_event *event; 3376 unsigned long length = info->length; 3377 3378 /* 3379 * Only the event that crossed the page boundary 3380 * must fill the old tail_page with padding. 3381 */ 3382 if (tail >= bsize) { 3383 /* 3384 * If the page was filled, then we still need 3385 * to update the real_end. Reset it to zero 3386 * and the reader will ignore it. 3387 */ 3388 if (tail == bsize) 3389 tail_page->real_end = 0; 3390 3391 local_sub(length, &tail_page->write); 3392 return; 3393 } 3394 3395 event = __rb_page_index(tail_page, tail); 3396 3397 /* 3398 * Save the original length to the meta data. 3399 * This will be used by the reader to add lost event 3400 * counter. 3401 */ 3402 tail_page->real_end = tail; 3403 3404 /* 3405 * If this event is bigger than the minimum size, then 3406 * we need to be careful that we don't subtract the 3407 * write counter enough to allow another writer to slip 3408 * in on this page. 3409 * We put in a discarded commit instead, to make sure 3410 * that this space is not used again, and this space will 3411 * not be accounted into 'entries_bytes'. 3412 * 3413 * If we are less than the minimum size, we don't need to 3414 * worry about it. 3415 */ 3416 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3417 /* No room for any events */ 3418 3419 /* Mark the rest of the page with padding */ 3420 rb_event_set_padding(event); 3421 3422 /* Make sure the padding is visible before the write update */ 3423 smp_wmb(); 3424 3425 /* Set the write back to the previous setting */ 3426 local_sub(length, &tail_page->write); 3427 return; 3428 } 3429 3430 /* Put in a discarded event */ 3431 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3432 event->type_len = RINGBUF_TYPE_PADDING; 3433 /* time delta must be non zero */ 3434 event->time_delta = 1; 3435 3436 /* account for padding bytes */ 3437 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3438 3439 /* Make sure the padding is visible before the tail_page->write update */ 3440 smp_wmb(); 3441 3442 /* Set write to end of buffer */ 3443 length = (tail + length) - bsize; 3444 local_sub(length, &tail_page->write); 3445 } 3446 3447 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3448 3449 /* 3450 * This is the slow path, force gcc not to inline it. 3451 */ 3452 static noinline struct ring_buffer_event * 3453 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3454 unsigned long tail, struct rb_event_info *info) 3455 { 3456 struct buffer_page *tail_page = info->tail_page; 3457 struct buffer_page *commit_page = cpu_buffer->commit_page; 3458 struct trace_buffer *buffer = cpu_buffer->buffer; 3459 struct buffer_page *next_page; 3460 int ret; 3461 3462 next_page = tail_page; 3463 3464 rb_inc_page(&next_page); 3465 3466 /* 3467 * If for some reason, we had an interrupt storm that made 3468 * it all the way around the buffer, bail, and warn 3469 * about it. 3470 */ 3471 if (unlikely(next_page == commit_page)) { 3472 local_inc(&cpu_buffer->commit_overrun); 3473 goto out_reset; 3474 } 3475 3476 /* 3477 * This is where the fun begins! 3478 * 3479 * We are fighting against races between a reader that 3480 * could be on another CPU trying to swap its reader 3481 * page with the buffer head. 3482 * 3483 * We are also fighting against interrupts coming in and 3484 * moving the head or tail on us as well. 3485 * 3486 * If the next page is the head page then we have filled 3487 * the buffer, unless the commit page is still on the 3488 * reader page. 3489 */ 3490 if (rb_is_head_page(next_page, &tail_page->list)) { 3491 3492 /* 3493 * If the commit is not on the reader page, then 3494 * move the header page. 3495 */ 3496 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3497 /* 3498 * If we are not in overwrite mode, 3499 * this is easy, just stop here. 3500 */ 3501 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3502 local_inc(&cpu_buffer->dropped_events); 3503 goto out_reset; 3504 } 3505 3506 ret = rb_handle_head_page(cpu_buffer, 3507 tail_page, 3508 next_page); 3509 if (ret < 0) 3510 goto out_reset; 3511 if (ret) 3512 goto out_again; 3513 } else { 3514 /* 3515 * We need to be careful here too. The 3516 * commit page could still be on the reader 3517 * page. We could have a small buffer, and 3518 * have filled up the buffer with events 3519 * from interrupts and such, and wrapped. 3520 * 3521 * Note, if the tail page is also on the 3522 * reader_page, we let it move out. 3523 */ 3524 if (unlikely((cpu_buffer->commit_page != 3525 cpu_buffer->tail_page) && 3526 (cpu_buffer->commit_page == 3527 cpu_buffer->reader_page))) { 3528 local_inc(&cpu_buffer->commit_overrun); 3529 goto out_reset; 3530 } 3531 } 3532 } 3533 3534 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3535 3536 out_again: 3537 3538 rb_reset_tail(cpu_buffer, tail, info); 3539 3540 /* Commit what we have for now. */ 3541 rb_end_commit(cpu_buffer); 3542 /* rb_end_commit() decs committing */ 3543 local_inc(&cpu_buffer->committing); 3544 3545 /* fail and let the caller try again */ 3546 return ERR_PTR(-EAGAIN); 3547 3548 out_reset: 3549 /* reset write */ 3550 rb_reset_tail(cpu_buffer, tail, info); 3551 3552 return NULL; 3553 } 3554 3555 /* Slow path */ 3556 static struct ring_buffer_event * 3557 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3558 struct ring_buffer_event *event, u64 delta, bool abs) 3559 { 3560 if (abs) 3561 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3562 else 3563 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3564 3565 /* Not the first event on the page, or not delta? */ 3566 if (abs || rb_event_index(cpu_buffer, event)) { 3567 event->time_delta = delta & TS_MASK; 3568 event->array[0] = delta >> TS_SHIFT; 3569 } else { 3570 /* nope, just zero it */ 3571 event->time_delta = 0; 3572 event->array[0] = 0; 3573 } 3574 3575 return skip_time_extend(event); 3576 } 3577 3578 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3579 static inline bool sched_clock_stable(void) 3580 { 3581 return true; 3582 } 3583 #endif 3584 3585 static void 3586 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3587 struct rb_event_info *info) 3588 { 3589 u64 write_stamp; 3590 3591 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3592 (unsigned long long)info->delta, 3593 (unsigned long long)info->ts, 3594 (unsigned long long)info->before, 3595 (unsigned long long)info->after, 3596 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3597 sched_clock_stable() ? "" : 3598 "If you just came from a suspend/resume,\n" 3599 "please switch to the trace global clock:\n" 3600 " echo global > /sys/kernel/tracing/trace_clock\n" 3601 "or add trace_clock=global to the kernel command line\n"); 3602 } 3603 3604 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3605 struct ring_buffer_event **event, 3606 struct rb_event_info *info, 3607 u64 *delta, 3608 unsigned int *length) 3609 { 3610 bool abs = info->add_timestamp & 3611 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3612 3613 if (unlikely(info->delta > (1ULL << 59))) { 3614 /* 3615 * Some timers can use more than 59 bits, and when a timestamp 3616 * is added to the buffer, it will lose those bits. 3617 */ 3618 if (abs && (info->ts & TS_MSB)) { 3619 info->delta &= ABS_TS_MASK; 3620 3621 /* did the clock go backwards */ 3622 } else if (info->before == info->after && info->before > info->ts) { 3623 /* not interrupted */ 3624 static int once; 3625 3626 /* 3627 * This is possible with a recalibrating of the TSC. 3628 * Do not produce a call stack, but just report it. 3629 */ 3630 if (!once) { 3631 once++; 3632 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3633 info->before, info->ts); 3634 } 3635 } else 3636 rb_check_timestamp(cpu_buffer, info); 3637 if (!abs) 3638 info->delta = 0; 3639 } 3640 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3641 *length -= RB_LEN_TIME_EXTEND; 3642 *delta = 0; 3643 } 3644 3645 /** 3646 * rb_update_event - update event type and data 3647 * @cpu_buffer: The per cpu buffer of the @event 3648 * @event: the event to update 3649 * @info: The info to update the @event with (contains length and delta) 3650 * 3651 * Update the type and data fields of the @event. The length 3652 * is the actual size that is written to the ring buffer, 3653 * and with this, we can determine what to place into the 3654 * data field. 3655 */ 3656 static void 3657 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3658 struct ring_buffer_event *event, 3659 struct rb_event_info *info) 3660 { 3661 unsigned length = info->length; 3662 u64 delta = info->delta; 3663 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3664 3665 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3666 cpu_buffer->event_stamp[nest] = info->ts; 3667 3668 /* 3669 * If we need to add a timestamp, then we 3670 * add it to the start of the reserved space. 3671 */ 3672 if (unlikely(info->add_timestamp)) 3673 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3674 3675 event->time_delta = delta; 3676 length -= RB_EVNT_HDR_SIZE; 3677 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3678 event->type_len = 0; 3679 event->array[0] = length; 3680 } else 3681 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3682 } 3683 3684 static unsigned rb_calculate_event_length(unsigned length) 3685 { 3686 struct ring_buffer_event event; /* Used only for sizeof array */ 3687 3688 /* zero length can cause confusions */ 3689 if (!length) 3690 length++; 3691 3692 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3693 length += sizeof(event.array[0]); 3694 3695 length += RB_EVNT_HDR_SIZE; 3696 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3697 3698 /* 3699 * In case the time delta is larger than the 27 bits for it 3700 * in the header, we need to add a timestamp. If another 3701 * event comes in when trying to discard this one to increase 3702 * the length, then the timestamp will be added in the allocated 3703 * space of this event. If length is bigger than the size needed 3704 * for the TIME_EXTEND, then padding has to be used. The events 3705 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3706 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3707 * As length is a multiple of 4, we only need to worry if it 3708 * is 12 (RB_LEN_TIME_EXTEND + 4). 3709 */ 3710 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 3711 length += RB_ALIGNMENT; 3712 3713 return length; 3714 } 3715 3716 static inline bool 3717 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 3718 struct ring_buffer_event *event) 3719 { 3720 unsigned long new_index, old_index; 3721 struct buffer_page *bpage; 3722 unsigned long addr; 3723 3724 new_index = rb_event_index(cpu_buffer, event); 3725 old_index = new_index + rb_event_ts_length(event); 3726 addr = (unsigned long)event; 3727 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3728 3729 bpage = READ_ONCE(cpu_buffer->tail_page); 3730 3731 /* 3732 * Make sure the tail_page is still the same and 3733 * the next write location is the end of this event 3734 */ 3735 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3736 unsigned long write_mask = 3737 local_read(&bpage->write) & ~RB_WRITE_MASK; 3738 unsigned long event_length = rb_event_length(event); 3739 3740 /* 3741 * For the before_stamp to be different than the write_stamp 3742 * to make sure that the next event adds an absolute 3743 * value and does not rely on the saved write stamp, which 3744 * is now going to be bogus. 3745 * 3746 * By setting the before_stamp to zero, the next event 3747 * is not going to use the write_stamp and will instead 3748 * create an absolute timestamp. This means there's no 3749 * reason to update the wirte_stamp! 3750 */ 3751 rb_time_set(&cpu_buffer->before_stamp, 0); 3752 3753 /* 3754 * If an event were to come in now, it would see that the 3755 * write_stamp and the before_stamp are different, and assume 3756 * that this event just added itself before updating 3757 * the write stamp. The interrupting event will fix the 3758 * write stamp for us, and use an absolute timestamp. 3759 */ 3760 3761 /* 3762 * This is on the tail page. It is possible that 3763 * a write could come in and move the tail page 3764 * and write to the next page. That is fine 3765 * because we just shorten what is on this page. 3766 */ 3767 old_index += write_mask; 3768 new_index += write_mask; 3769 3770 /* caution: old_index gets updated on cmpxchg failure */ 3771 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 3772 /* update counters */ 3773 local_sub(event_length, &cpu_buffer->entries_bytes); 3774 return true; 3775 } 3776 } 3777 3778 /* could not discard */ 3779 return false; 3780 } 3781 3782 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 3783 { 3784 local_inc(&cpu_buffer->committing); 3785 local_inc(&cpu_buffer->commits); 3786 } 3787 3788 static __always_inline void 3789 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 3790 { 3791 unsigned long max_count; 3792 3793 /* 3794 * We only race with interrupts and NMIs on this CPU. 3795 * If we own the commit event, then we can commit 3796 * all others that interrupted us, since the interruptions 3797 * are in stack format (they finish before they come 3798 * back to us). This allows us to do a simple loop to 3799 * assign the commit to the tail. 3800 */ 3801 again: 3802 max_count = cpu_buffer->nr_pages * 100; 3803 3804 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3805 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3806 return; 3807 if (RB_WARN_ON(cpu_buffer, 3808 rb_is_reader_page(cpu_buffer->tail_page))) 3809 return; 3810 /* 3811 * No need for a memory barrier here, as the update 3812 * of the tail_page did it for this page. 3813 */ 3814 local_set(&cpu_buffer->commit_page->page->commit, 3815 rb_page_write(cpu_buffer->commit_page)); 3816 rb_inc_page(&cpu_buffer->commit_page); 3817 if (cpu_buffer->ring_meta) { 3818 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3819 meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page; 3820 } 3821 /* add barrier to keep gcc from optimizing too much */ 3822 barrier(); 3823 } 3824 while (rb_commit_index(cpu_buffer) != 3825 rb_page_write(cpu_buffer->commit_page)) { 3826 3827 /* Make sure the readers see the content of what is committed. */ 3828 smp_wmb(); 3829 local_set(&cpu_buffer->commit_page->page->commit, 3830 rb_page_write(cpu_buffer->commit_page)); 3831 RB_WARN_ON(cpu_buffer, 3832 local_read(&cpu_buffer->commit_page->page->commit) & 3833 ~RB_WRITE_MASK); 3834 barrier(); 3835 } 3836 3837 /* again, keep gcc from optimizing */ 3838 barrier(); 3839 3840 /* 3841 * If an interrupt came in just after the first while loop 3842 * and pushed the tail page forward, we will be left with 3843 * a dangling commit that will never go forward. 3844 */ 3845 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3846 goto again; 3847 } 3848 3849 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3850 { 3851 unsigned long commits; 3852 3853 if (RB_WARN_ON(cpu_buffer, 3854 !local_read(&cpu_buffer->committing))) 3855 return; 3856 3857 again: 3858 commits = local_read(&cpu_buffer->commits); 3859 /* synchronize with interrupts */ 3860 barrier(); 3861 if (local_read(&cpu_buffer->committing) == 1) 3862 rb_set_commit_to_write(cpu_buffer); 3863 3864 local_dec(&cpu_buffer->committing); 3865 3866 /* synchronize with interrupts */ 3867 barrier(); 3868 3869 /* 3870 * Need to account for interrupts coming in between the 3871 * updating of the commit page and the clearing of the 3872 * committing counter. 3873 */ 3874 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3875 !local_read(&cpu_buffer->committing)) { 3876 local_inc(&cpu_buffer->committing); 3877 goto again; 3878 } 3879 } 3880 3881 static inline void rb_event_discard(struct ring_buffer_event *event) 3882 { 3883 if (extended_time(event)) 3884 event = skip_time_extend(event); 3885 3886 /* array[0] holds the actual length for the discarded event */ 3887 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3888 event->type_len = RINGBUF_TYPE_PADDING; 3889 /* time delta must be non zero */ 3890 if (!event->time_delta) 3891 event->time_delta = 1; 3892 } 3893 3894 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3895 { 3896 local_inc(&cpu_buffer->entries); 3897 rb_end_commit(cpu_buffer); 3898 } 3899 3900 static __always_inline void 3901 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3902 { 3903 if (buffer->irq_work.waiters_pending) { 3904 buffer->irq_work.waiters_pending = false; 3905 /* irq_work_queue() supplies it's own memory barriers */ 3906 irq_work_queue(&buffer->irq_work.work); 3907 } 3908 3909 if (cpu_buffer->irq_work.waiters_pending) { 3910 cpu_buffer->irq_work.waiters_pending = false; 3911 /* irq_work_queue() supplies it's own memory barriers */ 3912 irq_work_queue(&cpu_buffer->irq_work.work); 3913 } 3914 3915 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3916 return; 3917 3918 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3919 return; 3920 3921 if (!cpu_buffer->irq_work.full_waiters_pending) 3922 return; 3923 3924 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3925 3926 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3927 return; 3928 3929 cpu_buffer->irq_work.wakeup_full = true; 3930 cpu_buffer->irq_work.full_waiters_pending = false; 3931 /* irq_work_queue() supplies it's own memory barriers */ 3932 irq_work_queue(&cpu_buffer->irq_work.work); 3933 } 3934 3935 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3936 # define do_ring_buffer_record_recursion() \ 3937 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3938 #else 3939 # define do_ring_buffer_record_recursion() do { } while (0) 3940 #endif 3941 3942 /* 3943 * The lock and unlock are done within a preempt disable section. 3944 * The current_context per_cpu variable can only be modified 3945 * by the current task between lock and unlock. But it can 3946 * be modified more than once via an interrupt. To pass this 3947 * information from the lock to the unlock without having to 3948 * access the 'in_interrupt()' functions again (which do show 3949 * a bit of overhead in something as critical as function tracing, 3950 * we use a bitmask trick. 3951 * 3952 * bit 1 = NMI context 3953 * bit 2 = IRQ context 3954 * bit 3 = SoftIRQ context 3955 * bit 4 = normal context. 3956 * 3957 * This works because this is the order of contexts that can 3958 * preempt other contexts. A SoftIRQ never preempts an IRQ 3959 * context. 3960 * 3961 * When the context is determined, the corresponding bit is 3962 * checked and set (if it was set, then a recursion of that context 3963 * happened). 3964 * 3965 * On unlock, we need to clear this bit. To do so, just subtract 3966 * 1 from the current_context and AND it to itself. 3967 * 3968 * (binary) 3969 * 101 - 1 = 100 3970 * 101 & 100 = 100 (clearing bit zero) 3971 * 3972 * 1010 - 1 = 1001 3973 * 1010 & 1001 = 1000 (clearing bit 1) 3974 * 3975 * The least significant bit can be cleared this way, and it 3976 * just so happens that it is the same bit corresponding to 3977 * the current context. 3978 * 3979 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3980 * is set when a recursion is detected at the current context, and if 3981 * the TRANSITION bit is already set, it will fail the recursion. 3982 * This is needed because there's a lag between the changing of 3983 * interrupt context and updating the preempt count. In this case, 3984 * a false positive will be found. To handle this, one extra recursion 3985 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3986 * bit is already set, then it is considered a recursion and the function 3987 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3988 * 3989 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3990 * to be cleared. Even if it wasn't the context that set it. That is, 3991 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3992 * is called before preempt_count() is updated, since the check will 3993 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3994 * NMI then comes in, it will set the NMI bit, but when the NMI code 3995 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3996 * and leave the NMI bit set. But this is fine, because the interrupt 3997 * code that set the TRANSITION bit will then clear the NMI bit when it 3998 * calls trace_recursive_unlock(). If another NMI comes in, it will 3999 * set the TRANSITION bit and continue. 4000 * 4001 * Note: The TRANSITION bit only handles a single transition between context. 4002 */ 4003 4004 static __always_inline bool 4005 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 4006 { 4007 unsigned int val = cpu_buffer->current_context; 4008 int bit = interrupt_context_level(); 4009 4010 bit = RB_CTX_NORMAL - bit; 4011 4012 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 4013 /* 4014 * It is possible that this was called by transitioning 4015 * between interrupt context, and preempt_count() has not 4016 * been updated yet. In this case, use the TRANSITION bit. 4017 */ 4018 bit = RB_CTX_TRANSITION; 4019 if (val & (1 << (bit + cpu_buffer->nest))) { 4020 do_ring_buffer_record_recursion(); 4021 return true; 4022 } 4023 } 4024 4025 val |= (1 << (bit + cpu_buffer->nest)); 4026 cpu_buffer->current_context = val; 4027 4028 return false; 4029 } 4030 4031 static __always_inline void 4032 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 4033 { 4034 cpu_buffer->current_context &= 4035 cpu_buffer->current_context - (1 << cpu_buffer->nest); 4036 } 4037 4038 /* The recursive locking above uses 5 bits */ 4039 #define NESTED_BITS 5 4040 4041 /** 4042 * ring_buffer_nest_start - Allow to trace while nested 4043 * @buffer: The ring buffer to modify 4044 * 4045 * The ring buffer has a safety mechanism to prevent recursion. 4046 * But there may be a case where a trace needs to be done while 4047 * tracing something else. In this case, calling this function 4048 * will allow this function to nest within a currently active 4049 * ring_buffer_lock_reserve(). 4050 * 4051 * Call this function before calling another ring_buffer_lock_reserve() and 4052 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 4053 */ 4054 void ring_buffer_nest_start(struct trace_buffer *buffer) 4055 { 4056 struct ring_buffer_per_cpu *cpu_buffer; 4057 int cpu; 4058 4059 /* Enabled by ring_buffer_nest_end() */ 4060 preempt_disable_notrace(); 4061 cpu = raw_smp_processor_id(); 4062 cpu_buffer = buffer->buffers[cpu]; 4063 /* This is the shift value for the above recursive locking */ 4064 cpu_buffer->nest += NESTED_BITS; 4065 } 4066 4067 /** 4068 * ring_buffer_nest_end - Allow to trace while nested 4069 * @buffer: The ring buffer to modify 4070 * 4071 * Must be called after ring_buffer_nest_start() and after the 4072 * ring_buffer_unlock_commit(). 4073 */ 4074 void ring_buffer_nest_end(struct trace_buffer *buffer) 4075 { 4076 struct ring_buffer_per_cpu *cpu_buffer; 4077 int cpu; 4078 4079 /* disabled by ring_buffer_nest_start() */ 4080 cpu = raw_smp_processor_id(); 4081 cpu_buffer = buffer->buffers[cpu]; 4082 /* This is the shift value for the above recursive locking */ 4083 cpu_buffer->nest -= NESTED_BITS; 4084 preempt_enable_notrace(); 4085 } 4086 4087 /** 4088 * ring_buffer_unlock_commit - commit a reserved 4089 * @buffer: The buffer to commit to 4090 * 4091 * This commits the data to the ring buffer, and releases any locks held. 4092 * 4093 * Must be paired with ring_buffer_lock_reserve. 4094 */ 4095 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 4096 { 4097 struct ring_buffer_per_cpu *cpu_buffer; 4098 int cpu = raw_smp_processor_id(); 4099 4100 cpu_buffer = buffer->buffers[cpu]; 4101 4102 rb_commit(cpu_buffer); 4103 4104 rb_wakeups(buffer, cpu_buffer); 4105 4106 trace_recursive_unlock(cpu_buffer); 4107 4108 preempt_enable_notrace(); 4109 4110 return 0; 4111 } 4112 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 4113 4114 /* Special value to validate all deltas on a page. */ 4115 #define CHECK_FULL_PAGE 1L 4116 4117 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4118 4119 static const char *show_irq_str(int bits) 4120 { 4121 const char *type[] = { 4122 ".", // 0 4123 "s", // 1 4124 "h", // 2 4125 "Hs", // 3 4126 "n", // 4 4127 "Ns", // 5 4128 "Nh", // 6 4129 "NHs", // 7 4130 }; 4131 4132 return type[bits]; 4133 } 4134 4135 /* Assume this is a trace event */ 4136 static const char *show_flags(struct ring_buffer_event *event) 4137 { 4138 struct trace_entry *entry; 4139 int bits = 0; 4140 4141 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4142 return "X"; 4143 4144 entry = ring_buffer_event_data(event); 4145 4146 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4147 bits |= 1; 4148 4149 if (entry->flags & TRACE_FLAG_HARDIRQ) 4150 bits |= 2; 4151 4152 if (entry->flags & TRACE_FLAG_NMI) 4153 bits |= 4; 4154 4155 return show_irq_str(bits); 4156 } 4157 4158 static const char *show_irq(struct ring_buffer_event *event) 4159 { 4160 struct trace_entry *entry; 4161 4162 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4163 return ""; 4164 4165 entry = ring_buffer_event_data(event); 4166 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4167 return "d"; 4168 return ""; 4169 } 4170 4171 static const char *show_interrupt_level(void) 4172 { 4173 unsigned long pc = preempt_count(); 4174 unsigned char level = 0; 4175 4176 if (pc & SOFTIRQ_OFFSET) 4177 level |= 1; 4178 4179 if (pc & HARDIRQ_MASK) 4180 level |= 2; 4181 4182 if (pc & NMI_MASK) 4183 level |= 4; 4184 4185 return show_irq_str(level); 4186 } 4187 4188 static void dump_buffer_page(struct buffer_data_page *bpage, 4189 struct rb_event_info *info, 4190 unsigned long tail) 4191 { 4192 struct ring_buffer_event *event; 4193 u64 ts, delta; 4194 int e; 4195 4196 ts = bpage->time_stamp; 4197 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4198 4199 for (e = 0; e < tail; e += rb_event_length(event)) { 4200 4201 event = (struct ring_buffer_event *)(bpage->data + e); 4202 4203 switch (event->type_len) { 4204 4205 case RINGBUF_TYPE_TIME_EXTEND: 4206 delta = rb_event_time_stamp(event); 4207 ts += delta; 4208 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4209 e, ts, delta); 4210 break; 4211 4212 case RINGBUF_TYPE_TIME_STAMP: 4213 delta = rb_event_time_stamp(event); 4214 ts = rb_fix_abs_ts(delta, ts); 4215 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4216 e, ts, delta); 4217 break; 4218 4219 case RINGBUF_TYPE_PADDING: 4220 ts += event->time_delta; 4221 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4222 e, ts, event->time_delta); 4223 break; 4224 4225 case RINGBUF_TYPE_DATA: 4226 ts += event->time_delta; 4227 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4228 e, ts, event->time_delta, 4229 show_flags(event), show_irq(event)); 4230 break; 4231 4232 default: 4233 break; 4234 } 4235 } 4236 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4237 } 4238 4239 static DEFINE_PER_CPU(atomic_t, checking); 4240 static atomic_t ts_dump; 4241 4242 #define buffer_warn_return(fmt, ...) \ 4243 do { \ 4244 /* If another report is happening, ignore this one */ \ 4245 if (atomic_inc_return(&ts_dump) != 1) { \ 4246 atomic_dec(&ts_dump); \ 4247 goto out; \ 4248 } \ 4249 atomic_inc(&cpu_buffer->record_disabled); \ 4250 pr_warn(fmt, ##__VA_ARGS__); \ 4251 dump_buffer_page(bpage, info, tail); \ 4252 atomic_dec(&ts_dump); \ 4253 /* There's some cases in boot up that this can happen */ \ 4254 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4255 /* Do not re-enable checking */ \ 4256 return; \ 4257 } while (0) 4258 4259 /* 4260 * Check if the current event time stamp matches the deltas on 4261 * the buffer page. 4262 */ 4263 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4264 struct rb_event_info *info, 4265 unsigned long tail) 4266 { 4267 struct buffer_data_page *bpage; 4268 u64 ts, delta; 4269 bool full = false; 4270 int ret; 4271 4272 bpage = info->tail_page->page; 4273 4274 if (tail == CHECK_FULL_PAGE) { 4275 full = true; 4276 tail = local_read(&bpage->commit); 4277 } else if (info->add_timestamp & 4278 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 4279 /* Ignore events with absolute time stamps */ 4280 return; 4281 } 4282 4283 /* 4284 * Do not check the first event (skip possible extends too). 4285 * Also do not check if previous events have not been committed. 4286 */ 4287 if (tail <= 8 || tail > local_read(&bpage->commit)) 4288 return; 4289 4290 /* 4291 * If this interrupted another event, 4292 */ 4293 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 4294 goto out; 4295 4296 ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta); 4297 if (ret < 0) { 4298 if (delta < ts) { 4299 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 4300 cpu_buffer->cpu, ts, delta); 4301 goto out; 4302 } 4303 } 4304 if ((full && ts > info->ts) || 4305 (!full && ts + info->delta != info->ts)) { 4306 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 4307 cpu_buffer->cpu, 4308 ts + info->delta, info->ts, info->delta, 4309 info->before, info->after, 4310 full ? " (full)" : "", show_interrupt_level()); 4311 } 4312 out: 4313 atomic_dec(this_cpu_ptr(&checking)); 4314 } 4315 #else 4316 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4317 struct rb_event_info *info, 4318 unsigned long tail) 4319 { 4320 } 4321 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 4322 4323 static struct ring_buffer_event * 4324 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 4325 struct rb_event_info *info) 4326 { 4327 struct ring_buffer_event *event; 4328 struct buffer_page *tail_page; 4329 unsigned long tail, write, w; 4330 4331 /* Don't let the compiler play games with cpu_buffer->tail_page */ 4332 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 4333 4334 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 4335 barrier(); 4336 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4337 rb_time_read(&cpu_buffer->write_stamp, &info->after); 4338 barrier(); 4339 info->ts = rb_time_stamp(cpu_buffer->buffer); 4340 4341 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 4342 info->delta = info->ts; 4343 } else { 4344 /* 4345 * If interrupting an event time update, we may need an 4346 * absolute timestamp. 4347 * Don't bother if this is the start of a new page (w == 0). 4348 */ 4349 if (!w) { 4350 /* Use the sub-buffer timestamp */ 4351 info->delta = 0; 4352 } else if (unlikely(info->before != info->after)) { 4353 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 4354 info->length += RB_LEN_TIME_EXTEND; 4355 } else { 4356 info->delta = info->ts - info->after; 4357 if (unlikely(test_time_stamp(info->delta))) { 4358 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 4359 info->length += RB_LEN_TIME_EXTEND; 4360 } 4361 } 4362 } 4363 4364 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 4365 4366 /*C*/ write = local_add_return(info->length, &tail_page->write); 4367 4368 /* set write to only the index of the write */ 4369 write &= RB_WRITE_MASK; 4370 4371 tail = write - info->length; 4372 4373 /* See if we shot pass the end of this buffer page */ 4374 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 4375 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 4376 return rb_move_tail(cpu_buffer, tail, info); 4377 } 4378 4379 if (likely(tail == w)) { 4380 /* Nothing interrupted us between A and C */ 4381 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 4382 /* 4383 * If something came in between C and D, the write stamp 4384 * may now not be in sync. But that's fine as the before_stamp 4385 * will be different and then next event will just be forced 4386 * to use an absolute timestamp. 4387 */ 4388 if (likely(!(info->add_timestamp & 4389 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4390 /* This did not interrupt any time update */ 4391 info->delta = info->ts - info->after; 4392 else 4393 /* Just use full timestamp for interrupting event */ 4394 info->delta = info->ts; 4395 check_buffer(cpu_buffer, info, tail); 4396 } else { 4397 u64 ts; 4398 /* SLOW PATH - Interrupted between A and C */ 4399 4400 /* Save the old before_stamp */ 4401 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4402 4403 /* 4404 * Read a new timestamp and update the before_stamp to make 4405 * the next event after this one force using an absolute 4406 * timestamp. This is in case an interrupt were to come in 4407 * between E and F. 4408 */ 4409 ts = rb_time_stamp(cpu_buffer->buffer); 4410 rb_time_set(&cpu_buffer->before_stamp, ts); 4411 4412 barrier(); 4413 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 4414 barrier(); 4415 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 4416 info->after == info->before && info->after < ts) { 4417 /* 4418 * Nothing came after this event between C and F, it is 4419 * safe to use info->after for the delta as it 4420 * matched info->before and is still valid. 4421 */ 4422 info->delta = ts - info->after; 4423 } else { 4424 /* 4425 * Interrupted between C and F: 4426 * Lost the previous events time stamp. Just set the 4427 * delta to zero, and this will be the same time as 4428 * the event this event interrupted. And the events that 4429 * came after this will still be correct (as they would 4430 * have built their delta on the previous event. 4431 */ 4432 info->delta = 0; 4433 } 4434 info->ts = ts; 4435 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 4436 } 4437 4438 /* 4439 * If this is the first commit on the page, then it has the same 4440 * timestamp as the page itself. 4441 */ 4442 if (unlikely(!tail && !(info->add_timestamp & 4443 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4444 info->delta = 0; 4445 4446 /* We reserved something on the buffer */ 4447 4448 event = __rb_page_index(tail_page, tail); 4449 rb_update_event(cpu_buffer, event, info); 4450 4451 local_inc(&tail_page->entries); 4452 4453 /* 4454 * If this is the first commit on the page, then update 4455 * its timestamp. 4456 */ 4457 if (unlikely(!tail)) 4458 tail_page->page->time_stamp = info->ts; 4459 4460 /* account for these added bytes */ 4461 local_add(info->length, &cpu_buffer->entries_bytes); 4462 4463 return event; 4464 } 4465 4466 static __always_inline struct ring_buffer_event * 4467 rb_reserve_next_event(struct trace_buffer *buffer, 4468 struct ring_buffer_per_cpu *cpu_buffer, 4469 unsigned long length) 4470 { 4471 struct ring_buffer_event *event; 4472 struct rb_event_info info; 4473 int nr_loops = 0; 4474 int add_ts_default; 4475 4476 /* 4477 * ring buffer does cmpxchg as well as atomic64 operations 4478 * (which some archs use locking for atomic64), make sure this 4479 * is safe in NMI context 4480 */ 4481 if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) || 4482 IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) && 4483 (unlikely(in_nmi()))) { 4484 return NULL; 4485 } 4486 4487 rb_start_commit(cpu_buffer); 4488 /* The commit page can not change after this */ 4489 4490 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4491 /* 4492 * Due to the ability to swap a cpu buffer from a buffer 4493 * it is possible it was swapped before we committed. 4494 * (committing stops a swap). We check for it here and 4495 * if it happened, we have to fail the write. 4496 */ 4497 barrier(); 4498 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 4499 local_dec(&cpu_buffer->committing); 4500 local_dec(&cpu_buffer->commits); 4501 return NULL; 4502 } 4503 #endif 4504 4505 info.length = rb_calculate_event_length(length); 4506 4507 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 4508 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 4509 info.length += RB_LEN_TIME_EXTEND; 4510 if (info.length > cpu_buffer->buffer->max_data_size) 4511 goto out_fail; 4512 } else { 4513 add_ts_default = RB_ADD_STAMP_NONE; 4514 } 4515 4516 again: 4517 info.add_timestamp = add_ts_default; 4518 info.delta = 0; 4519 4520 /* 4521 * We allow for interrupts to reenter here and do a trace. 4522 * If one does, it will cause this original code to loop 4523 * back here. Even with heavy interrupts happening, this 4524 * should only happen a few times in a row. If this happens 4525 * 1000 times in a row, there must be either an interrupt 4526 * storm or we have something buggy. 4527 * Bail! 4528 */ 4529 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 4530 goto out_fail; 4531 4532 event = __rb_reserve_next(cpu_buffer, &info); 4533 4534 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 4535 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 4536 info.length -= RB_LEN_TIME_EXTEND; 4537 goto again; 4538 } 4539 4540 if (likely(event)) 4541 return event; 4542 out_fail: 4543 rb_end_commit(cpu_buffer); 4544 return NULL; 4545 } 4546 4547 /** 4548 * ring_buffer_lock_reserve - reserve a part of the buffer 4549 * @buffer: the ring buffer to reserve from 4550 * @length: the length of the data to reserve (excluding event header) 4551 * 4552 * Returns a reserved event on the ring buffer to copy directly to. 4553 * The user of this interface will need to get the body to write into 4554 * and can use the ring_buffer_event_data() interface. 4555 * 4556 * The length is the length of the data needed, not the event length 4557 * which also includes the event header. 4558 * 4559 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4560 * If NULL is returned, then nothing has been allocated or locked. 4561 */ 4562 struct ring_buffer_event * 4563 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4564 { 4565 struct ring_buffer_per_cpu *cpu_buffer; 4566 struct ring_buffer_event *event; 4567 int cpu; 4568 4569 /* If we are tracing schedule, we don't want to recurse */ 4570 preempt_disable_notrace(); 4571 4572 if (unlikely(atomic_read(&buffer->record_disabled))) 4573 goto out; 4574 4575 cpu = raw_smp_processor_id(); 4576 4577 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4578 goto out; 4579 4580 cpu_buffer = buffer->buffers[cpu]; 4581 4582 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4583 goto out; 4584 4585 if (unlikely(length > buffer->max_data_size)) 4586 goto out; 4587 4588 if (unlikely(trace_recursive_lock(cpu_buffer))) 4589 goto out; 4590 4591 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4592 if (!event) 4593 goto out_unlock; 4594 4595 return event; 4596 4597 out_unlock: 4598 trace_recursive_unlock(cpu_buffer); 4599 out: 4600 preempt_enable_notrace(); 4601 return NULL; 4602 } 4603 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4604 4605 /* 4606 * Decrement the entries to the page that an event is on. 4607 * The event does not even need to exist, only the pointer 4608 * to the page it is on. This may only be called before the commit 4609 * takes place. 4610 */ 4611 static inline void 4612 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4613 struct ring_buffer_event *event) 4614 { 4615 unsigned long addr = (unsigned long)event; 4616 struct buffer_page *bpage = cpu_buffer->commit_page; 4617 struct buffer_page *start; 4618 4619 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4620 4621 /* Do the likely case first */ 4622 if (likely(bpage->page == (void *)addr)) { 4623 local_dec(&bpage->entries); 4624 return; 4625 } 4626 4627 /* 4628 * Because the commit page may be on the reader page we 4629 * start with the next page and check the end loop there. 4630 */ 4631 rb_inc_page(&bpage); 4632 start = bpage; 4633 do { 4634 if (bpage->page == (void *)addr) { 4635 local_dec(&bpage->entries); 4636 return; 4637 } 4638 rb_inc_page(&bpage); 4639 } while (bpage != start); 4640 4641 /* commit not part of this buffer?? */ 4642 RB_WARN_ON(cpu_buffer, 1); 4643 } 4644 4645 /** 4646 * ring_buffer_discard_commit - discard an event that has not been committed 4647 * @buffer: the ring buffer 4648 * @event: non committed event to discard 4649 * 4650 * Sometimes an event that is in the ring buffer needs to be ignored. 4651 * This function lets the user discard an event in the ring buffer 4652 * and then that event will not be read later. 4653 * 4654 * This function only works if it is called before the item has been 4655 * committed. It will try to free the event from the ring buffer 4656 * if another event has not been added behind it. 4657 * 4658 * If another event has been added behind it, it will set the event 4659 * up as discarded, and perform the commit. 4660 * 4661 * If this function is called, do not call ring_buffer_unlock_commit on 4662 * the event. 4663 */ 4664 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4665 struct ring_buffer_event *event) 4666 { 4667 struct ring_buffer_per_cpu *cpu_buffer; 4668 int cpu; 4669 4670 /* The event is discarded regardless */ 4671 rb_event_discard(event); 4672 4673 cpu = smp_processor_id(); 4674 cpu_buffer = buffer->buffers[cpu]; 4675 4676 /* 4677 * This must only be called if the event has not been 4678 * committed yet. Thus we can assume that preemption 4679 * is still disabled. 4680 */ 4681 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4682 4683 rb_decrement_entry(cpu_buffer, event); 4684 rb_try_to_discard(cpu_buffer, event); 4685 rb_end_commit(cpu_buffer); 4686 4687 trace_recursive_unlock(cpu_buffer); 4688 4689 preempt_enable_notrace(); 4690 4691 } 4692 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4693 4694 /** 4695 * ring_buffer_write - write data to the buffer without reserving 4696 * @buffer: The ring buffer to write to. 4697 * @length: The length of the data being written (excluding the event header) 4698 * @data: The data to write to the buffer. 4699 * 4700 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4701 * one function. If you already have the data to write to the buffer, it 4702 * may be easier to simply call this function. 4703 * 4704 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4705 * and not the length of the event which would hold the header. 4706 */ 4707 int ring_buffer_write(struct trace_buffer *buffer, 4708 unsigned long length, 4709 void *data) 4710 { 4711 struct ring_buffer_per_cpu *cpu_buffer; 4712 struct ring_buffer_event *event; 4713 void *body; 4714 int ret = -EBUSY; 4715 int cpu; 4716 4717 preempt_disable_notrace(); 4718 4719 if (atomic_read(&buffer->record_disabled)) 4720 goto out; 4721 4722 cpu = raw_smp_processor_id(); 4723 4724 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4725 goto out; 4726 4727 cpu_buffer = buffer->buffers[cpu]; 4728 4729 if (atomic_read(&cpu_buffer->record_disabled)) 4730 goto out; 4731 4732 if (length > buffer->max_data_size) 4733 goto out; 4734 4735 if (unlikely(trace_recursive_lock(cpu_buffer))) 4736 goto out; 4737 4738 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4739 if (!event) 4740 goto out_unlock; 4741 4742 body = rb_event_data(event); 4743 4744 memcpy(body, data, length); 4745 4746 rb_commit(cpu_buffer); 4747 4748 rb_wakeups(buffer, cpu_buffer); 4749 4750 ret = 0; 4751 4752 out_unlock: 4753 trace_recursive_unlock(cpu_buffer); 4754 4755 out: 4756 preempt_enable_notrace(); 4757 4758 return ret; 4759 } 4760 EXPORT_SYMBOL_GPL(ring_buffer_write); 4761 4762 /* 4763 * The total entries in the ring buffer is the running counter 4764 * of entries entered into the ring buffer, minus the sum of 4765 * the entries read from the ring buffer and the number of 4766 * entries that were overwritten. 4767 */ 4768 static inline unsigned long 4769 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4770 { 4771 return local_read(&cpu_buffer->entries) - 4772 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4773 } 4774 4775 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4776 { 4777 return !rb_num_of_entries(cpu_buffer); 4778 } 4779 4780 /** 4781 * ring_buffer_record_disable - stop all writes into the buffer 4782 * @buffer: The ring buffer to stop writes to. 4783 * 4784 * This prevents all writes to the buffer. Any attempt to write 4785 * to the buffer after this will fail and return NULL. 4786 * 4787 * The caller should call synchronize_rcu() after this. 4788 */ 4789 void ring_buffer_record_disable(struct trace_buffer *buffer) 4790 { 4791 atomic_inc(&buffer->record_disabled); 4792 } 4793 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4794 4795 /** 4796 * ring_buffer_record_enable - enable writes to the buffer 4797 * @buffer: The ring buffer to enable writes 4798 * 4799 * Note, multiple disables will need the same number of enables 4800 * to truly enable the writing (much like preempt_disable). 4801 */ 4802 void ring_buffer_record_enable(struct trace_buffer *buffer) 4803 { 4804 atomic_dec(&buffer->record_disabled); 4805 } 4806 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4807 4808 /** 4809 * ring_buffer_record_off - stop all writes into the buffer 4810 * @buffer: The ring buffer to stop writes to. 4811 * 4812 * This prevents all writes to the buffer. Any attempt to write 4813 * to the buffer after this will fail and return NULL. 4814 * 4815 * This is different than ring_buffer_record_disable() as 4816 * it works like an on/off switch, where as the disable() version 4817 * must be paired with a enable(). 4818 */ 4819 void ring_buffer_record_off(struct trace_buffer *buffer) 4820 { 4821 unsigned int rd; 4822 unsigned int new_rd; 4823 4824 rd = atomic_read(&buffer->record_disabled); 4825 do { 4826 new_rd = rd | RB_BUFFER_OFF; 4827 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4828 } 4829 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4830 4831 /** 4832 * ring_buffer_record_on - restart writes into the buffer 4833 * @buffer: The ring buffer to start writes to. 4834 * 4835 * This enables all writes to the buffer that was disabled by 4836 * ring_buffer_record_off(). 4837 * 4838 * This is different than ring_buffer_record_enable() as 4839 * it works like an on/off switch, where as the enable() version 4840 * must be paired with a disable(). 4841 */ 4842 void ring_buffer_record_on(struct trace_buffer *buffer) 4843 { 4844 unsigned int rd; 4845 unsigned int new_rd; 4846 4847 rd = atomic_read(&buffer->record_disabled); 4848 do { 4849 new_rd = rd & ~RB_BUFFER_OFF; 4850 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4851 } 4852 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4853 4854 /** 4855 * ring_buffer_record_is_on - return true if the ring buffer can write 4856 * @buffer: The ring buffer to see if write is enabled 4857 * 4858 * Returns true if the ring buffer is in a state that it accepts writes. 4859 */ 4860 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4861 { 4862 return !atomic_read(&buffer->record_disabled); 4863 } 4864 4865 /** 4866 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4867 * @buffer: The ring buffer to see if write is set enabled 4868 * 4869 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4870 * Note that this does NOT mean it is in a writable state. 4871 * 4872 * It may return true when the ring buffer has been disabled by 4873 * ring_buffer_record_disable(), as that is a temporary disabling of 4874 * the ring buffer. 4875 */ 4876 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4877 { 4878 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4879 } 4880 4881 /** 4882 * ring_buffer_record_is_on_cpu - return true if the ring buffer can write 4883 * @buffer: The ring buffer to see if write is enabled 4884 * @cpu: The CPU to test if the ring buffer can write too 4885 * 4886 * Returns true if the ring buffer is in a state that it accepts writes 4887 * for a particular CPU. 4888 */ 4889 bool ring_buffer_record_is_on_cpu(struct trace_buffer *buffer, int cpu) 4890 { 4891 struct ring_buffer_per_cpu *cpu_buffer; 4892 4893 cpu_buffer = buffer->buffers[cpu]; 4894 4895 return ring_buffer_record_is_set_on(buffer) && 4896 !atomic_read(&cpu_buffer->record_disabled); 4897 } 4898 4899 /** 4900 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4901 * @buffer: The ring buffer to stop writes to. 4902 * @cpu: The CPU buffer to stop 4903 * 4904 * This prevents all writes to the buffer. Any attempt to write 4905 * to the buffer after this will fail and return NULL. 4906 * 4907 * The caller should call synchronize_rcu() after this. 4908 */ 4909 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4910 { 4911 struct ring_buffer_per_cpu *cpu_buffer; 4912 4913 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4914 return; 4915 4916 cpu_buffer = buffer->buffers[cpu]; 4917 atomic_inc(&cpu_buffer->record_disabled); 4918 } 4919 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4920 4921 /** 4922 * ring_buffer_record_enable_cpu - enable writes to the buffer 4923 * @buffer: The ring buffer to enable writes 4924 * @cpu: The CPU to enable. 4925 * 4926 * Note, multiple disables will need the same number of enables 4927 * to truly enable the writing (much like preempt_disable). 4928 */ 4929 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4930 { 4931 struct ring_buffer_per_cpu *cpu_buffer; 4932 4933 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4934 return; 4935 4936 cpu_buffer = buffer->buffers[cpu]; 4937 atomic_dec(&cpu_buffer->record_disabled); 4938 } 4939 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4940 4941 /** 4942 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4943 * @buffer: The ring buffer 4944 * @cpu: The per CPU buffer to read from. 4945 */ 4946 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4947 { 4948 unsigned long flags; 4949 struct ring_buffer_per_cpu *cpu_buffer; 4950 struct buffer_page *bpage; 4951 u64 ret = 0; 4952 4953 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4954 return 0; 4955 4956 cpu_buffer = buffer->buffers[cpu]; 4957 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4958 /* 4959 * if the tail is on reader_page, oldest time stamp is on the reader 4960 * page 4961 */ 4962 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4963 bpage = cpu_buffer->reader_page; 4964 else 4965 bpage = rb_set_head_page(cpu_buffer); 4966 if (bpage) 4967 ret = bpage->page->time_stamp; 4968 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4969 4970 return ret; 4971 } 4972 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4973 4974 /** 4975 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4976 * @buffer: The ring buffer 4977 * @cpu: The per CPU buffer to read from. 4978 */ 4979 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4980 { 4981 struct ring_buffer_per_cpu *cpu_buffer; 4982 unsigned long ret; 4983 4984 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4985 return 0; 4986 4987 cpu_buffer = buffer->buffers[cpu]; 4988 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4989 4990 return ret; 4991 } 4992 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4993 4994 /** 4995 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4996 * @buffer: The ring buffer 4997 * @cpu: The per CPU buffer to get the entries from. 4998 */ 4999 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 5000 { 5001 struct ring_buffer_per_cpu *cpu_buffer; 5002 5003 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5004 return 0; 5005 5006 cpu_buffer = buffer->buffers[cpu]; 5007 5008 return rb_num_of_entries(cpu_buffer); 5009 } 5010 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 5011 5012 /** 5013 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 5014 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 5015 * @buffer: The ring buffer 5016 * @cpu: The per CPU buffer to get the number of overruns from 5017 */ 5018 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 5019 { 5020 struct ring_buffer_per_cpu *cpu_buffer; 5021 unsigned long ret; 5022 5023 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5024 return 0; 5025 5026 cpu_buffer = buffer->buffers[cpu]; 5027 ret = local_read(&cpu_buffer->overrun); 5028 5029 return ret; 5030 } 5031 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 5032 5033 /** 5034 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 5035 * commits failing due to the buffer wrapping around while there are uncommitted 5036 * events, such as during an interrupt storm. 5037 * @buffer: The ring buffer 5038 * @cpu: The per CPU buffer to get the number of overruns from 5039 */ 5040 unsigned long 5041 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 5042 { 5043 struct ring_buffer_per_cpu *cpu_buffer; 5044 unsigned long ret; 5045 5046 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5047 return 0; 5048 5049 cpu_buffer = buffer->buffers[cpu]; 5050 ret = local_read(&cpu_buffer->commit_overrun); 5051 5052 return ret; 5053 } 5054 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 5055 5056 /** 5057 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 5058 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 5059 * @buffer: The ring buffer 5060 * @cpu: The per CPU buffer to get the number of overruns from 5061 */ 5062 unsigned long 5063 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 5064 { 5065 struct ring_buffer_per_cpu *cpu_buffer; 5066 unsigned long ret; 5067 5068 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5069 return 0; 5070 5071 cpu_buffer = buffer->buffers[cpu]; 5072 ret = local_read(&cpu_buffer->dropped_events); 5073 5074 return ret; 5075 } 5076 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5077 5078 /** 5079 * ring_buffer_read_events_cpu - get the number of events successfully read 5080 * @buffer: The ring buffer 5081 * @cpu: The per CPU buffer to get the number of events read 5082 */ 5083 unsigned long 5084 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5085 { 5086 struct ring_buffer_per_cpu *cpu_buffer; 5087 5088 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5089 return 0; 5090 5091 cpu_buffer = buffer->buffers[cpu]; 5092 return cpu_buffer->read; 5093 } 5094 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5095 5096 /** 5097 * ring_buffer_entries - get the number of entries in a buffer 5098 * @buffer: The ring buffer 5099 * 5100 * Returns the total number of entries in the ring buffer 5101 * (all CPU entries) 5102 */ 5103 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5104 { 5105 struct ring_buffer_per_cpu *cpu_buffer; 5106 unsigned long entries = 0; 5107 int cpu; 5108 5109 /* if you care about this being correct, lock the buffer */ 5110 for_each_buffer_cpu(buffer, cpu) { 5111 cpu_buffer = buffer->buffers[cpu]; 5112 entries += rb_num_of_entries(cpu_buffer); 5113 } 5114 5115 return entries; 5116 } 5117 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5118 5119 /** 5120 * ring_buffer_overruns - get the number of overruns in buffer 5121 * @buffer: The ring buffer 5122 * 5123 * Returns the total number of overruns in the ring buffer 5124 * (all CPU entries) 5125 */ 5126 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 5127 { 5128 struct ring_buffer_per_cpu *cpu_buffer; 5129 unsigned long overruns = 0; 5130 int cpu; 5131 5132 /* if you care about this being correct, lock the buffer */ 5133 for_each_buffer_cpu(buffer, cpu) { 5134 cpu_buffer = buffer->buffers[cpu]; 5135 overruns += local_read(&cpu_buffer->overrun); 5136 } 5137 5138 return overruns; 5139 } 5140 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5141 5142 static void rb_iter_reset(struct ring_buffer_iter *iter) 5143 { 5144 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5145 5146 /* Iterator usage is expected to have record disabled */ 5147 iter->head_page = cpu_buffer->reader_page; 5148 iter->head = cpu_buffer->reader_page->read; 5149 iter->next_event = iter->head; 5150 5151 iter->cache_reader_page = iter->head_page; 5152 iter->cache_read = cpu_buffer->read; 5153 iter->cache_pages_removed = cpu_buffer->pages_removed; 5154 5155 if (iter->head) { 5156 iter->read_stamp = cpu_buffer->read_stamp; 5157 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5158 } else { 5159 iter->read_stamp = iter->head_page->page->time_stamp; 5160 iter->page_stamp = iter->read_stamp; 5161 } 5162 } 5163 5164 /** 5165 * ring_buffer_iter_reset - reset an iterator 5166 * @iter: The iterator to reset 5167 * 5168 * Resets the iterator, so that it will start from the beginning 5169 * again. 5170 */ 5171 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5172 { 5173 struct ring_buffer_per_cpu *cpu_buffer; 5174 unsigned long flags; 5175 5176 if (!iter) 5177 return; 5178 5179 cpu_buffer = iter->cpu_buffer; 5180 5181 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5182 rb_iter_reset(iter); 5183 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5184 } 5185 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5186 5187 /** 5188 * ring_buffer_iter_empty - check if an iterator has no more to read 5189 * @iter: The iterator to check 5190 */ 5191 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5192 { 5193 struct ring_buffer_per_cpu *cpu_buffer; 5194 struct buffer_page *reader; 5195 struct buffer_page *head_page; 5196 struct buffer_page *commit_page; 5197 struct buffer_page *curr_commit_page; 5198 unsigned commit; 5199 u64 curr_commit_ts; 5200 u64 commit_ts; 5201 5202 cpu_buffer = iter->cpu_buffer; 5203 reader = cpu_buffer->reader_page; 5204 head_page = cpu_buffer->head_page; 5205 commit_page = READ_ONCE(cpu_buffer->commit_page); 5206 commit_ts = commit_page->page->time_stamp; 5207 5208 /* 5209 * When the writer goes across pages, it issues a cmpxchg which 5210 * is a mb(), which will synchronize with the rmb here. 5211 * (see rb_tail_page_update()) 5212 */ 5213 smp_rmb(); 5214 commit = rb_page_commit(commit_page); 5215 /* We want to make sure that the commit page doesn't change */ 5216 smp_rmb(); 5217 5218 /* Make sure commit page didn't change */ 5219 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5220 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5221 5222 /* If the commit page changed, then there's more data */ 5223 if (curr_commit_page != commit_page || 5224 curr_commit_ts != commit_ts) 5225 return 0; 5226 5227 /* Still racy, as it may return a false positive, but that's OK */ 5228 return ((iter->head_page == commit_page && iter->head >= commit) || 5229 (iter->head_page == reader && commit_page == head_page && 5230 head_page->read == commit && 5231 iter->head == rb_page_size(cpu_buffer->reader_page))); 5232 } 5233 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5234 5235 static void 5236 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5237 struct ring_buffer_event *event) 5238 { 5239 u64 delta; 5240 5241 switch (event->type_len) { 5242 case RINGBUF_TYPE_PADDING: 5243 return; 5244 5245 case RINGBUF_TYPE_TIME_EXTEND: 5246 delta = rb_event_time_stamp(event); 5247 cpu_buffer->read_stamp += delta; 5248 return; 5249 5250 case RINGBUF_TYPE_TIME_STAMP: 5251 delta = rb_event_time_stamp(event); 5252 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5253 cpu_buffer->read_stamp = delta; 5254 return; 5255 5256 case RINGBUF_TYPE_DATA: 5257 cpu_buffer->read_stamp += event->time_delta; 5258 return; 5259 5260 default: 5261 RB_WARN_ON(cpu_buffer, 1); 5262 } 5263 } 5264 5265 static void 5266 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5267 struct ring_buffer_event *event) 5268 { 5269 u64 delta; 5270 5271 switch (event->type_len) { 5272 case RINGBUF_TYPE_PADDING: 5273 return; 5274 5275 case RINGBUF_TYPE_TIME_EXTEND: 5276 delta = rb_event_time_stamp(event); 5277 iter->read_stamp += delta; 5278 return; 5279 5280 case RINGBUF_TYPE_TIME_STAMP: 5281 delta = rb_event_time_stamp(event); 5282 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5283 iter->read_stamp = delta; 5284 return; 5285 5286 case RINGBUF_TYPE_DATA: 5287 iter->read_stamp += event->time_delta; 5288 return; 5289 5290 default: 5291 RB_WARN_ON(iter->cpu_buffer, 1); 5292 } 5293 } 5294 5295 static struct buffer_page * 5296 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5297 { 5298 struct buffer_page *reader = NULL; 5299 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5300 unsigned long overwrite; 5301 unsigned long flags; 5302 int nr_loops = 0; 5303 bool ret; 5304 5305 local_irq_save(flags); 5306 arch_spin_lock(&cpu_buffer->lock); 5307 5308 again: 5309 /* 5310 * This should normally only loop twice. But because the 5311 * start of the reader inserts an empty page, it causes 5312 * a case where we will loop three times. There should be no 5313 * reason to loop four times (that I know of). 5314 */ 5315 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5316 reader = NULL; 5317 goto out; 5318 } 5319 5320 reader = cpu_buffer->reader_page; 5321 5322 /* If there's more to read, return this page */ 5323 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5324 goto out; 5325 5326 /* Never should we have an index greater than the size */ 5327 if (RB_WARN_ON(cpu_buffer, 5328 cpu_buffer->reader_page->read > rb_page_size(reader))) 5329 goto out; 5330 5331 /* check if we caught up to the tail */ 5332 reader = NULL; 5333 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5334 goto out; 5335 5336 /* Don't bother swapping if the ring buffer is empty */ 5337 if (rb_num_of_entries(cpu_buffer) == 0) 5338 goto out; 5339 5340 /* 5341 * Reset the reader page to size zero. 5342 */ 5343 local_set(&cpu_buffer->reader_page->write, 0); 5344 local_set(&cpu_buffer->reader_page->entries, 0); 5345 local_set(&cpu_buffer->reader_page->page->commit, 0); 5346 cpu_buffer->reader_page->real_end = 0; 5347 5348 spin: 5349 /* 5350 * Splice the empty reader page into the list around the head. 5351 */ 5352 reader = rb_set_head_page(cpu_buffer); 5353 if (!reader) 5354 goto out; 5355 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5356 cpu_buffer->reader_page->list.prev = reader->list.prev; 5357 5358 /* 5359 * cpu_buffer->pages just needs to point to the buffer, it 5360 * has no specific buffer page to point to. Lets move it out 5361 * of our way so we don't accidentally swap it. 5362 */ 5363 cpu_buffer->pages = reader->list.prev; 5364 5365 /* The reader page will be pointing to the new head */ 5366 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5367 5368 /* 5369 * We want to make sure we read the overruns after we set up our 5370 * pointers to the next object. The writer side does a 5371 * cmpxchg to cross pages which acts as the mb on the writer 5372 * side. Note, the reader will constantly fail the swap 5373 * while the writer is updating the pointers, so this 5374 * guarantees that the overwrite recorded here is the one we 5375 * want to compare with the last_overrun. 5376 */ 5377 smp_mb(); 5378 overwrite = local_read(&(cpu_buffer->overrun)); 5379 5380 /* 5381 * Here's the tricky part. 5382 * 5383 * We need to move the pointer past the header page. 5384 * But we can only do that if a writer is not currently 5385 * moving it. The page before the header page has the 5386 * flag bit '1' set if it is pointing to the page we want. 5387 * but if the writer is in the process of moving it 5388 * then it will be '2' or already moved '0'. 5389 */ 5390 5391 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5392 5393 /* 5394 * If we did not convert it, then we must try again. 5395 */ 5396 if (!ret) 5397 goto spin; 5398 5399 if (cpu_buffer->ring_meta) 5400 rb_update_meta_reader(cpu_buffer, reader); 5401 5402 /* 5403 * Yay! We succeeded in replacing the page. 5404 * 5405 * Now make the new head point back to the reader page. 5406 */ 5407 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5408 rb_inc_page(&cpu_buffer->head_page); 5409 5410 cpu_buffer->cnt++; 5411 local_inc(&cpu_buffer->pages_read); 5412 5413 /* Finally update the reader page to the new head */ 5414 cpu_buffer->reader_page = reader; 5415 cpu_buffer->reader_page->read = 0; 5416 5417 if (overwrite != cpu_buffer->last_overrun) { 5418 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5419 cpu_buffer->last_overrun = overwrite; 5420 } 5421 5422 goto again; 5423 5424 out: 5425 /* Update the read_stamp on the first event */ 5426 if (reader && reader->read == 0) 5427 cpu_buffer->read_stamp = reader->page->time_stamp; 5428 5429 arch_spin_unlock(&cpu_buffer->lock); 5430 local_irq_restore(flags); 5431 5432 /* 5433 * The writer has preempt disable, wait for it. But not forever 5434 * Although, 1 second is pretty much "forever" 5435 */ 5436 #define USECS_WAIT 1000000 5437 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5438 /* If the write is past the end of page, a writer is still updating it */ 5439 if (likely(!reader || rb_page_write(reader) <= bsize)) 5440 break; 5441 5442 udelay(1); 5443 5444 /* Get the latest version of the reader write value */ 5445 smp_rmb(); 5446 } 5447 5448 /* The writer is not moving forward? Something is wrong */ 5449 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5450 reader = NULL; 5451 5452 /* 5453 * Make sure we see any padding after the write update 5454 * (see rb_reset_tail()). 5455 * 5456 * In addition, a writer may be writing on the reader page 5457 * if the page has not been fully filled, so the read barrier 5458 * is also needed to make sure we see the content of what is 5459 * committed by the writer (see rb_set_commit_to_write()). 5460 */ 5461 smp_rmb(); 5462 5463 5464 return reader; 5465 } 5466 5467 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5468 { 5469 struct ring_buffer_event *event; 5470 struct buffer_page *reader; 5471 unsigned length; 5472 5473 reader = rb_get_reader_page(cpu_buffer); 5474 5475 /* This function should not be called when buffer is empty */ 5476 if (RB_WARN_ON(cpu_buffer, !reader)) 5477 return; 5478 5479 event = rb_reader_event(cpu_buffer); 5480 5481 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5482 cpu_buffer->read++; 5483 5484 rb_update_read_stamp(cpu_buffer, event); 5485 5486 length = rb_event_length(event); 5487 cpu_buffer->reader_page->read += length; 5488 cpu_buffer->read_bytes += length; 5489 } 5490 5491 static void rb_advance_iter(struct ring_buffer_iter *iter) 5492 { 5493 struct ring_buffer_per_cpu *cpu_buffer; 5494 5495 cpu_buffer = iter->cpu_buffer; 5496 5497 /* If head == next_event then we need to jump to the next event */ 5498 if (iter->head == iter->next_event) { 5499 /* If the event gets overwritten again, there's nothing to do */ 5500 if (rb_iter_head_event(iter) == NULL) 5501 return; 5502 } 5503 5504 iter->head = iter->next_event; 5505 5506 /* 5507 * Check if we are at the end of the buffer. 5508 */ 5509 if (iter->next_event >= rb_page_size(iter->head_page)) { 5510 /* discarded commits can make the page empty */ 5511 if (iter->head_page == cpu_buffer->commit_page) 5512 return; 5513 rb_inc_iter(iter); 5514 return; 5515 } 5516 5517 rb_update_iter_read_stamp(iter, iter->event); 5518 } 5519 5520 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5521 { 5522 return cpu_buffer->lost_events; 5523 } 5524 5525 static struct ring_buffer_event * 5526 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5527 unsigned long *lost_events) 5528 { 5529 struct ring_buffer_event *event; 5530 struct buffer_page *reader; 5531 int nr_loops = 0; 5532 5533 if (ts) 5534 *ts = 0; 5535 again: 5536 /* 5537 * We repeat when a time extend is encountered. 5538 * Since the time extend is always attached to a data event, 5539 * we should never loop more than once. 5540 * (We never hit the following condition more than twice). 5541 */ 5542 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5543 return NULL; 5544 5545 reader = rb_get_reader_page(cpu_buffer); 5546 if (!reader) 5547 return NULL; 5548 5549 event = rb_reader_event(cpu_buffer); 5550 5551 switch (event->type_len) { 5552 case RINGBUF_TYPE_PADDING: 5553 if (rb_null_event(event)) 5554 RB_WARN_ON(cpu_buffer, 1); 5555 /* 5556 * Because the writer could be discarding every 5557 * event it creates (which would probably be bad) 5558 * if we were to go back to "again" then we may never 5559 * catch up, and will trigger the warn on, or lock 5560 * the box. Return the padding, and we will release 5561 * the current locks, and try again. 5562 */ 5563 return event; 5564 5565 case RINGBUF_TYPE_TIME_EXTEND: 5566 /* Internal data, OK to advance */ 5567 rb_advance_reader(cpu_buffer); 5568 goto again; 5569 5570 case RINGBUF_TYPE_TIME_STAMP: 5571 if (ts) { 5572 *ts = rb_event_time_stamp(event); 5573 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5574 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5575 cpu_buffer->cpu, ts); 5576 } 5577 /* Internal data, OK to advance */ 5578 rb_advance_reader(cpu_buffer); 5579 goto again; 5580 5581 case RINGBUF_TYPE_DATA: 5582 if (ts && !(*ts)) { 5583 *ts = cpu_buffer->read_stamp + event->time_delta; 5584 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5585 cpu_buffer->cpu, ts); 5586 } 5587 if (lost_events) 5588 *lost_events = rb_lost_events(cpu_buffer); 5589 return event; 5590 5591 default: 5592 RB_WARN_ON(cpu_buffer, 1); 5593 } 5594 5595 return NULL; 5596 } 5597 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5598 5599 static struct ring_buffer_event * 5600 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5601 { 5602 struct trace_buffer *buffer; 5603 struct ring_buffer_per_cpu *cpu_buffer; 5604 struct ring_buffer_event *event; 5605 int nr_loops = 0; 5606 5607 if (ts) 5608 *ts = 0; 5609 5610 cpu_buffer = iter->cpu_buffer; 5611 buffer = cpu_buffer->buffer; 5612 5613 /* 5614 * Check if someone performed a consuming read to the buffer 5615 * or removed some pages from the buffer. In these cases, 5616 * iterator was invalidated and we need to reset it. 5617 */ 5618 if (unlikely(iter->cache_read != cpu_buffer->read || 5619 iter->cache_reader_page != cpu_buffer->reader_page || 5620 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5621 rb_iter_reset(iter); 5622 5623 again: 5624 if (ring_buffer_iter_empty(iter)) 5625 return NULL; 5626 5627 /* 5628 * As the writer can mess with what the iterator is trying 5629 * to read, just give up if we fail to get an event after 5630 * three tries. The iterator is not as reliable when reading 5631 * the ring buffer with an active write as the consumer is. 5632 * Do not warn if the three failures is reached. 5633 */ 5634 if (++nr_loops > 3) 5635 return NULL; 5636 5637 if (rb_per_cpu_empty(cpu_buffer)) 5638 return NULL; 5639 5640 if (iter->head >= rb_page_size(iter->head_page)) { 5641 rb_inc_iter(iter); 5642 goto again; 5643 } 5644 5645 event = rb_iter_head_event(iter); 5646 if (!event) 5647 goto again; 5648 5649 switch (event->type_len) { 5650 case RINGBUF_TYPE_PADDING: 5651 if (rb_null_event(event)) { 5652 rb_inc_iter(iter); 5653 goto again; 5654 } 5655 rb_advance_iter(iter); 5656 return event; 5657 5658 case RINGBUF_TYPE_TIME_EXTEND: 5659 /* Internal data, OK to advance */ 5660 rb_advance_iter(iter); 5661 goto again; 5662 5663 case RINGBUF_TYPE_TIME_STAMP: 5664 if (ts) { 5665 *ts = rb_event_time_stamp(event); 5666 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 5667 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5668 cpu_buffer->cpu, ts); 5669 } 5670 /* Internal data, OK to advance */ 5671 rb_advance_iter(iter); 5672 goto again; 5673 5674 case RINGBUF_TYPE_DATA: 5675 if (ts && !(*ts)) { 5676 *ts = iter->read_stamp + event->time_delta; 5677 ring_buffer_normalize_time_stamp(buffer, 5678 cpu_buffer->cpu, ts); 5679 } 5680 return event; 5681 5682 default: 5683 RB_WARN_ON(cpu_buffer, 1); 5684 } 5685 5686 return NULL; 5687 } 5688 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 5689 5690 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 5691 { 5692 if (likely(!in_nmi())) { 5693 raw_spin_lock(&cpu_buffer->reader_lock); 5694 return true; 5695 } 5696 5697 /* 5698 * If an NMI die dumps out the content of the ring buffer 5699 * trylock must be used to prevent a deadlock if the NMI 5700 * preempted a task that holds the ring buffer locks. If 5701 * we get the lock then all is fine, if not, then continue 5702 * to do the read, but this can corrupt the ring buffer, 5703 * so it must be permanently disabled from future writes. 5704 * Reading from NMI is a oneshot deal. 5705 */ 5706 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 5707 return true; 5708 5709 /* Continue without locking, but disable the ring buffer */ 5710 atomic_inc(&cpu_buffer->record_disabled); 5711 return false; 5712 } 5713 5714 static inline void 5715 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 5716 { 5717 if (likely(locked)) 5718 raw_spin_unlock(&cpu_buffer->reader_lock); 5719 } 5720 5721 /** 5722 * ring_buffer_peek - peek at the next event to be read 5723 * @buffer: The ring buffer to read 5724 * @cpu: The cpu to peak at 5725 * @ts: The timestamp counter of this event. 5726 * @lost_events: a variable to store if events were lost (may be NULL) 5727 * 5728 * This will return the event that will be read next, but does 5729 * not consume the data. 5730 */ 5731 struct ring_buffer_event * 5732 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 5733 unsigned long *lost_events) 5734 { 5735 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5736 struct ring_buffer_event *event; 5737 unsigned long flags; 5738 bool dolock; 5739 5740 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5741 return NULL; 5742 5743 again: 5744 local_irq_save(flags); 5745 dolock = rb_reader_lock(cpu_buffer); 5746 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5747 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5748 rb_advance_reader(cpu_buffer); 5749 rb_reader_unlock(cpu_buffer, dolock); 5750 local_irq_restore(flags); 5751 5752 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5753 goto again; 5754 5755 return event; 5756 } 5757 5758 /** ring_buffer_iter_dropped - report if there are dropped events 5759 * @iter: The ring buffer iterator 5760 * 5761 * Returns true if there was dropped events since the last peek. 5762 */ 5763 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 5764 { 5765 bool ret = iter->missed_events != 0; 5766 5767 iter->missed_events = 0; 5768 return ret; 5769 } 5770 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 5771 5772 /** 5773 * ring_buffer_iter_peek - peek at the next event to be read 5774 * @iter: The ring buffer iterator 5775 * @ts: The timestamp counter of this event. 5776 * 5777 * This will return the event that will be read next, but does 5778 * not increment the iterator. 5779 */ 5780 struct ring_buffer_event * 5781 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5782 { 5783 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5784 struct ring_buffer_event *event; 5785 unsigned long flags; 5786 5787 again: 5788 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5789 event = rb_iter_peek(iter, ts); 5790 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5791 5792 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5793 goto again; 5794 5795 return event; 5796 } 5797 5798 /** 5799 * ring_buffer_consume - return an event and consume it 5800 * @buffer: The ring buffer to get the next event from 5801 * @cpu: the cpu to read the buffer from 5802 * @ts: a variable to store the timestamp (may be NULL) 5803 * @lost_events: a variable to store if events were lost (may be NULL) 5804 * 5805 * Returns the next event in the ring buffer, and that event is consumed. 5806 * Meaning, that sequential reads will keep returning a different event, 5807 * and eventually empty the ring buffer if the producer is slower. 5808 */ 5809 struct ring_buffer_event * 5810 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5811 unsigned long *lost_events) 5812 { 5813 struct ring_buffer_per_cpu *cpu_buffer; 5814 struct ring_buffer_event *event = NULL; 5815 unsigned long flags; 5816 bool dolock; 5817 5818 again: 5819 /* might be called in atomic */ 5820 preempt_disable(); 5821 5822 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5823 goto out; 5824 5825 cpu_buffer = buffer->buffers[cpu]; 5826 local_irq_save(flags); 5827 dolock = rb_reader_lock(cpu_buffer); 5828 5829 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5830 if (event) { 5831 cpu_buffer->lost_events = 0; 5832 rb_advance_reader(cpu_buffer); 5833 } 5834 5835 rb_reader_unlock(cpu_buffer, dolock); 5836 local_irq_restore(flags); 5837 5838 out: 5839 preempt_enable(); 5840 5841 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5842 goto again; 5843 5844 return event; 5845 } 5846 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5847 5848 /** 5849 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5850 * @buffer: The ring buffer to read from 5851 * @cpu: The cpu buffer to iterate over 5852 * @flags: gfp flags to use for memory allocation 5853 * 5854 * This performs the initial preparations necessary to iterate 5855 * through the buffer. Memory is allocated, buffer resizing 5856 * is disabled, and the iterator pointer is returned to the caller. 5857 * 5858 * After a sequence of ring_buffer_read_prepare calls, the user is 5859 * expected to make at least one call to ring_buffer_read_prepare_sync. 5860 * Afterwards, ring_buffer_read_start is invoked to get things going 5861 * for real. 5862 * 5863 * This overall must be paired with ring_buffer_read_finish. 5864 */ 5865 struct ring_buffer_iter * 5866 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5867 { 5868 struct ring_buffer_per_cpu *cpu_buffer; 5869 struct ring_buffer_iter *iter; 5870 5871 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5872 return NULL; 5873 5874 iter = kzalloc(sizeof(*iter), flags); 5875 if (!iter) 5876 return NULL; 5877 5878 /* Holds the entire event: data and meta data */ 5879 iter->event_size = buffer->subbuf_size; 5880 iter->event = kmalloc(iter->event_size, flags); 5881 if (!iter->event) { 5882 kfree(iter); 5883 return NULL; 5884 } 5885 5886 cpu_buffer = buffer->buffers[cpu]; 5887 5888 iter->cpu_buffer = cpu_buffer; 5889 5890 atomic_inc(&cpu_buffer->resize_disabled); 5891 5892 return iter; 5893 } 5894 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5895 5896 /** 5897 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5898 * 5899 * All previously invoked ring_buffer_read_prepare calls to prepare 5900 * iterators will be synchronized. Afterwards, read_buffer_read_start 5901 * calls on those iterators are allowed. 5902 */ 5903 void 5904 ring_buffer_read_prepare_sync(void) 5905 { 5906 synchronize_rcu(); 5907 } 5908 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5909 5910 /** 5911 * ring_buffer_read_start - start a non consuming read of the buffer 5912 * @iter: The iterator returned by ring_buffer_read_prepare 5913 * 5914 * This finalizes the startup of an iteration through the buffer. 5915 * The iterator comes from a call to ring_buffer_read_prepare and 5916 * an intervening ring_buffer_read_prepare_sync must have been 5917 * performed. 5918 * 5919 * Must be paired with ring_buffer_read_finish. 5920 */ 5921 void 5922 ring_buffer_read_start(struct ring_buffer_iter *iter) 5923 { 5924 struct ring_buffer_per_cpu *cpu_buffer; 5925 unsigned long flags; 5926 5927 if (!iter) 5928 return; 5929 5930 cpu_buffer = iter->cpu_buffer; 5931 5932 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5933 arch_spin_lock(&cpu_buffer->lock); 5934 rb_iter_reset(iter); 5935 arch_spin_unlock(&cpu_buffer->lock); 5936 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5937 } 5938 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5939 5940 /** 5941 * ring_buffer_read_finish - finish reading the iterator of the buffer 5942 * @iter: The iterator retrieved by ring_buffer_start 5943 * 5944 * This re-enables resizing of the buffer, and frees the iterator. 5945 */ 5946 void 5947 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5948 { 5949 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5950 5951 /* Use this opportunity to check the integrity of the ring buffer. */ 5952 rb_check_pages(cpu_buffer); 5953 5954 atomic_dec(&cpu_buffer->resize_disabled); 5955 kfree(iter->event); 5956 kfree(iter); 5957 } 5958 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5959 5960 /** 5961 * ring_buffer_iter_advance - advance the iterator to the next location 5962 * @iter: The ring buffer iterator 5963 * 5964 * Move the location of the iterator such that the next read will 5965 * be the next location of the iterator. 5966 */ 5967 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5968 { 5969 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5970 unsigned long flags; 5971 5972 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5973 5974 rb_advance_iter(iter); 5975 5976 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5977 } 5978 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5979 5980 /** 5981 * ring_buffer_size - return the size of the ring buffer (in bytes) 5982 * @buffer: The ring buffer. 5983 * @cpu: The CPU to get ring buffer size from. 5984 */ 5985 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5986 { 5987 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5988 return 0; 5989 5990 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 5991 } 5992 EXPORT_SYMBOL_GPL(ring_buffer_size); 5993 5994 /** 5995 * ring_buffer_max_event_size - return the max data size of an event 5996 * @buffer: The ring buffer. 5997 * 5998 * Returns the maximum size an event can be. 5999 */ 6000 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 6001 { 6002 /* If abs timestamp is requested, events have a timestamp too */ 6003 if (ring_buffer_time_stamp_abs(buffer)) 6004 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 6005 return buffer->max_data_size; 6006 } 6007 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 6008 6009 static void rb_clear_buffer_page(struct buffer_page *page) 6010 { 6011 local_set(&page->write, 0); 6012 local_set(&page->entries, 0); 6013 rb_init_page(page->page); 6014 page->read = 0; 6015 } 6016 6017 /* 6018 * When the buffer is memory mapped to user space, each sub buffer 6019 * has a unique id that is used by the meta data to tell the user 6020 * where the current reader page is. 6021 * 6022 * For a normal allocated ring buffer, the id is saved in the buffer page 6023 * id field, and updated via this function. 6024 * 6025 * But for a fixed memory mapped buffer, the id is already assigned for 6026 * fixed memory ording in the memory layout and can not be used. Instead 6027 * the index of where the page lies in the memory layout is used. 6028 * 6029 * For the normal pages, set the buffer page id with the passed in @id 6030 * value and return that. 6031 * 6032 * For fixed memory mapped pages, get the page index in the memory layout 6033 * and return that as the id. 6034 */ 6035 static int rb_page_id(struct ring_buffer_per_cpu *cpu_buffer, 6036 struct buffer_page *bpage, int id) 6037 { 6038 /* 6039 * For boot buffers, the id is the index, 6040 * otherwise, set the buffer page with this id 6041 */ 6042 if (cpu_buffer->ring_meta) 6043 id = rb_meta_subbuf_idx(cpu_buffer->ring_meta, bpage->page); 6044 else 6045 bpage->id = id; 6046 6047 return id; 6048 } 6049 6050 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6051 { 6052 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6053 6054 if (!meta) 6055 return; 6056 6057 meta->reader.read = cpu_buffer->reader_page->read; 6058 meta->reader.id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, 6059 cpu_buffer->reader_page->id); 6060 6061 meta->reader.lost_events = cpu_buffer->lost_events; 6062 6063 meta->entries = local_read(&cpu_buffer->entries); 6064 meta->overrun = local_read(&cpu_buffer->overrun); 6065 meta->read = cpu_buffer->read; 6066 6067 /* Some archs do not have data cache coherency between kernel and user-space */ 6068 flush_kernel_vmap_range(cpu_buffer->meta_page, PAGE_SIZE); 6069 } 6070 6071 static void 6072 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 6073 { 6074 struct buffer_page *page; 6075 6076 rb_head_page_deactivate(cpu_buffer); 6077 6078 cpu_buffer->head_page 6079 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6080 rb_clear_buffer_page(cpu_buffer->head_page); 6081 list_for_each_entry(page, cpu_buffer->pages, list) { 6082 rb_clear_buffer_page(page); 6083 } 6084 6085 cpu_buffer->tail_page = cpu_buffer->head_page; 6086 cpu_buffer->commit_page = cpu_buffer->head_page; 6087 6088 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 6089 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6090 rb_clear_buffer_page(cpu_buffer->reader_page); 6091 6092 local_set(&cpu_buffer->entries_bytes, 0); 6093 local_set(&cpu_buffer->overrun, 0); 6094 local_set(&cpu_buffer->commit_overrun, 0); 6095 local_set(&cpu_buffer->dropped_events, 0); 6096 local_set(&cpu_buffer->entries, 0); 6097 local_set(&cpu_buffer->committing, 0); 6098 local_set(&cpu_buffer->commits, 0); 6099 local_set(&cpu_buffer->pages_touched, 0); 6100 local_set(&cpu_buffer->pages_lost, 0); 6101 local_set(&cpu_buffer->pages_read, 0); 6102 cpu_buffer->last_pages_touch = 0; 6103 cpu_buffer->shortest_full = 0; 6104 cpu_buffer->read = 0; 6105 cpu_buffer->read_bytes = 0; 6106 6107 rb_time_set(&cpu_buffer->write_stamp, 0); 6108 rb_time_set(&cpu_buffer->before_stamp, 0); 6109 6110 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6111 6112 cpu_buffer->lost_events = 0; 6113 cpu_buffer->last_overrun = 0; 6114 6115 rb_head_page_activate(cpu_buffer); 6116 cpu_buffer->pages_removed = 0; 6117 6118 if (cpu_buffer->mapped) { 6119 rb_update_meta_page(cpu_buffer); 6120 if (cpu_buffer->ring_meta) { 6121 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 6122 meta->commit_buffer = meta->head_buffer; 6123 } 6124 } 6125 } 6126 6127 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6128 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6129 { 6130 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6131 6132 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6133 return; 6134 6135 arch_spin_lock(&cpu_buffer->lock); 6136 6137 rb_reset_cpu(cpu_buffer); 6138 6139 arch_spin_unlock(&cpu_buffer->lock); 6140 } 6141 6142 /** 6143 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6144 * @buffer: The ring buffer to reset a per cpu buffer of 6145 * @cpu: The CPU buffer to be reset 6146 */ 6147 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6148 { 6149 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6150 6151 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6152 return; 6153 6154 /* prevent another thread from changing buffer sizes */ 6155 mutex_lock(&buffer->mutex); 6156 6157 atomic_inc(&cpu_buffer->resize_disabled); 6158 atomic_inc(&cpu_buffer->record_disabled); 6159 6160 /* Make sure all commits have finished */ 6161 synchronize_rcu(); 6162 6163 reset_disabled_cpu_buffer(cpu_buffer); 6164 6165 atomic_dec(&cpu_buffer->record_disabled); 6166 atomic_dec(&cpu_buffer->resize_disabled); 6167 6168 mutex_unlock(&buffer->mutex); 6169 } 6170 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6171 6172 /* Flag to ensure proper resetting of atomic variables */ 6173 #define RESET_BIT (1 << 30) 6174 6175 /** 6176 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6177 * @buffer: The ring buffer to reset a per cpu buffer of 6178 */ 6179 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6180 { 6181 struct ring_buffer_per_cpu *cpu_buffer; 6182 int cpu; 6183 6184 /* prevent another thread from changing buffer sizes */ 6185 mutex_lock(&buffer->mutex); 6186 6187 for_each_online_buffer_cpu(buffer, cpu) { 6188 cpu_buffer = buffer->buffers[cpu]; 6189 6190 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6191 atomic_inc(&cpu_buffer->record_disabled); 6192 } 6193 6194 /* Make sure all commits have finished */ 6195 synchronize_rcu(); 6196 6197 for_each_buffer_cpu(buffer, cpu) { 6198 cpu_buffer = buffer->buffers[cpu]; 6199 6200 /* 6201 * If a CPU came online during the synchronize_rcu(), then 6202 * ignore it. 6203 */ 6204 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6205 continue; 6206 6207 reset_disabled_cpu_buffer(cpu_buffer); 6208 6209 atomic_dec(&cpu_buffer->record_disabled); 6210 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6211 } 6212 6213 mutex_unlock(&buffer->mutex); 6214 } 6215 6216 /** 6217 * ring_buffer_reset - reset a ring buffer 6218 * @buffer: The ring buffer to reset all cpu buffers 6219 */ 6220 void ring_buffer_reset(struct trace_buffer *buffer) 6221 { 6222 struct ring_buffer_per_cpu *cpu_buffer; 6223 int cpu; 6224 6225 /* prevent another thread from changing buffer sizes */ 6226 mutex_lock(&buffer->mutex); 6227 6228 for_each_buffer_cpu(buffer, cpu) { 6229 cpu_buffer = buffer->buffers[cpu]; 6230 6231 atomic_inc(&cpu_buffer->resize_disabled); 6232 atomic_inc(&cpu_buffer->record_disabled); 6233 } 6234 6235 /* Make sure all commits have finished */ 6236 synchronize_rcu(); 6237 6238 for_each_buffer_cpu(buffer, cpu) { 6239 cpu_buffer = buffer->buffers[cpu]; 6240 6241 reset_disabled_cpu_buffer(cpu_buffer); 6242 6243 atomic_dec(&cpu_buffer->record_disabled); 6244 atomic_dec(&cpu_buffer->resize_disabled); 6245 } 6246 6247 mutex_unlock(&buffer->mutex); 6248 } 6249 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6250 6251 /** 6252 * ring_buffer_empty - is the ring buffer empty? 6253 * @buffer: The ring buffer to test 6254 */ 6255 bool ring_buffer_empty(struct trace_buffer *buffer) 6256 { 6257 struct ring_buffer_per_cpu *cpu_buffer; 6258 unsigned long flags; 6259 bool dolock; 6260 bool ret; 6261 int cpu; 6262 6263 /* yes this is racy, but if you don't like the race, lock the buffer */ 6264 for_each_buffer_cpu(buffer, cpu) { 6265 cpu_buffer = buffer->buffers[cpu]; 6266 local_irq_save(flags); 6267 dolock = rb_reader_lock(cpu_buffer); 6268 ret = rb_per_cpu_empty(cpu_buffer); 6269 rb_reader_unlock(cpu_buffer, dolock); 6270 local_irq_restore(flags); 6271 6272 if (!ret) 6273 return false; 6274 } 6275 6276 return true; 6277 } 6278 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6279 6280 /** 6281 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6282 * @buffer: The ring buffer 6283 * @cpu: The CPU buffer to test 6284 */ 6285 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6286 { 6287 struct ring_buffer_per_cpu *cpu_buffer; 6288 unsigned long flags; 6289 bool dolock; 6290 bool ret; 6291 6292 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6293 return true; 6294 6295 cpu_buffer = buffer->buffers[cpu]; 6296 local_irq_save(flags); 6297 dolock = rb_reader_lock(cpu_buffer); 6298 ret = rb_per_cpu_empty(cpu_buffer); 6299 rb_reader_unlock(cpu_buffer, dolock); 6300 local_irq_restore(flags); 6301 6302 return ret; 6303 } 6304 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6305 6306 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6307 /** 6308 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6309 * @buffer_a: One buffer to swap with 6310 * @buffer_b: The other buffer to swap with 6311 * @cpu: the CPU of the buffers to swap 6312 * 6313 * This function is useful for tracers that want to take a "snapshot" 6314 * of a CPU buffer and has another back up buffer lying around. 6315 * it is expected that the tracer handles the cpu buffer not being 6316 * used at the moment. 6317 */ 6318 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6319 struct trace_buffer *buffer_b, int cpu) 6320 { 6321 struct ring_buffer_per_cpu *cpu_buffer_a; 6322 struct ring_buffer_per_cpu *cpu_buffer_b; 6323 int ret = -EINVAL; 6324 6325 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6326 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6327 return -EINVAL; 6328 6329 cpu_buffer_a = buffer_a->buffers[cpu]; 6330 cpu_buffer_b = buffer_b->buffers[cpu]; 6331 6332 /* It's up to the callers to not try to swap mapped buffers */ 6333 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) 6334 return -EBUSY; 6335 6336 /* At least make sure the two buffers are somewhat the same */ 6337 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6338 return -EINVAL; 6339 6340 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6341 return -EINVAL; 6342 6343 if (atomic_read(&buffer_a->record_disabled)) 6344 return -EAGAIN; 6345 6346 if (atomic_read(&buffer_b->record_disabled)) 6347 return -EAGAIN; 6348 6349 if (atomic_read(&cpu_buffer_a->record_disabled)) 6350 return -EAGAIN; 6351 6352 if (atomic_read(&cpu_buffer_b->record_disabled)) 6353 return -EAGAIN; 6354 6355 /* 6356 * We can't do a synchronize_rcu here because this 6357 * function can be called in atomic context. 6358 * Normally this will be called from the same CPU as cpu. 6359 * If not it's up to the caller to protect this. 6360 */ 6361 atomic_inc(&cpu_buffer_a->record_disabled); 6362 atomic_inc(&cpu_buffer_b->record_disabled); 6363 6364 ret = -EBUSY; 6365 if (local_read(&cpu_buffer_a->committing)) 6366 goto out_dec; 6367 if (local_read(&cpu_buffer_b->committing)) 6368 goto out_dec; 6369 6370 /* 6371 * When resize is in progress, we cannot swap it because 6372 * it will mess the state of the cpu buffer. 6373 */ 6374 if (atomic_read(&buffer_a->resizing)) 6375 goto out_dec; 6376 if (atomic_read(&buffer_b->resizing)) 6377 goto out_dec; 6378 6379 buffer_a->buffers[cpu] = cpu_buffer_b; 6380 buffer_b->buffers[cpu] = cpu_buffer_a; 6381 6382 cpu_buffer_b->buffer = buffer_a; 6383 cpu_buffer_a->buffer = buffer_b; 6384 6385 ret = 0; 6386 6387 out_dec: 6388 atomic_dec(&cpu_buffer_a->record_disabled); 6389 atomic_dec(&cpu_buffer_b->record_disabled); 6390 return ret; 6391 } 6392 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6393 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6394 6395 /** 6396 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6397 * @buffer: the buffer to allocate for. 6398 * @cpu: the cpu buffer to allocate. 6399 * 6400 * This function is used in conjunction with ring_buffer_read_page. 6401 * When reading a full page from the ring buffer, these functions 6402 * can be used to speed up the process. The calling function should 6403 * allocate a few pages first with this function. Then when it 6404 * needs to get pages from the ring buffer, it passes the result 6405 * of this function into ring_buffer_read_page, which will swap 6406 * the page that was allocated, with the read page of the buffer. 6407 * 6408 * Returns: 6409 * The page allocated, or ERR_PTR 6410 */ 6411 struct buffer_data_read_page * 6412 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6413 { 6414 struct ring_buffer_per_cpu *cpu_buffer; 6415 struct buffer_data_read_page *bpage = NULL; 6416 unsigned long flags; 6417 struct page *page; 6418 6419 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6420 return ERR_PTR(-ENODEV); 6421 6422 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 6423 if (!bpage) 6424 return ERR_PTR(-ENOMEM); 6425 6426 bpage->order = buffer->subbuf_order; 6427 cpu_buffer = buffer->buffers[cpu]; 6428 local_irq_save(flags); 6429 arch_spin_lock(&cpu_buffer->lock); 6430 6431 if (cpu_buffer->free_page) { 6432 bpage->data = cpu_buffer->free_page; 6433 cpu_buffer->free_page = NULL; 6434 } 6435 6436 arch_spin_unlock(&cpu_buffer->lock); 6437 local_irq_restore(flags); 6438 6439 if (bpage->data) 6440 goto out; 6441 6442 page = alloc_pages_node(cpu_to_node(cpu), 6443 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO, 6444 cpu_buffer->buffer->subbuf_order); 6445 if (!page) { 6446 kfree(bpage); 6447 return ERR_PTR(-ENOMEM); 6448 } 6449 6450 bpage->data = page_address(page); 6451 6452 out: 6453 rb_init_page(bpage->data); 6454 6455 return bpage; 6456 } 6457 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6458 6459 /** 6460 * ring_buffer_free_read_page - free an allocated read page 6461 * @buffer: the buffer the page was allocate for 6462 * @cpu: the cpu buffer the page came from 6463 * @data_page: the page to free 6464 * 6465 * Free a page allocated from ring_buffer_alloc_read_page. 6466 */ 6467 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6468 struct buffer_data_read_page *data_page) 6469 { 6470 struct ring_buffer_per_cpu *cpu_buffer; 6471 struct buffer_data_page *bpage = data_page->data; 6472 struct page *page = virt_to_page(bpage); 6473 unsigned long flags; 6474 6475 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6476 return; 6477 6478 cpu_buffer = buffer->buffers[cpu]; 6479 6480 /* 6481 * If the page is still in use someplace else, or order of the page 6482 * is different from the subbuffer order of the buffer - 6483 * we can't reuse it 6484 */ 6485 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6486 goto out; 6487 6488 local_irq_save(flags); 6489 arch_spin_lock(&cpu_buffer->lock); 6490 6491 if (!cpu_buffer->free_page) { 6492 cpu_buffer->free_page = bpage; 6493 bpage = NULL; 6494 } 6495 6496 arch_spin_unlock(&cpu_buffer->lock); 6497 local_irq_restore(flags); 6498 6499 out: 6500 free_pages((unsigned long)bpage, data_page->order); 6501 kfree(data_page); 6502 } 6503 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6504 6505 /** 6506 * ring_buffer_read_page - extract a page from the ring buffer 6507 * @buffer: buffer to extract from 6508 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6509 * @len: amount to extract 6510 * @cpu: the cpu of the buffer to extract 6511 * @full: should the extraction only happen when the page is full. 6512 * 6513 * This function will pull out a page from the ring buffer and consume it. 6514 * @data_page must be the address of the variable that was returned 6515 * from ring_buffer_alloc_read_page. This is because the page might be used 6516 * to swap with a page in the ring buffer. 6517 * 6518 * for example: 6519 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6520 * if (IS_ERR(rpage)) 6521 * return PTR_ERR(rpage); 6522 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6523 * if (ret >= 0) 6524 * process_page(ring_buffer_read_page_data(rpage), ret); 6525 * ring_buffer_free_read_page(buffer, cpu, rpage); 6526 * 6527 * When @full is set, the function will not return true unless 6528 * the writer is off the reader page. 6529 * 6530 * Note: it is up to the calling functions to handle sleeps and wakeups. 6531 * The ring buffer can be used anywhere in the kernel and can not 6532 * blindly call wake_up. The layer that uses the ring buffer must be 6533 * responsible for that. 6534 * 6535 * Returns: 6536 * >=0 if data has been transferred, returns the offset of consumed data. 6537 * <0 if no data has been transferred. 6538 */ 6539 int ring_buffer_read_page(struct trace_buffer *buffer, 6540 struct buffer_data_read_page *data_page, 6541 size_t len, int cpu, int full) 6542 { 6543 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6544 struct ring_buffer_event *event; 6545 struct buffer_data_page *bpage; 6546 struct buffer_page *reader; 6547 unsigned long missed_events; 6548 unsigned int commit; 6549 unsigned int read; 6550 u64 save_timestamp; 6551 6552 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6553 return -1; 6554 6555 /* 6556 * If len is not big enough to hold the page header, then 6557 * we can not copy anything. 6558 */ 6559 if (len <= BUF_PAGE_HDR_SIZE) 6560 return -1; 6561 6562 len -= BUF_PAGE_HDR_SIZE; 6563 6564 if (!data_page || !data_page->data) 6565 return -1; 6566 6567 if (data_page->order != buffer->subbuf_order) 6568 return -1; 6569 6570 bpage = data_page->data; 6571 if (!bpage) 6572 return -1; 6573 6574 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6575 6576 reader = rb_get_reader_page(cpu_buffer); 6577 if (!reader) 6578 return -1; 6579 6580 event = rb_reader_event(cpu_buffer); 6581 6582 read = reader->read; 6583 commit = rb_page_size(reader); 6584 6585 /* Check if any events were dropped */ 6586 missed_events = cpu_buffer->lost_events; 6587 6588 /* 6589 * If this page has been partially read or 6590 * if len is not big enough to read the rest of the page or 6591 * a writer is still on the page, then 6592 * we must copy the data from the page to the buffer. 6593 * Otherwise, we can simply swap the page with the one passed in. 6594 */ 6595 if (read || (len < (commit - read)) || 6596 cpu_buffer->reader_page == cpu_buffer->commit_page || 6597 cpu_buffer->mapped) { 6598 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6599 unsigned int rpos = read; 6600 unsigned int pos = 0; 6601 unsigned int size; 6602 6603 /* 6604 * If a full page is expected, this can still be returned 6605 * if there's been a previous partial read and the 6606 * rest of the page can be read and the commit page is off 6607 * the reader page. 6608 */ 6609 if (full && 6610 (!read || (len < (commit - read)) || 6611 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6612 return -1; 6613 6614 if (len > (commit - read)) 6615 len = (commit - read); 6616 6617 /* Always keep the time extend and data together */ 6618 size = rb_event_ts_length(event); 6619 6620 if (len < size) 6621 return -1; 6622 6623 /* save the current timestamp, since the user will need it */ 6624 save_timestamp = cpu_buffer->read_stamp; 6625 6626 /* Need to copy one event at a time */ 6627 do { 6628 /* We need the size of one event, because 6629 * rb_advance_reader only advances by one event, 6630 * whereas rb_event_ts_length may include the size of 6631 * one or two events. 6632 * We have already ensured there's enough space if this 6633 * is a time extend. */ 6634 size = rb_event_length(event); 6635 memcpy(bpage->data + pos, rpage->data + rpos, size); 6636 6637 len -= size; 6638 6639 rb_advance_reader(cpu_buffer); 6640 rpos = reader->read; 6641 pos += size; 6642 6643 if (rpos >= commit) 6644 break; 6645 6646 event = rb_reader_event(cpu_buffer); 6647 /* Always keep the time extend and data together */ 6648 size = rb_event_ts_length(event); 6649 } while (len >= size); 6650 6651 /* update bpage */ 6652 local_set(&bpage->commit, pos); 6653 bpage->time_stamp = save_timestamp; 6654 6655 /* we copied everything to the beginning */ 6656 read = 0; 6657 } else { 6658 /* update the entry counter */ 6659 cpu_buffer->read += rb_page_entries(reader); 6660 cpu_buffer->read_bytes += rb_page_size(reader); 6661 6662 /* swap the pages */ 6663 rb_init_page(bpage); 6664 bpage = reader->page; 6665 reader->page = data_page->data; 6666 local_set(&reader->write, 0); 6667 local_set(&reader->entries, 0); 6668 reader->read = 0; 6669 data_page->data = bpage; 6670 6671 /* 6672 * Use the real_end for the data size, 6673 * This gives us a chance to store the lost events 6674 * on the page. 6675 */ 6676 if (reader->real_end) 6677 local_set(&bpage->commit, reader->real_end); 6678 } 6679 6680 cpu_buffer->lost_events = 0; 6681 6682 commit = local_read(&bpage->commit); 6683 /* 6684 * Set a flag in the commit field if we lost events 6685 */ 6686 if (missed_events) { 6687 /* If there is room at the end of the page to save the 6688 * missed events, then record it there. 6689 */ 6690 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6691 memcpy(&bpage->data[commit], &missed_events, 6692 sizeof(missed_events)); 6693 local_add(RB_MISSED_STORED, &bpage->commit); 6694 commit += sizeof(missed_events); 6695 } 6696 local_add(RB_MISSED_EVENTS, &bpage->commit); 6697 } 6698 6699 /* 6700 * This page may be off to user land. Zero it out here. 6701 */ 6702 if (commit < buffer->subbuf_size) 6703 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 6704 6705 return read; 6706 } 6707 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 6708 6709 /** 6710 * ring_buffer_read_page_data - get pointer to the data in the page. 6711 * @page: the page to get the data from 6712 * 6713 * Returns pointer to the actual data in this page. 6714 */ 6715 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 6716 { 6717 return page->data; 6718 } 6719 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 6720 6721 /** 6722 * ring_buffer_subbuf_size_get - get size of the sub buffer. 6723 * @buffer: the buffer to get the sub buffer size from 6724 * 6725 * Returns size of the sub buffer, in bytes. 6726 */ 6727 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer) 6728 { 6729 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6730 } 6731 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get); 6732 6733 /** 6734 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page. 6735 * @buffer: The ring_buffer to get the system sub page order from 6736 * 6737 * By default, one ring buffer sub page equals to one system page. This parameter 6738 * is configurable, per ring buffer. The size of the ring buffer sub page can be 6739 * extended, but must be an order of system page size. 6740 * 6741 * Returns the order of buffer sub page size, in system pages: 6742 * 0 means the sub buffer size is 1 system page and so forth. 6743 * In case of an error < 0 is returned. 6744 */ 6745 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 6746 { 6747 if (!buffer) 6748 return -EINVAL; 6749 6750 return buffer->subbuf_order; 6751 } 6752 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 6753 6754 /** 6755 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 6756 * @buffer: The ring_buffer to set the new page size. 6757 * @order: Order of the system pages in one sub buffer page 6758 * 6759 * By default, one ring buffer pages equals to one system page. This API can be 6760 * used to set new size of the ring buffer page. The size must be order of 6761 * system page size, that's why the input parameter @order is the order of 6762 * system pages that are allocated for one ring buffer page: 6763 * 0 - 1 system page 6764 * 1 - 2 system pages 6765 * 3 - 4 system pages 6766 * ... 6767 * 6768 * Returns 0 on success or < 0 in case of an error. 6769 */ 6770 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 6771 { 6772 struct ring_buffer_per_cpu *cpu_buffer; 6773 struct buffer_page *bpage, *tmp; 6774 int old_order, old_size; 6775 int nr_pages; 6776 int psize; 6777 int err; 6778 int cpu; 6779 6780 if (!buffer || order < 0) 6781 return -EINVAL; 6782 6783 if (buffer->subbuf_order == order) 6784 return 0; 6785 6786 psize = (1 << order) * PAGE_SIZE; 6787 if (psize <= BUF_PAGE_HDR_SIZE) 6788 return -EINVAL; 6789 6790 /* Size of a subbuf cannot be greater than the write counter */ 6791 if (psize > RB_WRITE_MASK + 1) 6792 return -EINVAL; 6793 6794 old_order = buffer->subbuf_order; 6795 old_size = buffer->subbuf_size; 6796 6797 /* prevent another thread from changing buffer sizes */ 6798 mutex_lock(&buffer->mutex); 6799 atomic_inc(&buffer->record_disabled); 6800 6801 /* Make sure all commits have finished */ 6802 synchronize_rcu(); 6803 6804 buffer->subbuf_order = order; 6805 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 6806 6807 /* Make sure all new buffers are allocated, before deleting the old ones */ 6808 for_each_buffer_cpu(buffer, cpu) { 6809 6810 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6811 continue; 6812 6813 cpu_buffer = buffer->buffers[cpu]; 6814 6815 if (cpu_buffer->mapped) { 6816 err = -EBUSY; 6817 goto error; 6818 } 6819 6820 /* Update the number of pages to match the new size */ 6821 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 6822 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 6823 6824 /* we need a minimum of two pages */ 6825 if (nr_pages < 2) 6826 nr_pages = 2; 6827 6828 cpu_buffer->nr_pages_to_update = nr_pages; 6829 6830 /* Include the reader page */ 6831 nr_pages++; 6832 6833 /* Allocate the new size buffer */ 6834 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6835 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6836 &cpu_buffer->new_pages)) { 6837 /* not enough memory for new pages */ 6838 err = -ENOMEM; 6839 goto error; 6840 } 6841 } 6842 6843 for_each_buffer_cpu(buffer, cpu) { 6844 struct buffer_data_page *old_free_data_page; 6845 struct list_head old_pages; 6846 unsigned long flags; 6847 6848 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6849 continue; 6850 6851 cpu_buffer = buffer->buffers[cpu]; 6852 6853 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6854 6855 /* Clear the head bit to make the link list normal to read */ 6856 rb_head_page_deactivate(cpu_buffer); 6857 6858 /* 6859 * Collect buffers from the cpu_buffer pages list and the 6860 * reader_page on old_pages, so they can be freed later when not 6861 * under a spinlock. The pages list is a linked list with no 6862 * head, adding old_pages turns it into a regular list with 6863 * old_pages being the head. 6864 */ 6865 list_add(&old_pages, cpu_buffer->pages); 6866 list_add(&cpu_buffer->reader_page->list, &old_pages); 6867 6868 /* One page was allocated for the reader page */ 6869 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6870 struct buffer_page, list); 6871 list_del_init(&cpu_buffer->reader_page->list); 6872 6873 /* Install the new pages, remove the head from the list */ 6874 cpu_buffer->pages = cpu_buffer->new_pages.next; 6875 list_del_init(&cpu_buffer->new_pages); 6876 cpu_buffer->cnt++; 6877 6878 cpu_buffer->head_page 6879 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6880 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6881 6882 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6883 cpu_buffer->nr_pages_to_update = 0; 6884 6885 old_free_data_page = cpu_buffer->free_page; 6886 cpu_buffer->free_page = NULL; 6887 6888 rb_head_page_activate(cpu_buffer); 6889 6890 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6891 6892 /* Free old sub buffers */ 6893 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 6894 list_del_init(&bpage->list); 6895 free_buffer_page(bpage); 6896 } 6897 free_pages((unsigned long)old_free_data_page, old_order); 6898 6899 rb_check_pages(cpu_buffer); 6900 } 6901 6902 atomic_dec(&buffer->record_disabled); 6903 mutex_unlock(&buffer->mutex); 6904 6905 return 0; 6906 6907 error: 6908 buffer->subbuf_order = old_order; 6909 buffer->subbuf_size = old_size; 6910 6911 atomic_dec(&buffer->record_disabled); 6912 mutex_unlock(&buffer->mutex); 6913 6914 for_each_buffer_cpu(buffer, cpu) { 6915 cpu_buffer = buffer->buffers[cpu]; 6916 6917 if (!cpu_buffer->nr_pages_to_update) 6918 continue; 6919 6920 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6921 list_del_init(&bpage->list); 6922 free_buffer_page(bpage); 6923 } 6924 } 6925 6926 return err; 6927 } 6928 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6929 6930 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6931 { 6932 struct page *page; 6933 6934 if (cpu_buffer->meta_page) 6935 return 0; 6936 6937 page = alloc_page(GFP_USER | __GFP_ZERO); 6938 if (!page) 6939 return -ENOMEM; 6940 6941 cpu_buffer->meta_page = page_to_virt(page); 6942 6943 return 0; 6944 } 6945 6946 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6947 { 6948 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 6949 6950 free_page(addr); 6951 cpu_buffer->meta_page = NULL; 6952 } 6953 6954 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 6955 unsigned long *subbuf_ids) 6956 { 6957 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6958 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 6959 struct buffer_page *first_subbuf, *subbuf; 6960 int cnt = 0; 6961 int id = 0; 6962 6963 id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, id); 6964 subbuf_ids[id++] = (unsigned long)cpu_buffer->reader_page->page; 6965 cnt++; 6966 6967 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 6968 do { 6969 id = rb_page_id(cpu_buffer, subbuf, id); 6970 6971 if (WARN_ON(id >= nr_subbufs)) 6972 break; 6973 6974 subbuf_ids[id] = (unsigned long)subbuf->page; 6975 6976 rb_inc_page(&subbuf); 6977 id++; 6978 cnt++; 6979 } while (subbuf != first_subbuf); 6980 6981 WARN_ON(cnt != nr_subbufs); 6982 6983 /* install subbuf ID to kern VA translation */ 6984 cpu_buffer->subbuf_ids = subbuf_ids; 6985 6986 meta->meta_struct_len = sizeof(*meta); 6987 meta->nr_subbufs = nr_subbufs; 6988 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6989 meta->meta_page_size = meta->subbuf_size; 6990 6991 rb_update_meta_page(cpu_buffer); 6992 } 6993 6994 static struct ring_buffer_per_cpu * 6995 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 6996 { 6997 struct ring_buffer_per_cpu *cpu_buffer; 6998 6999 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7000 return ERR_PTR(-EINVAL); 7001 7002 cpu_buffer = buffer->buffers[cpu]; 7003 7004 mutex_lock(&cpu_buffer->mapping_lock); 7005 7006 if (!cpu_buffer->user_mapped) { 7007 mutex_unlock(&cpu_buffer->mapping_lock); 7008 return ERR_PTR(-ENODEV); 7009 } 7010 7011 return cpu_buffer; 7012 } 7013 7014 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 7015 { 7016 mutex_unlock(&cpu_buffer->mapping_lock); 7017 } 7018 7019 /* 7020 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 7021 * to be set-up or torn-down. 7022 */ 7023 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 7024 bool inc) 7025 { 7026 unsigned long flags; 7027 7028 lockdep_assert_held(&cpu_buffer->mapping_lock); 7029 7030 /* mapped is always greater or equal to user_mapped */ 7031 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7032 return -EINVAL; 7033 7034 if (inc && cpu_buffer->mapped == UINT_MAX) 7035 return -EBUSY; 7036 7037 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 7038 return -EINVAL; 7039 7040 mutex_lock(&cpu_buffer->buffer->mutex); 7041 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7042 7043 if (inc) { 7044 cpu_buffer->user_mapped++; 7045 cpu_buffer->mapped++; 7046 } else { 7047 cpu_buffer->user_mapped--; 7048 cpu_buffer->mapped--; 7049 } 7050 7051 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7052 mutex_unlock(&cpu_buffer->buffer->mutex); 7053 7054 return 0; 7055 } 7056 7057 /* 7058 * +--------------+ pgoff == 0 7059 * | meta page | 7060 * +--------------+ pgoff == 1 7061 * | subbuffer 0 | 7062 * | | 7063 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 7064 * | subbuffer 1 | 7065 * | | 7066 * ... 7067 */ 7068 #ifdef CONFIG_MMU 7069 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7070 struct vm_area_struct *vma) 7071 { 7072 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 7073 unsigned int subbuf_pages, subbuf_order; 7074 struct page **pages __free(kfree) = NULL; 7075 int p = 0, s = 0; 7076 int err; 7077 7078 /* Refuse MP_PRIVATE or writable mappings */ 7079 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 7080 !(vma->vm_flags & VM_MAYSHARE)) 7081 return -EPERM; 7082 7083 subbuf_order = cpu_buffer->buffer->subbuf_order; 7084 subbuf_pages = 1 << subbuf_order; 7085 7086 if (subbuf_order && pgoff % subbuf_pages) 7087 return -EINVAL; 7088 7089 /* 7090 * Make sure the mapping cannot become writable later. Also tell the VM 7091 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 7092 */ 7093 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7094 VM_MAYWRITE); 7095 7096 lockdep_assert_held(&cpu_buffer->mapping_lock); 7097 7098 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7099 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7100 if (nr_pages <= pgoff) 7101 return -EINVAL; 7102 7103 nr_pages -= pgoff; 7104 7105 nr_vma_pages = vma_pages(vma); 7106 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7107 return -EINVAL; 7108 7109 nr_pages = nr_vma_pages; 7110 7111 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 7112 if (!pages) 7113 return -ENOMEM; 7114 7115 if (!pgoff) { 7116 unsigned long meta_page_padding; 7117 7118 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7119 7120 /* 7121 * Pad with the zero-page to align the meta-page with the 7122 * sub-buffers. 7123 */ 7124 meta_page_padding = subbuf_pages - 1; 7125 while (meta_page_padding-- && p < nr_pages) { 7126 unsigned long __maybe_unused zero_addr = 7127 vma->vm_start + (PAGE_SIZE * p); 7128 7129 pages[p++] = ZERO_PAGE(zero_addr); 7130 } 7131 } else { 7132 /* Skip the meta-page */ 7133 pgoff -= subbuf_pages; 7134 7135 s += pgoff / subbuf_pages; 7136 } 7137 7138 while (p < nr_pages) { 7139 struct page *page; 7140 int off = 0; 7141 7142 if (WARN_ON_ONCE(s >= nr_subbufs)) 7143 return -EINVAL; 7144 7145 page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 7146 7147 for (; off < (1 << (subbuf_order)); off++, page++) { 7148 if (p >= nr_pages) 7149 break; 7150 7151 pages[p++] = page; 7152 } 7153 s++; 7154 } 7155 7156 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7157 7158 return err; 7159 } 7160 #else 7161 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7162 struct vm_area_struct *vma) 7163 { 7164 return -EOPNOTSUPP; 7165 } 7166 #endif 7167 7168 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7169 struct vm_area_struct *vma) 7170 { 7171 struct ring_buffer_per_cpu *cpu_buffer; 7172 unsigned long flags, *subbuf_ids; 7173 int err; 7174 7175 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7176 return -EINVAL; 7177 7178 cpu_buffer = buffer->buffers[cpu]; 7179 7180 guard(mutex)(&cpu_buffer->mapping_lock); 7181 7182 if (cpu_buffer->user_mapped) { 7183 err = __rb_map_vma(cpu_buffer, vma); 7184 if (!err) 7185 err = __rb_inc_dec_mapped(cpu_buffer, true); 7186 return err; 7187 } 7188 7189 /* prevent another thread from changing buffer/sub-buffer sizes */ 7190 guard(mutex)(&buffer->mutex); 7191 7192 err = rb_alloc_meta_page(cpu_buffer); 7193 if (err) 7194 return err; 7195 7196 /* subbuf_ids include the reader while nr_pages does not */ 7197 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7198 if (!subbuf_ids) { 7199 rb_free_meta_page(cpu_buffer); 7200 return -ENOMEM; 7201 } 7202 7203 atomic_inc(&cpu_buffer->resize_disabled); 7204 7205 /* 7206 * Lock all readers to block any subbuf swap until the subbuf IDs are 7207 * assigned. 7208 */ 7209 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7210 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7211 7212 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7213 7214 err = __rb_map_vma(cpu_buffer, vma); 7215 if (!err) { 7216 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7217 /* This is the first time it is mapped by user */ 7218 cpu_buffer->mapped++; 7219 cpu_buffer->user_mapped = 1; 7220 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7221 } else { 7222 kfree(cpu_buffer->subbuf_ids); 7223 cpu_buffer->subbuf_ids = NULL; 7224 rb_free_meta_page(cpu_buffer); 7225 atomic_dec(&cpu_buffer->resize_disabled); 7226 } 7227 7228 return 0; 7229 } 7230 7231 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7232 { 7233 struct ring_buffer_per_cpu *cpu_buffer; 7234 unsigned long flags; 7235 7236 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7237 return -EINVAL; 7238 7239 cpu_buffer = buffer->buffers[cpu]; 7240 7241 guard(mutex)(&cpu_buffer->mapping_lock); 7242 7243 if (!cpu_buffer->user_mapped) { 7244 return -ENODEV; 7245 } else if (cpu_buffer->user_mapped > 1) { 7246 __rb_inc_dec_mapped(cpu_buffer, false); 7247 return 0; 7248 } 7249 7250 guard(mutex)(&buffer->mutex); 7251 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7252 7253 /* This is the last user space mapping */ 7254 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7255 cpu_buffer->mapped--; 7256 cpu_buffer->user_mapped = 0; 7257 7258 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7259 7260 kfree(cpu_buffer->subbuf_ids); 7261 cpu_buffer->subbuf_ids = NULL; 7262 rb_free_meta_page(cpu_buffer); 7263 atomic_dec(&cpu_buffer->resize_disabled); 7264 7265 return 0; 7266 } 7267 7268 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7269 { 7270 struct ring_buffer_per_cpu *cpu_buffer; 7271 struct buffer_page *reader; 7272 unsigned long missed_events; 7273 unsigned long reader_size; 7274 unsigned long flags; 7275 7276 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7277 if (IS_ERR(cpu_buffer)) 7278 return (int)PTR_ERR(cpu_buffer); 7279 7280 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7281 7282 consume: 7283 if (rb_per_cpu_empty(cpu_buffer)) 7284 goto out; 7285 7286 reader_size = rb_page_size(cpu_buffer->reader_page); 7287 7288 /* 7289 * There are data to be read on the current reader page, we can 7290 * return to the caller. But before that, we assume the latter will read 7291 * everything. Let's update the kernel reader accordingly. 7292 */ 7293 if (cpu_buffer->reader_page->read < reader_size) { 7294 while (cpu_buffer->reader_page->read < reader_size) 7295 rb_advance_reader(cpu_buffer); 7296 goto out; 7297 } 7298 7299 reader = rb_get_reader_page(cpu_buffer); 7300 if (WARN_ON(!reader)) 7301 goto out; 7302 7303 /* Check if any events were dropped */ 7304 missed_events = cpu_buffer->lost_events; 7305 7306 if (missed_events) { 7307 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7308 struct buffer_data_page *bpage = reader->page; 7309 unsigned int commit; 7310 /* 7311 * Use the real_end for the data size, 7312 * This gives us a chance to store the lost events 7313 * on the page. 7314 */ 7315 if (reader->real_end) 7316 local_set(&bpage->commit, reader->real_end); 7317 /* 7318 * If there is room at the end of the page to save the 7319 * missed events, then record it there. 7320 */ 7321 commit = rb_page_size(reader); 7322 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7323 memcpy(&bpage->data[commit], &missed_events, 7324 sizeof(missed_events)); 7325 local_add(RB_MISSED_STORED, &bpage->commit); 7326 } 7327 local_add(RB_MISSED_EVENTS, &bpage->commit); 7328 } else if (!WARN_ONCE(cpu_buffer->reader_page == cpu_buffer->tail_page, 7329 "Reader on commit with %ld missed events", 7330 missed_events)) { 7331 /* 7332 * There shouldn't be any missed events if the tail_page 7333 * is on the reader page. But if the tail page is not on the 7334 * reader page and the commit_page is, that would mean that 7335 * there's a commit_overrun (an interrupt preempted an 7336 * addition of an event and then filled the buffer 7337 * with new events). In this case it's not an 7338 * error, but it should still be reported. 7339 * 7340 * TODO: Add missed events to the page for user space to know. 7341 */ 7342 pr_info("Ring buffer [%d] commit overrun lost %ld events at timestamp:%lld\n", 7343 cpu, missed_events, cpu_buffer->reader_page->page->time_stamp); 7344 } 7345 } 7346 7347 cpu_buffer->lost_events = 0; 7348 7349 goto consume; 7350 7351 out: 7352 /* Some archs do not have data cache coherency between kernel and user-space */ 7353 flush_kernel_vmap_range(cpu_buffer->reader_page->page, 7354 buffer->subbuf_size + BUF_PAGE_HDR_SIZE); 7355 7356 rb_update_meta_page(cpu_buffer); 7357 7358 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7359 rb_put_mapped_buffer(cpu_buffer); 7360 7361 return 0; 7362 } 7363 7364 /* 7365 * We only allocate new buffers, never free them if the CPU goes down. 7366 * If we were to free the buffer, then the user would lose any trace that was in 7367 * the buffer. 7368 */ 7369 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7370 { 7371 struct trace_buffer *buffer; 7372 long nr_pages_same; 7373 int cpu_i; 7374 unsigned long nr_pages; 7375 7376 buffer = container_of(node, struct trace_buffer, node); 7377 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7378 return 0; 7379 7380 nr_pages = 0; 7381 nr_pages_same = 1; 7382 /* check if all cpu sizes are same */ 7383 for_each_buffer_cpu(buffer, cpu_i) { 7384 /* fill in the size from first enabled cpu */ 7385 if (nr_pages == 0) 7386 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7387 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7388 nr_pages_same = 0; 7389 break; 7390 } 7391 } 7392 /* allocate minimum pages, user can later expand it */ 7393 if (!nr_pages_same) 7394 nr_pages = 2; 7395 buffer->buffers[cpu] = 7396 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7397 if (!buffer->buffers[cpu]) { 7398 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7399 cpu); 7400 return -ENOMEM; 7401 } 7402 smp_wmb(); 7403 cpumask_set_cpu(cpu, buffer->cpumask); 7404 return 0; 7405 } 7406 7407 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7408 /* 7409 * This is a basic integrity check of the ring buffer. 7410 * Late in the boot cycle this test will run when configured in. 7411 * It will kick off a thread per CPU that will go into a loop 7412 * writing to the per cpu ring buffer various sizes of data. 7413 * Some of the data will be large items, some small. 7414 * 7415 * Another thread is created that goes into a spin, sending out 7416 * IPIs to the other CPUs to also write into the ring buffer. 7417 * this is to test the nesting ability of the buffer. 7418 * 7419 * Basic stats are recorded and reported. If something in the 7420 * ring buffer should happen that's not expected, a big warning 7421 * is displayed and all ring buffers are disabled. 7422 */ 7423 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7424 7425 struct rb_test_data { 7426 struct trace_buffer *buffer; 7427 unsigned long events; 7428 unsigned long bytes_written; 7429 unsigned long bytes_alloc; 7430 unsigned long bytes_dropped; 7431 unsigned long events_nested; 7432 unsigned long bytes_written_nested; 7433 unsigned long bytes_alloc_nested; 7434 unsigned long bytes_dropped_nested; 7435 int min_size_nested; 7436 int max_size_nested; 7437 int max_size; 7438 int min_size; 7439 int cpu; 7440 int cnt; 7441 }; 7442 7443 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7444 7445 /* 1 meg per cpu */ 7446 #define RB_TEST_BUFFER_SIZE 1048576 7447 7448 static char rb_string[] __initdata = 7449 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7450 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7451 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7452 7453 static bool rb_test_started __initdata; 7454 7455 struct rb_item { 7456 int size; 7457 char str[]; 7458 }; 7459 7460 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7461 { 7462 struct ring_buffer_event *event; 7463 struct rb_item *item; 7464 bool started; 7465 int event_len; 7466 int size; 7467 int len; 7468 int cnt; 7469 7470 /* Have nested writes different that what is written */ 7471 cnt = data->cnt + (nested ? 27 : 0); 7472 7473 /* Multiply cnt by ~e, to make some unique increment */ 7474 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7475 7476 len = size + sizeof(struct rb_item); 7477 7478 started = rb_test_started; 7479 /* read rb_test_started before checking buffer enabled */ 7480 smp_rmb(); 7481 7482 event = ring_buffer_lock_reserve(data->buffer, len); 7483 if (!event) { 7484 /* Ignore dropped events before test starts. */ 7485 if (started) { 7486 if (nested) 7487 data->bytes_dropped_nested += len; 7488 else 7489 data->bytes_dropped += len; 7490 } 7491 return len; 7492 } 7493 7494 event_len = ring_buffer_event_length(event); 7495 7496 if (RB_WARN_ON(data->buffer, event_len < len)) 7497 goto out; 7498 7499 item = ring_buffer_event_data(event); 7500 item->size = size; 7501 memcpy(item->str, rb_string, size); 7502 7503 if (nested) { 7504 data->bytes_alloc_nested += event_len; 7505 data->bytes_written_nested += len; 7506 data->events_nested++; 7507 if (!data->min_size_nested || len < data->min_size_nested) 7508 data->min_size_nested = len; 7509 if (len > data->max_size_nested) 7510 data->max_size_nested = len; 7511 } else { 7512 data->bytes_alloc += event_len; 7513 data->bytes_written += len; 7514 data->events++; 7515 if (!data->min_size || len < data->min_size) 7516 data->max_size = len; 7517 if (len > data->max_size) 7518 data->max_size = len; 7519 } 7520 7521 out: 7522 ring_buffer_unlock_commit(data->buffer); 7523 7524 return 0; 7525 } 7526 7527 static __init int rb_test(void *arg) 7528 { 7529 struct rb_test_data *data = arg; 7530 7531 while (!kthread_should_stop()) { 7532 rb_write_something(data, false); 7533 data->cnt++; 7534 7535 set_current_state(TASK_INTERRUPTIBLE); 7536 /* Now sleep between a min of 100-300us and a max of 1ms */ 7537 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 7538 } 7539 7540 return 0; 7541 } 7542 7543 static __init void rb_ipi(void *ignore) 7544 { 7545 struct rb_test_data *data; 7546 int cpu = smp_processor_id(); 7547 7548 data = &rb_data[cpu]; 7549 rb_write_something(data, true); 7550 } 7551 7552 static __init int rb_hammer_test(void *arg) 7553 { 7554 while (!kthread_should_stop()) { 7555 7556 /* Send an IPI to all cpus to write data! */ 7557 smp_call_function(rb_ipi, NULL, 1); 7558 /* No sleep, but for non preempt, let others run */ 7559 schedule(); 7560 } 7561 7562 return 0; 7563 } 7564 7565 static __init int test_ringbuffer(void) 7566 { 7567 struct task_struct *rb_hammer; 7568 struct trace_buffer *buffer; 7569 int cpu; 7570 int ret = 0; 7571 7572 if (security_locked_down(LOCKDOWN_TRACEFS)) { 7573 pr_warn("Lockdown is enabled, skipping ring buffer tests\n"); 7574 return 0; 7575 } 7576 7577 pr_info("Running ring buffer tests...\n"); 7578 7579 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 7580 if (WARN_ON(!buffer)) 7581 return 0; 7582 7583 /* Disable buffer so that threads can't write to it yet */ 7584 ring_buffer_record_off(buffer); 7585 7586 for_each_online_cpu(cpu) { 7587 rb_data[cpu].buffer = buffer; 7588 rb_data[cpu].cpu = cpu; 7589 rb_data[cpu].cnt = cpu; 7590 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu], 7591 cpu, "rbtester/%u"); 7592 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 7593 pr_cont("FAILED\n"); 7594 ret = PTR_ERR(rb_threads[cpu]); 7595 goto out_free; 7596 } 7597 } 7598 7599 /* Now create the rb hammer! */ 7600 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 7601 if (WARN_ON(IS_ERR(rb_hammer))) { 7602 pr_cont("FAILED\n"); 7603 ret = PTR_ERR(rb_hammer); 7604 goto out_free; 7605 } 7606 7607 ring_buffer_record_on(buffer); 7608 /* 7609 * Show buffer is enabled before setting rb_test_started. 7610 * Yes there's a small race window where events could be 7611 * dropped and the thread wont catch it. But when a ring 7612 * buffer gets enabled, there will always be some kind of 7613 * delay before other CPUs see it. Thus, we don't care about 7614 * those dropped events. We care about events dropped after 7615 * the threads see that the buffer is active. 7616 */ 7617 smp_wmb(); 7618 rb_test_started = true; 7619 7620 set_current_state(TASK_INTERRUPTIBLE); 7621 /* Just run for 10 seconds */; 7622 schedule_timeout(10 * HZ); 7623 7624 kthread_stop(rb_hammer); 7625 7626 out_free: 7627 for_each_online_cpu(cpu) { 7628 if (!rb_threads[cpu]) 7629 break; 7630 kthread_stop(rb_threads[cpu]); 7631 } 7632 if (ret) { 7633 ring_buffer_free(buffer); 7634 return ret; 7635 } 7636 7637 /* Report! */ 7638 pr_info("finished\n"); 7639 for_each_online_cpu(cpu) { 7640 struct ring_buffer_event *event; 7641 struct rb_test_data *data = &rb_data[cpu]; 7642 struct rb_item *item; 7643 unsigned long total_events; 7644 unsigned long total_dropped; 7645 unsigned long total_written; 7646 unsigned long total_alloc; 7647 unsigned long total_read = 0; 7648 unsigned long total_size = 0; 7649 unsigned long total_len = 0; 7650 unsigned long total_lost = 0; 7651 unsigned long lost; 7652 int big_event_size; 7653 int small_event_size; 7654 7655 ret = -1; 7656 7657 total_events = data->events + data->events_nested; 7658 total_written = data->bytes_written + data->bytes_written_nested; 7659 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 7660 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 7661 7662 big_event_size = data->max_size + data->max_size_nested; 7663 small_event_size = data->min_size + data->min_size_nested; 7664 7665 pr_info("CPU %d:\n", cpu); 7666 pr_info(" events: %ld\n", total_events); 7667 pr_info(" dropped bytes: %ld\n", total_dropped); 7668 pr_info(" alloced bytes: %ld\n", total_alloc); 7669 pr_info(" written bytes: %ld\n", total_written); 7670 pr_info(" biggest event: %d\n", big_event_size); 7671 pr_info(" smallest event: %d\n", small_event_size); 7672 7673 if (RB_WARN_ON(buffer, total_dropped)) 7674 break; 7675 7676 ret = 0; 7677 7678 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 7679 total_lost += lost; 7680 item = ring_buffer_event_data(event); 7681 total_len += ring_buffer_event_length(event); 7682 total_size += item->size + sizeof(struct rb_item); 7683 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 7684 pr_info("FAILED!\n"); 7685 pr_info("buffer had: %.*s\n", item->size, item->str); 7686 pr_info("expected: %.*s\n", item->size, rb_string); 7687 RB_WARN_ON(buffer, 1); 7688 ret = -1; 7689 break; 7690 } 7691 total_read++; 7692 } 7693 if (ret) 7694 break; 7695 7696 ret = -1; 7697 7698 pr_info(" read events: %ld\n", total_read); 7699 pr_info(" lost events: %ld\n", total_lost); 7700 pr_info(" total events: %ld\n", total_lost + total_read); 7701 pr_info(" recorded len bytes: %ld\n", total_len); 7702 pr_info(" recorded size bytes: %ld\n", total_size); 7703 if (total_lost) { 7704 pr_info(" With dropped events, record len and size may not match\n" 7705 " alloced and written from above\n"); 7706 } else { 7707 if (RB_WARN_ON(buffer, total_len != total_alloc || 7708 total_size != total_written)) 7709 break; 7710 } 7711 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 7712 break; 7713 7714 ret = 0; 7715 } 7716 if (!ret) 7717 pr_info("Ring buffer PASSED!\n"); 7718 7719 ring_buffer_free(buffer); 7720 return 0; 7721 } 7722 7723 late_initcall(test_ringbuffer); 7724 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 7725