// SPDX-License-Identifier: GPL-2.0
/* Watch queue and general notification mechanism, built on pipes
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * See Documentation/core-api/watch_queue.rst
 */

#define pr_fmt(fmt) "watchq: " fmt
#include <linux/module.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/printk.h>
#include <linux/miscdevice.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/poll.h>
#include <linux/uaccess.h>
#include <linux/vmalloc.h>
#include <linux/file.h>
#include <linux/security.h>
#include <linux/cred.h>
#include <linux/sched/signal.h>
#include <linux/watch_queue.h>
#include <linux/pipe_fs_i.h>

MODULE_DESCRIPTION("Watch queue");
MODULE_AUTHOR("Red Hat, Inc.");

#define WATCH_QUEUE_NOTE_SIZE 128
#define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)

/*
 * This must be called under the RCU read-lock, which makes
 * sure that the wqueue still exists. It can then take the lock,
 * and check that the wqueue hasn't been destroyed, which in
 * turn makes sure that the notification pipe still exists.
 */
static inline bool lock_wqueue(struct watch_queue *wqueue)
{
	spin_lock_bh(&wqueue->lock);
	if (unlikely(!wqueue->pipe)) {
		spin_unlock_bh(&wqueue->lock);
		return false;
	}
	return true;
}

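/*
 * Release the lock taken by lock_wqueue().
 */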
static inline void unlock_wqueue(struct watch_queue *wqueue)
{
	spin_unlock_bh(&wqueue->lock);
}

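/*
 * Release a pipe buffer that carried a posted notification: work out which
 * note slot in the preallocated pages it occupied and mark that note free
 * again in the bitmap.
 */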
static void watch_queue_pipe_buf_release(struct pipe_inode_info *pipe,
					 struct pipe_buffer *buf)
{
	struct watch_queue *wqueue = (struct watch_queue *)buf->private;
	struct page *page;
	unsigned int bit;

	/* We need to work out which note within the page this refers to, but
	 * the note might have been maximum size, so merely ANDing the offset
	 * off doesn't work. OTOH, the note must've been more than zero size.
	 */
	bit = buf->offset + buf->len;
	if ((bit & (WATCH_QUEUE_NOTE_SIZE - 1)) == 0)
		bit -= WATCH_QUEUE_NOTE_SIZE;
	bit /= WATCH_QUEUE_NOTE_SIZE;

	page = buf->page;
	bit += page->private;

	set_bit(bit, wqueue->notes_bitmap);
	generic_pipe_buf_release(pipe, buf);
}

// No try_steal function => no stealing
#define watch_queue_pipe_buf_try_steal NULL

/* New data written to a pipe may be appended to a buffer with this type. */
static const struct pipe_buf_operations watch_queue_pipe_buf_ops = {
	.release	= watch_queue_pipe_buf_release,
	.try_steal	= watch_queue_pipe_buf_try_steal,
	.get		= generic_pipe_buf_get,
};

/*
 * Post a notification to a watch queue.
 *
 * Must be called with the RCU lock for reading, and the
 * watch_queue lock held, which guarantees that the pipe
 * hasn't been released.
 */
static bool post_one_notification(struct watch_queue *wqueue,
				  struct watch_notification *n)
{
	void *p;
	struct pipe_inode_info *pipe = wqueue->pipe;
	struct pipe_buffer *buf;
	struct page *page;
	unsigned int head, tail, note, offset, len;
	bool done = false;

	spin_lock_irq(&pipe->rd_wait.lock);

	head = pipe->head;
	tail = pipe->tail;
	if (pipe_full(head, tail, pipe->ring_size))
		goto lost;

	note = find_first_bit(wqueue->notes_bitmap, wqueue->nr_notes);
	if (note >= wqueue->nr_notes)
		goto lost;

	page = wqueue->notes[note / WATCH_QUEUE_NOTES_PER_PAGE];
	offset = note % WATCH_QUEUE_NOTES_PER_PAGE * WATCH_QUEUE_NOTE_SIZE;
	get_page(page);
	len = n->info & WATCH_INFO_LENGTH;
	p = kmap_atomic(page);
	memcpy(p + offset, n, len);
	kunmap_atomic(p);

	buf = pipe_buf(pipe, head);
	buf->page = page;
	buf->private = (unsigned long)wqueue;
	buf->ops = &watch_queue_pipe_buf_ops;
	buf->offset = offset;
	buf->len = len;
	buf->flags = PIPE_BUF_FLAG_WHOLE;
	smp_store_release(&pipe->head, head + 1); /* vs pipe_read() */

	if (!test_and_clear_bit(note, wqueue->notes_bitmap)) {
		spin_unlock_irq(&pipe->rd_wait.lock);
		BUG();
	}
	wake_up_interruptible_sync_poll_locked(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
	done = true;

out:
	spin_unlock_irq(&pipe->rd_wait.lock);
	if (done)
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	return done;

lost:
	buf = pipe_buf(pipe, head - 1);
	buf->flags |= PIPE_BUF_FLAG_LOSS;
	goto out;
}

/*
 * Apply filter rules to a notification.
 */
static bool filter_watch_notification(const struct watch_filter *wf,
				      const struct watch_notification *n)
{
	const struct watch_type_filter *wt;
	unsigned int st_bits = sizeof(wt->subtype_filter[0]) * 8;
	unsigned int st_index = n->subtype / st_bits;
	unsigned int st_bit = 1U << (n->subtype % st_bits);
	int i;

	if (!test_bit(n->type, wf->type_filter))
		return false;

	for (i = 0; i < wf->nr_filters; i++) {
		wt = &wf->filters[i];
		if (n->type == wt->type &&
		    (wt->subtype_filter[st_index] & st_bit) &&
		    (n->info & wt->info_mask) == wt->info_filter)
			return true;
	}

	return false; /* If there is a filter, the default is to reject. */
}

/**
 * __post_watch_notification - Post an event notification
 * @wlist: The watch list to post the event to.
 * @n: The notification record to post.
 * @cred: The creds of the process that triggered the notification.
 * @id: The ID to match on the watch.
 *
 * Post a notification of an event into a set of watch queues and let the users
 * know.
 *
 * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and
 * should be in units of sizeof(*n).
 */
void __post_watch_notification(struct watch_list *wlist,
			       struct watch_notification *n,
			       const struct cred *cred,
			       u64 id)
{
	const struct watch_filter *wf;
	struct watch_queue *wqueue;
	struct watch *watch;

	if (((n->info & WATCH_INFO_LENGTH) >> WATCH_INFO_LENGTH__SHIFT) == 0) {
		WARN_ON(1);
		return;
	}

	rcu_read_lock();

	hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) {
		if (watch->id != id)
			continue;
		n->info &= ~WATCH_INFO_ID;
		n->info |= watch->info_id;

		wqueue = rcu_dereference(watch->queue);
		wf = rcu_dereference(wqueue->filter);
		if (wf && !filter_watch_notification(wf, n))
			continue;

		if (security_post_notification(watch->cred, cred, n) < 0)
			continue;

		if (lock_wqueue(wqueue)) {
			post_one_notification(wqueue, n);
			unlock_wqueue(wqueue);
		}
	}

	rcu_read_unlock();
}
EXPORT_SYMBOL(__post_watch_notification);
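
/*
 * Illustrative sketch of how an event source might post through the above
 * (not part of this file), typically via the post_watch_notification()
 * NULL-checking wrapper; the watch list, subtype and ID shown are
 * placeholders:
 *
 *	struct watch_notification n = {
 *		.type		= WATCH_TYPE_KEY_NOTIFY,
 *		.subtype	= NOTIFY_KEY_LINKED,
 *		.info		= watch_sizeof(n),
 *	};
 *
 *	post_watch_notification(object->watchers, &n, current_cred(), object_id);
 */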

/*
 * Allocate sufficient pages to preallocate the requested number of
 * notifications.
 */
long watch_queue_set_size(struct pipe_inode_info *pipe, unsigned int nr_notes)
{
	struct watch_queue *wqueue = pipe->watch_queue;
	struct page **pages;
	unsigned long *bitmap;
	unsigned long user_bufs;
	int ret, i, nr_pages;

	if (!wqueue)
		return -ENODEV;
	if (wqueue->notes)
		return -EBUSY;

	if (nr_notes < 1 ||
	    nr_notes > 512) /* TODO: choose a better hard limit */
		return -EINVAL;

	nr_pages = (nr_notes + WATCH_QUEUE_NOTES_PER_PAGE - 1);
	nr_pages /= WATCH_QUEUE_NOTES_PER_PAGE;
	user_bufs = account_pipe_buffers(pipe->user, pipe->nr_accounted, nr_pages);

	if (nr_pages > pipe->max_usage &&
	    (too_many_pipe_buffers_hard(user_bufs) ||
	     too_many_pipe_buffers_soft(user_bufs)) &&
	    pipe_is_unprivileged_user()) {
		ret = -EPERM;
		goto error;
	}

	nr_notes = nr_pages * WATCH_QUEUE_NOTES_PER_PAGE;
	ret = pipe_resize_ring(pipe, roundup_pow_of_two(nr_notes));
	if (ret < 0)
		goto error;

	/*
	 * pipe_resize_ring() does not update nr_accounted for watch_queue
	 * pipes, because the above vastly overprovisions. Set this pipe's
	 * nr_accounted and max_usage to the number that was actually charged
	 * to the user above via account_pipe_buffers().
	 */
	pipe->max_usage = nr_pages;
	pipe->nr_accounted = nr_pages;

	ret = -ENOMEM;
	pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
	if (!pages)
		goto error;

	for (i = 0; i < nr_pages; i++) {
		pages[i] = alloc_page(GFP_KERNEL);
		if (!pages[i])
			goto error_p;
		pages[i]->private = i * WATCH_QUEUE_NOTES_PER_PAGE;
	}

	bitmap = bitmap_alloc(nr_notes, GFP_KERNEL);
	if (!bitmap)
		goto error_p;

	bitmap_fill(bitmap, nr_notes);
	wqueue->notes = pages;
	wqueue->notes_bitmap = bitmap;
	wqueue->nr_pages = nr_pages;
	wqueue->nr_notes = nr_notes;
	return 0;

error_p:
	while (--i >= 0)
		__free_page(pages[i]);
	kfree(pages);
error:
	(void) account_pipe_buffers(pipe->user, nr_pages, pipe->nr_accounted);
	return ret;
}
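
/*
 * Illustrative sketch of the userspace side that ends up here (not part of
 * this file): the preallocation size is chosen with the
 * IOC_WATCH_QUEUE_SET_SIZE ioctl on a pipe opened with O_NOTIFICATION_PIPE.
 * The fds[] variable is a placeholder.
 *
 *	int fds[2];
 *
 *	pipe2(fds, O_NOTIFICATION_PIPE);
 *	ioctl(fds[0], IOC_WATCH_QUEUE_SET_SIZE, 256);
 */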

/*
 * Set the filter on a watch queue.
 */
long watch_queue_set_filter(struct pipe_inode_info *pipe,
			    struct watch_notification_filter __user *_filter)
{
	struct watch_notification_type_filter *tf;
	struct watch_notification_filter filter;
	struct watch_type_filter *q;
	struct watch_filter *wfilter;
	struct watch_queue *wqueue = pipe->watch_queue;
	int ret, nr_filter = 0, i;

	if (!wqueue)
		return -ENODEV;

	if (!_filter) {
		/* Remove the old filter */
		wfilter = NULL;
		goto set;
	}

	/* Grab the user's filter specification */
	if (copy_from_user(&filter, _filter, sizeof(filter)) != 0)
		return -EFAULT;
	if (filter.nr_filters == 0 ||
	    filter.nr_filters > 16 ||
	    filter.__reserved != 0)
		return -EINVAL;

	tf = memdup_array_user(_filter->filters, filter.nr_filters, sizeof(*tf));
	if (IS_ERR(tf))
		return PTR_ERR(tf);

	ret = -EINVAL;
	for (i = 0; i < filter.nr_filters; i++) {
		if ((tf[i].info_filter & ~tf[i].info_mask) ||
		    tf[i].info_mask & WATCH_INFO_LENGTH)
			goto err_filter;
		/* Ignore any unknown types */
		if (tf[i].type >= WATCH_TYPE__NR)
			continue;
		nr_filter++;
	}

	/* Now we need to build the internal filter from only the relevant
	 * user-specified filters.
	 */
	ret = -ENOMEM;
	wfilter = kzalloc(struct_size(wfilter, filters, nr_filter), GFP_KERNEL);
	if (!wfilter)
		goto err_filter;
	wfilter->nr_filters = nr_filter;

	q = wfilter->filters;
	for (i = 0; i < filter.nr_filters; i++) {
		if (tf[i].type >= WATCH_TYPE__NR)
			continue;

		q->type = tf[i].type;
		q->info_filter = tf[i].info_filter;
		q->info_mask = tf[i].info_mask;
		q->subtype_filter[0] = tf[i].subtype_filter[0];
		__set_bit(q->type, wfilter->type_filter);
		q++;
	}

	kfree(tf);
set:
	pipe_lock(pipe);
	wfilter = rcu_replace_pointer(wqueue->filter, wfilter,
				      lockdep_is_held(&pipe->mutex));
	pipe_unlock(pipe);
	if (wfilter)
		kfree_rcu(wfilter, rcu);
	return 0;

err_filter:
	kfree(tf);
	return ret;
}
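
/*
 * Illustrative sketch of how a filter might be installed from userspace (not
 * part of this file). The fd and the chosen type are placeholders, and the
 * static initialiser of the flexible filters[] array relies on a GNU C
 * extension, similar to samples/watch_queue/watch_test.c:
 *
 *	static struct watch_notification_filter filter = {
 *		.nr_filters	= 1,
 *		.filters	= {
 *			[0] = {
 *				.type			= WATCH_TYPE_KEY_NOTIFY,
 *				.subtype_filter[0]	= UINT_MAX,
 *			},
 *		},
 *	};
 *
 *	ioctl(fds[0], IOC_WATCH_QUEUE_SET_FILTER, &filter);
 */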

static void __put_watch_queue(struct kref *kref)
{
	struct watch_queue *wqueue =
		container_of(kref, struct watch_queue, usage);
	struct watch_filter *wfilter;
	int i;

	for (i = 0; i < wqueue->nr_pages; i++)
		__free_page(wqueue->notes[i]);
	kfree(wqueue->notes);
	bitmap_free(wqueue->notes_bitmap);

	wfilter = rcu_access_pointer(wqueue->filter);
	if (wfilter)
		kfree_rcu(wfilter, rcu);
	kfree_rcu(wqueue, rcu);
}

/**
 * put_watch_queue - Dispose of a ref on a watchqueue.
 * @wqueue: The watch queue to unref.
 */
void put_watch_queue(struct watch_queue *wqueue)
{
	kref_put(&wqueue->usage, __put_watch_queue);
}
EXPORT_SYMBOL(put_watch_queue);

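/*
 * Dispose of a watch after the RCU grace period: drop its ref on the queue,
 * uncount it against the owning user's watch limit, put the creds and free
 * the watch itself.
 */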
static void free_watch(struct rcu_head *rcu)
{
	struct watch *watch = container_of(rcu, struct watch, rcu);

	put_watch_queue(rcu_access_pointer(watch->queue));
	atomic_dec(&watch->cred->user->nr_watches);
	put_cred(watch->cred);
	kfree(watch);
}

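/*
 * The last reference on a watch has gone; defer the actual destruction to
 * free_watch() after an RCU grace period.
 */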
static void __put_watch(struct kref *kref)
{
	struct watch *watch = container_of(kref, struct watch, usage);

	call_rcu(&watch->rcu, free_watch);
}

/*
 * Discard a watch.
 */
static void put_watch(struct watch *watch)
{
	kref_put(&watch->usage, __put_watch);
}

/**
 * init_watch - Initialise a watch
 * @watch: The watch to initialise.
 * @wqueue: The queue to assign.
 *
 * Initialise a watch and set the watch queue.
 */
void init_watch(struct watch *watch, struct watch_queue *wqueue)
{
	kref_init(&watch->usage);
	INIT_HLIST_NODE(&watch->list_node);
	INIT_HLIST_NODE(&watch->queue_node);
	rcu_assign_pointer(watch->queue, wqueue);
}

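/*
 * Attach a watch to a watch list, checking for an existing watch with the
 * same ID on the same queue and enforcing the per-user watch limit. Called
 * with the wqueue lock and the wlist lock held.
 */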
static int add_one_watch(struct watch *watch, struct watch_list *wlist, struct watch_queue *wqueue)
{
	const struct cred *cred;
	struct watch *w;

	hlist_for_each_entry(w, &wlist->watchers, list_node) {
		struct watch_queue *wq = rcu_access_pointer(w->queue);
		if (wqueue == wq && watch->id == w->id)
			return -EBUSY;
	}

	cred = current_cred();
	if (atomic_inc_return(&cred->user->nr_watches) > task_rlimit(current, RLIMIT_NOFILE)) {
		atomic_dec(&cred->user->nr_watches);
		return -EAGAIN;
	}

	watch->cred = get_cred(cred);
	rcu_assign_pointer(watch->watch_list, wlist);

	kref_get(&wqueue->usage);
	kref_get(&watch->usage);
	hlist_add_head(&watch->queue_node, &wqueue->watches);
	hlist_add_head_rcu(&watch->list_node, &wlist->watchers);
	return 0;
}

/**
 * add_watch_to_object - Add a watch on an object to a watch list
 * @watch: The watch to add
 * @wlist: The watch list to add to
 *
 * @watch->queue must have been set to point to the queue to post
 * notifications to, and @wlist must be the watch list of the object to be
 * watched. @watch->cred must also have been set to the appropriate
 * credentials and a ref taken on them.
 *
 * The caller must pin the queue and the list both and must hold the list
 * locked against racing watch additions/removals.
 */
int add_watch_to_object(struct watch *watch, struct watch_list *wlist)
{
	struct watch_queue *wqueue;
	int ret = -ENOENT;

	rcu_read_lock();

	wqueue = rcu_access_pointer(watch->queue);
	if (lock_wqueue(wqueue)) {
		spin_lock(&wlist->lock);
		ret = add_one_watch(watch, wlist, wqueue);
		spin_unlock(&wlist->lock);
		unlock_wqueue(wqueue);
	}

	rcu_read_unlock();
	return ret;
}
EXPORT_SYMBOL(add_watch_to_object);
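
/*
 * Illustrative sketch of a typical caller (not part of this file); the
 * object, its lock and the object_id/watch_tag variables are placeholders:
 *
 *	struct watch *watch = kzalloc(sizeof(*watch), GFP_KERNEL);
 *
 *	init_watch(watch, wqueue);
 *	watch->id = object_id;
 *	watch->info_id = (u32)watch_tag << WATCH_INFO_ID__SHIFT;
 *
 *	down_write(&object->watch_lock);
 *	ret = add_watch_to_object(watch, &object->watchers);
 *	up_write(&object->watch_lock);
 */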

/**
 * remove_watch_from_object - Remove a watch or all watches from an object.
 * @wlist: The watch list to remove from
 * @wq: The watch queue of interest (ignored if @all is true)
 * @id: The ID of the watch to remove (ignored if @all is true)
 * @all: True to remove all watches
 *
 * Remove a specific watch or all watches from an object. A notification is
 * sent to the watcher to tell them that this happened.
 */
int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
			     u64 id, bool all)
{
	struct watch_notification_removal n;
	struct watch_queue *wqueue;
	struct watch *watch;
	int ret = -EBADSLT;

	rcu_read_lock();

again:
	spin_lock(&wlist->lock);
	hlist_for_each_entry(watch, &wlist->watchers, list_node) {
		if (all ||
		    (watch->id == id && rcu_access_pointer(watch->queue) == wq))
			goto found;
	}
	spin_unlock(&wlist->lock);
	goto out;

found:
	ret = 0;
	hlist_del_init_rcu(&watch->list_node);
	rcu_assign_pointer(watch->watch_list, NULL);
	spin_unlock(&wlist->lock);

	/* We now own the reference on watch that used to belong to wlist. */

	n.watch.type = WATCH_TYPE_META;
	n.watch.subtype = WATCH_META_REMOVAL_NOTIFICATION;
	n.watch.info = watch->info_id | watch_sizeof(n.watch);
	n.id = id;
	if (id != 0)
		n.watch.info = watch->info_id | watch_sizeof(n);

	wqueue = rcu_dereference(watch->queue);

	if (lock_wqueue(wqueue)) {
		post_one_notification(wqueue, &n.watch);

		if (!hlist_unhashed(&watch->queue_node)) {
			hlist_del_init_rcu(&watch->queue_node);
			put_watch(watch);
		}

		unlock_wqueue(wqueue);
	}

	if (wlist->release_watch) {
		void (*release_watch)(struct watch *);

		release_watch = wlist->release_watch;
		rcu_read_unlock();
		(*release_watch)(watch);
		rcu_read_lock();
	}
	put_watch(watch);

	if (all && !hlist_empty(&wlist->watchers))
		goto again;
out:
	rcu_read_unlock();
	return ret;
}
EXPORT_SYMBOL(remove_watch_from_object);

/*
 * Remove all the watches that are contributory to a queue. This has the
 * potential to race with removal of the watches by the destruction of the
 * objects being watched or with the distribution of notifications.
 */
void watch_queue_clear(struct watch_queue *wqueue)
{
	struct watch_list *wlist;
	struct watch *watch;
	bool release;

	rcu_read_lock();
	spin_lock_bh(&wqueue->lock);

	/*
	 * This pipe can be freed by callers like free_pipe_info().
	 * Removing this reference also prevents new notifications.
	 */
	wqueue->pipe = NULL;

	while (!hlist_empty(&wqueue->watches)) {
		watch = hlist_entry(wqueue->watches.first, struct watch, queue_node);
		hlist_del_init_rcu(&watch->queue_node);
		/* We now own a ref on the watch. */
		spin_unlock_bh(&wqueue->lock);

		/* We can't do the next bit under the queue lock as we need to
		 * get the list lock - which would cause a deadlock if someone
		 * was removing from the opposite direction at the same time or
		 * posting a notification.
		 */
		wlist = rcu_dereference(watch->watch_list);
		if (wlist) {
			void (*release_watch)(struct watch *);

			spin_lock(&wlist->lock);

			release = !hlist_unhashed(&watch->list_node);
			if (release) {
				hlist_del_init_rcu(&watch->list_node);
				rcu_assign_pointer(watch->watch_list, NULL);

				/* We now own a second ref on the watch. */
			}

			release_watch = wlist->release_watch;
			spin_unlock(&wlist->lock);

			if (release) {
				if (release_watch) {
					rcu_read_unlock();
					/* This might need to call dput(), so
					 * we have to drop all the locks.
					 */
					(*release_watch)(watch);
					rcu_read_lock();
				}
				put_watch(watch);
			}
		}

		put_watch(watch);
		spin_lock_bh(&wqueue->lock);
	}

	spin_unlock_bh(&wqueue->lock);
	rcu_read_unlock();
}

/**
 * get_watch_queue - Get a watch queue from its file descriptor.
 * @fd: The fd to query.
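 *
 * Return: The pipe's watch queue with a reference taken on it, or
 * ERR_PTR(-EINVAL) if @fd does not refer to a pipe with a watch queue
 * attached.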
 */
struct watch_queue *get_watch_queue(int fd)
{
	struct pipe_inode_info *pipe;
	struct watch_queue *wqueue = ERR_PTR(-EINVAL);
	CLASS(fd, f)(fd);

	if (!fd_empty(f)) {
		pipe = get_pipe_info(fd_file(f), false);
		if (pipe && pipe->watch_queue) {
			wqueue = pipe->watch_queue;
			kref_get(&wqueue->usage);
		}
	}

	return wqueue;
}
EXPORT_SYMBOL(get_watch_queue);

/*
 * Initialise a watch queue
 */
int watch_queue_init(struct pipe_inode_info *pipe)
{
	struct watch_queue *wqueue;

	wqueue = kzalloc(sizeof(*wqueue), GFP_KERNEL);
	if (!wqueue)
		return -ENOMEM;

	wqueue->pipe = pipe;
	kref_init(&wqueue->usage);
	spin_lock_init(&wqueue->lock);
	INIT_HLIST_HEAD(&wqueue->watches);

	pipe->watch_queue = wqueue;
	return 0;
}