xref: /src/sys/contrib/openzfs/lib/libspl/taskq.c (revision 66e85755595a451db490d2fe24267d85db4b09c2)
1 // SPDX-License-Identifier: CDDL-1.0
2 /*
3  * CDDL HEADER START
4  *
5  * The contents of this file are subject to the terms of the
6  * Common Development and Distribution License (the "License").
7  * You may not use this file except in compliance with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or https://opensource.org/licenses/CDDL-1.0.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 /*
27  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
28  * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
29  * Copyright (c) 2014 by Delphix. All rights reserved.
30  */
31 
32 #include <sys/sysmacros.h>
33 #include <sys/timer.h>
34 #include <sys/types.h>
35 #include <sys/thread.h>
36 #include <sys/taskq.h>
37 #include <sys/kmem.h>
38 #include <pthread.h>
39 
/* Per-thread key mapping a worker thread back to its owning taskq. */
static pthread_key_t taskq_tsd;
/* Ensures taskq_tsd is created exactly once across all worker threads. */
static pthread_once_t taskq_tsd_once = PTHREAD_ONCE_INIT;

/* Global queues backing system_taskq/system_delay_taskq; set by init. */
static taskq_t *__system_taskq = NULL;
static taskq_t *__system_delay_taskq = NULL;
45 
46 taskq_t
_system_taskq(void)47 *_system_taskq(void)
48 {
49 	return (__system_taskq);
50 }
51 
52 taskq_t
_system_delay_taskq(void)53 *_system_delay_taskq(void)
54 {
55 	return (__system_delay_taskq);
56 }
57 
58 #define	TASKQ_ACTIVE	0x00010000
59 
/*
 * Allocate a task entry for dispatch.  Entered and exited with tq_lock
 * held, but the lock is dropped and reacquired around kmem_alloc() and
 * while throttling on tq_maxalloc_cv.  Returns NULL only when tqflags
 * lacks KM_SLEEP and either the maxalloc cap is hit or kmem_alloc fails.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

	/* Reuse a cached entry once the freelist holds more than minalloc. */
again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		/* Prealloc'd entries are caller-owned; they never hit the freelist. */
		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (!(tqflags & KM_SLEEP))
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate. If we have tasks
			 * complete before one second timeout expires then
			 * taskq_ent_free will signal us and we will
			 * immediately retry the allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
		}
		/* Drop the queue lock across a possibly sleeping allocation. */
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL) {
			/* Make sure we start without any flags */
			t->tqent_flags = 0;
			tq->tq_nalloc++;
		}
	}
	return (t);
}
104 
105 static void
task_free(taskq_t * tq,taskq_ent_t * t)106 task_free(taskq_t *tq, taskq_ent_t *t)
107 {
108 	if (tq->tq_nalloc <= tq->tq_minalloc) {
109 		t->tqent_next = tq->tq_freelist;
110 		tq->tq_freelist = t;
111 	} else {
112 		tq->tq_nalloc--;
113 		mutex_exit(&tq->tq_lock);
114 		kmem_free(t, sizeof (taskq_ent_t));
115 		mutex_enter(&tq->tq_lock);
116 	}
117 
118 	if (tq->tq_maxalloc_wait)
119 		cv_signal(&tq->tq_maxalloc_cv);
120 }
121 
/*
 * Queue func(arg) for execution by one of the taskq's worker threads.
 * TQ_FRONT in tqflags inserts at the head of the queue, otherwise at
 * the tail.  Returns 0 if an entry could not be allocated (only possible
 * without KM_SLEEP); otherwise returns 1 — this userspace implementation
 * does not hand out unique task ids.
 */
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
	taskq_ent_t *t;

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
	if ((t = task_alloc(tq, tqflags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return (0);
	}
	/* Splice into the doubly-linked circular list rooted at tq_task. */
	if (tqflags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_flags = 0;
	/* Wake one idle worker to service the new task. */
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
	return (1);
}
149 
150 taskqid_t
taskq_dispatch_delay(taskq_t * tq,task_func_t func,void * arg,uint_t tqflags,clock_t expire_time)151 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags,
152     clock_t expire_time)
153 {
154 	(void) tq, (void) func, (void) arg, (void) tqflags, (void) expire_time;
155 	return (0);
156 }
157 
158 int
taskq_empty_ent(taskq_ent_t * t)159 taskq_empty_ent(taskq_ent_t *t)
160 {
161 	return (t->tqent_next == NULL);
162 }
163 
164 void
taskq_init_ent(taskq_ent_t * t)165 taskq_init_ent(taskq_ent_t *t)
166 {
167 	t->tqent_next = NULL;
168 	t->tqent_prev = NULL;
169 	t->tqent_func = NULL;
170 	t->tqent_arg = NULL;
171 	t->tqent_flags = 0;
172 }
173 
/*
 * Dispatch using a caller-supplied, preallocated task entry.  Cannot
 * fail: no allocation occurs.  TQ_FRONT in flags inserts at the head
 * of the queue.  The entry is marked TQENT_FLAG_PREALLOC so the worker
 * thread will not return it to the taskq's internal cache.
 */
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	ASSERT(func != NULL);

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;
	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	/* Splice into the doubly-linked circular list rooted at tq_task. */
	if (flags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	/* Wake one idle worker to service the new task. */
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}
204 
205 void
taskq_wait(taskq_t * tq)206 taskq_wait(taskq_t *tq)
207 {
208 	mutex_enter(&tq->tq_lock);
209 	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
210 		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
211 	mutex_exit(&tq->tq_lock);
212 }
213 
/*
 * Wait for a specific task to finish.  Individual task ids are not
 * tracked in userspace, so this conservatively waits for the whole queue.
 */
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
	(void) id;
	taskq_wait(tq);
}
220 
/*
 * Wait for all tasks dispatched up to the given id.  Ids are not tracked
 * in userspace, so this conservatively waits for the whole queue.
 */
