// SPDX-License-Identifier: GPL-2.0
/* Watch queue and general notification mechanism, built on pipes
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * See Documentation/core-api/watch_queue.rst
 */

#define pr_fmt(fmt) "watchq: " fmt
#include <linux/module.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/printk.h>
#include <linux/miscdevice.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/poll.h>
#include <linux/uaccess.h>
#include <linux/vmalloc.h>
#include <linux/file.h>
#include <linux/security.h>
#include <linux/cred.h>
#include <linux/sched/signal.h>
#include <linux/watch_queue.h>
#include <linux/pipe_fs_i.h>

MODULE_DESCRIPTION("Watch queue");
MODULE_AUTHOR("Red Hat, Inc.");

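/* Each notification is copied into a fixed-size slot ("note") carved out of
 * preallocated pages, so one page holds PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE
 * notes.
 */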
#define WATCH_QUEUE_NOTE_SIZE 128
#define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)

/*
 * This must be called under the RCU read-lock, which makes
 * sure that the wqueue still exists. It can then take the lock,
 * and check that the wqueue hasn't been destroyed, which in
 * turn makes sure that the notification pipe still exists.
 */
static inline bool lock_wqueue(struct watch_queue *wqueue)
{
	spin_lock_bh(&wqueue->lock);
	if (unlikely(!wqueue->pipe)) {
		spin_unlock_bh(&wqueue->lock);
		return false;
	}
	return true;
}

static inline void unlock_wqueue(struct watch_queue *wqueue)
{
	spin_unlock_bh(&wqueue->lock);
}

static void watch_queue_pipe_buf_release(struct pipe_inode_info *pipe,
					 struct pipe_buffer *buf)
{
	struct watch_queue *wqueue = (struct watch_queue *)buf->private;
	struct page *page;
	unsigned int bit;

	/* We need to work out which note within the page this refers to, but
	 * the note might have been maximum size, so merely ANDing the offset
	 * off doesn't work.  OTOH, the note must've been more than zero size.
	 */
	bit = buf->offset + buf->len;
	if ((bit & (WATCH_QUEUE_NOTE_SIZE - 1)) == 0)
		bit -= WATCH_QUEUE_NOTE_SIZE;
	bit /= WATCH_QUEUE_NOTE_SIZE;

	page = buf->page;
	bit += page->private;

	set_bit(bit, wqueue->notes_bitmap);
	generic_pipe_buf_release(pipe, buf);
}

// No try_steal function => no stealing
#define watch_queue_pipe_buf_try_steal NULL

/* New data written to a pipe may be appended to a buffer with this type. */
static const struct pipe_buf_operations watch_queue_pipe_buf_ops = {
	.release	= watch_queue_pipe_buf_release,
	.try_steal	= watch_queue_pipe_buf_try_steal,
	.get		= generic_pipe_buf_get,
};

/*
 * Post a notification to a watch queue.
 *
 * Must be called with the RCU lock for reading, and the
 * watch_queue lock held, which guarantees that the pipe
 * hasn't been released.
 */
static bool post_one_notification(struct watch_queue *wqueue,
				  struct watch_notification *n)
{
	void *p;
	struct pipe_inode_info *pipe = wqueue->pipe;
	struct pipe_buffer *buf;
	struct page *page;
	unsigned int head, tail, note, offset, len;
	bool done = false;

	spin_lock_irq(&pipe->rd_wait.lock);

	head = pipe->head;
	tail = pipe->tail;
	if (pipe_full(head, tail, pipe->ring_size))
		goto lost;

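	/* Find a free note slot to copy the notification into. */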
	note = find_first_bit(wqueue->notes_bitmap, wqueue->nr_notes);
	if (note >= wqueue->nr_notes)
		goto lost;

	page = wqueue->notes[note / WATCH_QUEUE_NOTES_PER_PAGE];
	offset = note % WATCH_QUEUE_NOTES_PER_PAGE * WATCH_QUEUE_NOTE_SIZE;
	get_page(page);
	len = n->info & WATCH_INFO_LENGTH;
	p = kmap_atomic(page);
	memcpy(p + offset, n, len);
	kunmap_atomic(p);

	buf = pipe_buf(pipe, head);
	buf->page = page;
	buf->private = (unsigned long)wqueue;
	buf->ops = &watch_queue_pipe_buf_ops;
	buf->offset = offset;
	buf->len = len;
	buf->flags = PIPE_BUF_FLAG_WHOLE;
	smp_store_release(&pipe->head, head + 1); /* vs pipe_read() */

	if (!test_and_clear_bit(note, wqueue->notes_bitmap)) {
		spin_unlock_irq(&pipe->rd_wait.lock);
		BUG();
	}
	wake_up_interruptible_sync_poll_locked(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
	done = true;

out:
	spin_unlock_irq(&pipe->rd_wait.lock);
	if (done)
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	return done;

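	/* The pipe ring is full or all note slots are in use: mark the most
	 * recently posted buffer so that the reader can see that one or more
	 * notifications were lost at this point.
	 */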
lost:
	buf = pipe_buf(pipe, head - 1);
	buf->flags |= PIPE_BUF_FLAG_LOSS;
	goto out;
}

/*
 * Apply filter rules to a notification.
 */
static bool filter_watch_notification(const struct watch_filter *wf,
				      const struct watch_notification *n)
{
	const struct watch_type_filter *wt;
	unsigned int st_bits = sizeof(wt->subtype_filter[0]) * 8;
	unsigned int st_index = n->subtype / st_bits;
	unsigned int st_bit = 1U << (n->subtype % st_bits);
	int i;

	if (!test_bit(n->type, wf->type_filter))
		return false;

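	/* A notification passes if any per-type filter for its type has the
	 * subtype's bit set and the masked info bits match the filter value.
	 */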
	for (i = 0; i < wf->nr_filters; i++) {
		wt = &wf->filters[i];
		if (n->type == wt->type &&
		    (wt->subtype_filter[st_index] & st_bit) &&
		    (n->info & wt->info_mask) == wt->info_filter)
			return true;
	}

	return false; /* If there is a filter, the default is to reject. */
}

/**
 * __post_watch_notification - Post an event notification
 * @wlist: The watch list to post the event to.
 * @n: The notification record to post.
 * @cred: The creds of the process that triggered the notification.
 * @id: The ID to match on the watch.
 *
 * Post a notification of an event into a set of watch queues and let the users
 * know.
 *
 * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and
 * should be in units of sizeof(*n).
 */
void __post_watch_notification(struct watch_list *wlist,
			       struct watch_notification *n,
			       const struct cred *cred,
			       u64 id)
{
	const struct watch_filter *wf;
	struct watch_queue *wqueue;
	struct watch *watch;

	if (((n->info & WATCH_INFO_LENGTH) >> WATCH_INFO_LENGTH__SHIFT) == 0) {
		WARN_ON(1);
		return;
	}

	rcu_read_lock();

	hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) {
		if (watch->id != id)
			continue;
		n->info &= ~WATCH_INFO_ID;
		n->info |= watch->info_id;

		wqueue = rcu_dereference(watch->queue);
		wf = rcu_dereference(wqueue->filter);
		if (wf && !filter_watch_notification(wf, n))
			continue;

		if (security_post_notification(watch->cred, cred, n) < 0)
			continue;

		if (lock_wqueue(wqueue)) {
			post_one_notification(wqueue, n);
			unlock_wqueue(wqueue);
		}
	}

	rcu_read_unlock();
}
EXPORT_SYMBOL(__post_watch_notification);

/*
 * Allocate sufficient pages to preallocate space for the requested number of
 * notifications.
 */
long watch_queue_set_size(struct pipe_inode_info *pipe, unsigned int nr_notes)
{
	struct watch_queue *wqueue = pipe->watch_queue;
	struct page **pages;
	unsigned long *bitmap;
	unsigned long user_bufs;
	int ret, i, nr_pages;

	if (!wqueue)
		return -ENODEV;
	if (wqueue->notes)
		return -EBUSY;

	if (nr_notes < 1 ||
	    nr_notes > 512) /* TODO: choose a better hard limit */
		return -EINVAL;

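	/* Round the request up to whole pages; the rounded-up page capacity
	 * becomes the actual number of notes provided.
	 */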
	nr_pages = (nr_notes + WATCH_QUEUE_NOTES_PER_PAGE - 1);
	nr_pages /= WATCH_QUEUE_NOTES_PER_PAGE;
	user_bufs = account_pipe_buffers(pipe->user, pipe->nr_accounted, nr_pages);

	if (nr_pages > pipe->max_usage &&
	    (too_many_pipe_buffers_hard(user_bufs) ||
	     too_many_pipe_buffers_soft(user_bufs)) &&
	    pipe_is_unprivileged_user()) {
		ret = -EPERM;
		goto error;
	}

	nr_notes = nr_pages * WATCH_QUEUE_NOTES_PER_PAGE;
	ret = pipe_resize_ring(pipe, roundup_pow_of_two(nr_notes));
	if (ret < 0)
		goto error;

	/*
	 * pipe_resize_ring() does not update nr_accounted for watch_queue
	 * pipes, because the above vastly overprovisions. Set nr_accounted and
	 * max_usage on this pipe to the number that was actually charged to
	 * the user above via account_pipe_buffers().
	 */
	pipe->max_usage = nr_pages;
	pipe->nr_accounted = nr_pages;

	ret = -ENOMEM;
	pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
	if (!pages)
		goto error;

	for (i = 0; i < nr_pages; i++) {
		pages[i] = alloc_page(GFP_KERNEL);
		if (!pages[i])
			goto error_p;
		pages[i]->private = i * WATCH_QUEUE_NOTES_PER_PAGE;
	}

	bitmap = bitmap_alloc(nr_notes, GFP_KERNEL);
	if (!bitmap)
		goto error_p;

	bitmap_fill(bitmap, nr_notes);
	wqueue->notes = pages;
	wqueue->notes_bitmap = bitmap;
	wqueue->nr_pages = nr_pages;
	wqueue->nr_notes = nr_notes;
	return 0;

error_p:
	while (--i >= 0)
		__free_page(pages[i]);
	kfree(pages);
error:
	(void) account_pipe_buffers(pipe->user, nr_pages, pipe->nr_accounted);
	return ret;
}

/*
 * Set the filter on a watch queue.
 */
long watch_queue_set_filter(struct pipe_inode_info *pipe,
			    struct watch_notification_filter __user *_filter)
{
	struct watch_notification_type_filter *tf;
	struct watch_notification_filter filter;
	struct watch_type_filter *q;
	struct watch_filter *wfilter;
	struct watch_queue *wqueue = pipe->watch_queue;
	int ret, nr_filter = 0, i;

	if (!wqueue)
		return -ENODEV;

	if (!_filter) {
		/* Remove the old filter */
		wfilter = NULL;
		goto set;
	}

	/* Grab the user's filter specification */
	if (copy_from_user(&filter, _filter, sizeof(filter)) != 0)
		return -EFAULT;
	if (filter.nr_filters == 0 ||
	    filter.nr_filters > 16 ||
	    filter.__reserved != 0)
		return -EINVAL;

	tf = memdup_array_user(_filter->filters, filter.nr_filters, sizeof(*tf));
	if (IS_ERR(tf))
		return PTR_ERR(tf);

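	/* First pass: validate each entry and count how many of them refer to
	 * notification types we actually know about.
	 */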
	ret = -EINVAL;
	for (i = 0; i < filter.nr_filters; i++) {
		if ((tf[i].info_filter & ~tf[i].info_mask) ||
		    tf[i].info_mask & WATCH_INFO_LENGTH)
			goto err_filter;
		/* Ignore any unknown types */
		if (tf[i].type >= WATCH_TYPE__NR)
			continue;
		nr_filter++;
	}

	/* Now we need to build the internal filter from only the relevant
	 * user-specified filters.
	 */
	ret = -ENOMEM;
	wfilter = kzalloc(struct_size(wfilter, filters, nr_filter), GFP_KERNEL);
	if (!wfilter)
		goto err_filter;
	wfilter->nr_filters = nr_filter;

	q = wfilter->filters;
	for (i = 0; i < filter.nr_filters; i++) {
		if (tf[i].type >= WATCH_TYPE__NR)
			continue;

		q->type			= tf[i].type;
		q->info_filter		= tf[i].info_filter;
		q->info_mask		= tf[i].info_mask;
		q->subtype_filter[0]	= tf[i].subtype_filter[0];
		__set_bit(q->type, wfilter->type_filter);
		q++;
	}

	kfree(tf);
set:
	pipe_lock(pipe);
	wfilter = rcu_replace_pointer(wqueue->filter, wfilter,
				      lockdep_is_held(&pipe->mutex));
	pipe_unlock(pipe);
	if (wfilter)
		kfree_rcu(wfilter, rcu);
	return 0;

err_filter:
	kfree(tf);
	return ret;
}

static void __put_watch_queue(struct kref *kref)
{
	struct watch_queue *wqueue =
		container_of(kref, struct watch_queue, usage);
	struct watch_filter *wfilter;
	int i;

	for (i = 0; i < wqueue->nr_pages; i++)
		__free_page(wqueue->notes[i]);
	kfree(wqueue->notes);
	bitmap_free(wqueue->notes_bitmap);

	wfilter = rcu_access_pointer(wqueue->filter);
	if (wfilter)
		kfree_rcu(wfilter, rcu);
	kfree_rcu(wqueue, rcu);
}

/**
 * put_watch_queue - Dispose of a ref on a watchqueue.
 * @wqueue: The watch queue to unref.
 */
void put_watch_queue(struct watch_queue *wqueue)
{
	kref_put(&wqueue->usage, __put_watch_queue);
}
EXPORT_SYMBOL(put_watch_queue);

static void free_watch(struct rcu_head *rcu)
{
	struct watch *watch = container_of(rcu, struct watch, rcu);

	put_watch_queue(rcu_access_pointer(watch->queue));
	atomic_dec(&watch->cred->user->nr_watches);
	put_cred(watch->cred);
	kfree(watch);
}

static void __put_watch(struct kref *kref)
{
	struct watch *watch = container_of(kref, struct watch, usage);

	call_rcu(&watch->rcu, free_watch);
}

/*
 * Discard a watch.
 */
static void put_watch(struct watch *watch)
{
	kref_put(&watch->usage, __put_watch);
}

/**
 * init_watch - Initialise a watch
 * @watch: The watch to initialise.
 * @wqueue: The queue to assign.
 *
 * Initialise a watch and set the watch queue.
 */
void init_watch(struct watch *watch, struct watch_queue *wqueue)
{
	kref_init(&watch->usage);
	INIT_HLIST_NODE(&watch->list_node);
	INIT_HLIST_NODE(&watch->queue_node);
	rcu_assign_pointer(watch->queue, wqueue);
}

static int add_one_watch(struct watch *watch, struct watch_list *wlist, struct watch_queue *wqueue)
{
	const struct cred *cred;
	struct watch *w;

	hlist_for_each_entry(w, &wlist->watchers, list_node) {
		struct watch_queue *wq = rcu_access_pointer(w->queue);
		if (wqueue == wq && watch->id == w->id)
			return -EBUSY;
	}

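	/* Each watch is charged against the owner's nr_watches count, which is
	 * capped at the same value as their open-file rlimit.
	 */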
	cred = current_cred();
	if (atomic_inc_return(&cred->user->nr_watches) > task_rlimit(current, RLIMIT_NOFILE)) {
		atomic_dec(&cred->user->nr_watches);
		return -EAGAIN;
	}

	watch->cred = get_cred(cred);
	rcu_assign_pointer(watch->watch_list, wlist);

	kref_get(&wqueue->usage);
	kref_get(&watch->usage);
	hlist_add_head(&watch->queue_node, &wqueue->watches);
	hlist_add_head_rcu(&watch->list_node, &wlist->watchers);
	return 0;
}

/**
 * add_watch_to_object - Add a watch on an object to a watch list
 * @watch: The watch to add
 * @wlist: The watch list to add to
 *
 * @watch->queue must have been set to point to the queue to post notifications
 * to and the watch list of the object to be watched.  @watch->cred must also
 * have been set to the appropriate credentials and a ref taken on them.
 *
 * The caller must pin the queue and the list both and must hold the list
 * locked against racing watch additions/removals.
 */
int add_watch_to_object(struct watch *watch, struct watch_list *wlist)
{
	struct watch_queue *wqueue;
	int ret = -ENOENT;

	rcu_read_lock();

	wqueue = rcu_access_pointer(watch->queue);
	if (lock_wqueue(wqueue)) {
		spin_lock(&wlist->lock);
		ret = add_one_watch(watch, wlist, wqueue);
		spin_unlock(&wlist->lock);
		unlock_wqueue(wqueue);
	}

	rcu_read_unlock();
	return ret;
}
EXPORT_SYMBOL(add_watch_to_object);

/**
 * remove_watch_from_object - Remove a watch or all watches from an object.
 * @wlist: The watch list to remove from
 * @wq: The watch queue of interest (ignored if @all is true)
 * @id: The ID of the watch to remove (ignored if @all is true)
 * @all: True to remove all objects
 *
 * Remove a specific watch or all watches from an object.  A notification is
 * sent to the watcher to tell them that this happened.
 */
int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
			     u64 id, bool all)
{
	struct watch_notification_removal n;
	struct watch_queue *wqueue;
	struct watch *watch;
	int ret = -EBADSLT;

	rcu_read_lock();

again:
	spin_lock(&wlist->lock);
	hlist_for_each_entry(watch, &wlist->watchers, list_node) {
		if (all ||
		    (watch->id == id && rcu_access_pointer(watch->queue) == wq))
			goto found;
	}
	spin_unlock(&wlist->lock);
	goto out;

found:
	ret = 0;
	hlist_del_init_rcu(&watch->list_node);
	rcu_assign_pointer(watch->watch_list, NULL);
	spin_unlock(&wlist->lock);

	/* We now own the reference on watch that used to belong to wlist. */

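	/* Build a removal notification for the watcher; if a specific ID was
	 * requested, send the longer form that carries the ID being removed.
	 */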
	n.watch.type = WATCH_TYPE_META;
	n.watch.subtype = WATCH_META_REMOVAL_NOTIFICATION;
	n.watch.info = watch->info_id | watch_sizeof(n.watch);
	n.id = id;
	if (id != 0)
		n.watch.info = watch->info_id | watch_sizeof(n);

	wqueue = rcu_dereference(watch->queue);

	if (lock_wqueue(wqueue)) {
		post_one_notification(wqueue, &n.watch);

		if (!hlist_unhashed(&watch->queue_node)) {
			hlist_del_init_rcu(&watch->queue_node);
			put_watch(watch);
		}

		unlock_wqueue(wqueue);
	}

	if (wlist->release_watch) {
		void (*release_watch)(struct watch *);

		release_watch = wlist->release_watch;
		rcu_read_unlock();
		(*release_watch)(watch);
		rcu_read_lock();
	}
	put_watch(watch);

	if (all && !hlist_empty(&wlist->watchers))
		goto again;
out:
	rcu_read_unlock();
	return ret;
}
EXPORT_SYMBOL(remove_watch_from_object);

/*
 * Remove all the watches that are contributory to a queue.  This has the
 * potential to race with removal of the watches by the destruction of the
 * objects being watched or with the distribution of notifications.
 */
void watch_queue_clear(struct watch_queue *wqueue)
{
	struct watch_list *wlist;
	struct watch *watch;
	bool release;

	rcu_read_lock();
	spin_lock_bh(&wqueue->lock);

	/*
	 * This pipe can be freed by callers like free_pipe_info().
	 * Removing this reference also prevents new notifications.
	 */
	wqueue->pipe = NULL;

	while (!hlist_empty(&wqueue->watches)) {
		watch = hlist_entry(wqueue->watches.first, struct watch, queue_node);
		hlist_del_init_rcu(&watch->queue_node);
		/* We now own a ref on the watch. */
		spin_unlock_bh(&wqueue->lock);

		/* We can't do the next bit under the queue lock as we need to
		 * get the list lock - which would cause a deadlock if someone
		 * was removing from the opposite direction at the same time or
		 * posting a notification.
		 */
		wlist = rcu_dereference(watch->watch_list);
		if (wlist) {
			void (*release_watch)(struct watch *);

			spin_lock(&wlist->lock);

			release = !hlist_unhashed(&watch->list_node);
			if (release) {
				hlist_del_init_rcu(&watch->list_node);
				rcu_assign_pointer(watch->watch_list, NULL);

				/* We now own a second ref on the watch. */
			}

			release_watch = wlist->release_watch;
			spin_unlock(&wlist->lock);

			if (release) {
				if (release_watch) {
					rcu_read_unlock();
					/* This might need to call dput(), so
					 * we have to drop all the locks.
					 */
					(*release_watch)(watch);
					rcu_read_lock();
				}
				put_watch(watch);
			}
		}

		put_watch(watch);
		spin_lock_bh(&wqueue->lock);
	}

	spin_unlock_bh(&wqueue->lock);
	rcu_read_unlock();
}

/**
 * get_watch_queue - Get a watch queue from its file descriptor.
 * @fd: The fd to query.
 */
struct watch_queue *get_watch_queue(int fd)
{
	struct pipe_inode_info *pipe;
	struct watch_queue *wqueue = ERR_PTR(-EINVAL);
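	/* CLASS(fd) pins the struct file for the duration of this scope and
	 * releases it automatically on exit.
	 */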
	CLASS(fd, f)(fd);

	if (!fd_empty(f)) {
		pipe = get_pipe_info(fd_file(f), false);
		if (pipe && pipe->watch_queue) {
			wqueue = pipe->watch_queue;
			kref_get(&wqueue->usage);
		}
	}

	return wqueue;
}
EXPORT_SYMBOL(get_watch_queue);

/*
 * Initialise a watch queue
 */
int watch_queue_init(struct pipe_inode_info *pipe)
{
	struct watch_queue *wqueue;

	wqueue = kzalloc(sizeof(*wqueue), GFP_KERNEL);
	if (!wqueue)
		return -ENOMEM;

	wqueue->pipe = pipe;
	kref_init(&wqueue->usage);
	spin_lock_init(&wqueue->lock);
	INIT_HLIST_HEAD(&wqueue->watches);

	pipe->watch_queue = wqueue;
	return 0;
}