void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
{
	(void) id;
	taskq_wait(tq);
}
227 
/*
 * One-time creation of the thread-specific-data key used by
 * taskq_of_curthread(); invoked via pthread_once() from worker threads.
 */
static void
taskq_tsd_init(void)
{
	VERIFY0(pthread_key_create(&taskq_tsd, NULL));
}
233 
/*
 * Worker thread main loop.  Each worker records its owning taskq in TSD,
 * then repeatedly removes the next entry from the queue and runs it with
 * tq_lock dropped (holding tq_threadlock as reader so taskq consumers
 * can quiesce workers via the write side).  tq_active counts workers not
 * blocked in cv_wait; when it hits zero with an empty queue, waiters in
 * taskq_wait() are woken.  Exits when TASKQ_ACTIVE is cleared.
 */
static __attribute__((noreturn)) void
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	pthread_once(&taskq_tsd_once, taskq_tsd_init);
	VERIFY0(pthread_setspecific(taskq_tsd, tq));

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			/* Queue empty: go idle; last idler wakes taskq_wait(). */
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		/* Unlink the entry and mark it unqueued. */
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		/* Latch before the callback: it may recycle a prealloc'd t. */
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	thread_exit();
}
273 
274 taskq_t *
taskq_create(const char * name,int nthreads,pri_t pri,int minalloc,int maxalloc,uint_t flags)275 taskq_create(const char *name, int nthreads, pri_t pri,
276     int minalloc, int maxalloc, uint_t flags)
277 {
278 	(void) pri;
279 	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
280 	int t;
281 
282 	if (flags & TASKQ_THREADS_CPU_PCT) {
283 		int pct;
284 		ASSERT3S(nthreads, >=, 0);
285 		ASSERT3S(nthreads, <=, 100);
286 		pct = MIN(nthreads, 100);
287 		pct = MAX(pct, 0);
288 
289 		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
290 		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
291 	} else {
292 		ASSERT3S(nthreads, >=, 1);
293 	}
294 
295 	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
296 	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
297 	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
298 	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
299 	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
300 	(void) strlcpy(tq->tq_name, name, sizeof (tq->tq_name));
301 	tq->tq_flags = flags | TASKQ_ACTIVE;
302 	tq->tq_active = nthreads;
303 	tq->tq_nthreads = nthreads;
304 	tq->tq_minalloc = minalloc;
305 	tq->tq_maxalloc = maxalloc;
306 	tq->tq_task.tqent_next = &tq->tq_task;
307 	tq->tq_task.tqent_prev = &tq->tq_task;
308 	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *),
309 	    KM_SLEEP);
310 
311 	if (flags & TASKQ_PREPOPULATE) {
312 		mutex_enter(&tq->tq_lock);
313 		while (minalloc-- > 0)
314 			task_free(tq, task_alloc(tq, KM_SLEEP));
315 		mutex_exit(&tq->tq_lock);
316 	}
317 
318 	for (t = 0; t < nthreads; t++)
319 		VERIFY((tq->tq_threadlist[t] = thread_create_named(tq->tq_name,
320 		    NULL, 0, taskq_thread, tq, 0, &p0, TS_RUN, pri)) != NULL);
321 
322 	return (tq);
323 }
324 
/*
 * Tear down a taskq: drain all pending work, stop the worker threads,
 * release every cached task entry, and free the queue itself.  Must not
 * be called while other threads may still dispatch to tq.
 */
void
taskq_destroy(taskq_t *tq)
{
	/* Snapshot before workers start decrementing tq_nthreads. */
	int nthreads = tq->tq_nthreads;

	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	/* Clearing TASKQ_ACTIVE makes each woken worker exit its loop. */
	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	/* Force task_free() to kmem_free instead of caching. */
	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		/* task_free() drops tq_lock, so save the next pointer first. */
		taskq_ent_t *tqent_nexttq = tq->tq_freelist->tqent_next;
		task_free(tq, tq->tq_freelist);
		tq->tq_freelist = tqent_nexttq;
	}

	mutex_exit(&tq->tq_lock);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}
360 
361 /*
362  * Create a taskq with a specified number of pool threads. Allocate
363  * and return an array of nthreads kthread_t pointers, one for each
364  * thread in the pool. The array is not ordered and must be freed
365  * by the caller.
366  */
367 taskq_t *
taskq_create_synced(const char * name,int nthreads,pri_t pri,int minalloc,int maxalloc,uint_t flags,kthread_t *** ktpp)368 taskq_create_synced(const char *name, int nthreads, pri_t pri,
369     int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
370 {
371 	taskq_t *tq;
372 	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
373 	    KM_SLEEP);
374 
375 	(void) pri; (void) minalloc; (void) maxalloc;
376 
377 	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
378 
379 	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
380 	    flags | TASKQ_PREPOPULATE);
381 	VERIFY(tq != NULL);
382 	VERIFY(tq->tq_nthreads == nthreads);
383 
384 	for (int i = 0; i < nthreads; i++) {
385 		kthreads[i] = tq->tq_threadlist[i];
386 	}
387 	*ktpp = kthreads;
388 	return (tq);
389 }
390 
391 int
taskq_member(taskq_t * tq,kthread_t * t)392 taskq_member(taskq_t *tq, kthread_t *t)
393 {
394 	int i;
395 
396 	for (i = 0; i < tq->tq_nthreads; i++)
397 		if (tq->tq_threadlist[i] == t)
398 			return (1);
399 
400 	return (0);
401 }
402 
/*
 * Return the taskq the calling thread services, or NULL when the caller
 * is not a taskq worker (workers set this TSD slot in taskq_thread()).
 */
taskq_t *
taskq_of_curthread(void)
{
	return (pthread_getspecific(taskq_tsd));
}
408 
409 int
taskq_cancel_id(taskq_t * tq,taskqid_t id,boolean_t wait)410 taskq_cancel_id(taskq_t *tq, taskqid_t id, boolean_t wait)
411 {
412 	(void) tq, (void) id, (void) wait;
413 	return (ENOENT);
414 }
415 
/*
 * Create the two global queues returned by system_taskq() and
 * system_delay_taskq().  Call once at startup, before any dispatches.
 */
void
system_taskq_init(void)
{
	__system_taskq = taskq_create("system_taskq", 64, maxclsyspri, 4, 512,
	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
	__system_delay_taskq = taskq_create("delay_taskq", 4, maxclsyspri, 4,
	    512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}
424 
/*
 * Drain and destroy the global queues created by system_taskq_init().
 * The pointers are NULLed so a stale accessor returns NULL rather than
 * a dangling taskq.
 */
void
system_taskq_fini(void)
{
	taskq_destroy(__system_taskq);
	__system_taskq = NULL; /* defensive */
	taskq_destroy(__system_delay_taskq);
	__system_delay_taskq = NULL;
}
433