1 // SPDX-License-Identifier: CDDL-1.0
2 /*
3 * CDDL HEADER START
4 *
5 * The contents of this file are subject to the terms of the
6 * Common Development and Distribution License (the "License").
7 * You may not use this file except in compliance with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or https://opensource.org/licenses/CDDL-1.0.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22 /*
23 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26 /*
27 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
28 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved.
29 * Copyright (c) 2014 by Delphix. All rights reserved.
30 */
31
32 #include <sys/sysmacros.h>
33 #include <sys/timer.h>
34 #include <sys/types.h>
35 #include <sys/thread.h>
36 #include <sys/taskq.h>
37 #include <sys/kmem.h>
38 #include <pthread.h>
39
40 static pthread_key_t taskq_tsd;
41 static pthread_once_t taskq_tsd_once = PTHREAD_ONCE_INIT;
42
43 static taskq_t *__system_taskq = NULL;
44 static taskq_t *__system_delay_taskq = NULL;
45
/*
 * Accessor for the global system taskq.  Returns NULL until
 * system_taskq_init() has run.
 */
taskq_t
*_system_taskq(void)
{
	return (__system_taskq);
}
51
/*
 * Accessor for the global delay taskq.  Returns NULL until
 * system_taskq_init() has run.
 */
taskq_t
*_system_delay_taskq(void)
{
	return (__system_delay_taskq);
}
57
58 #define TASKQ_ACTIVE 0x00010000
59
/*
 * Allocate a task entry; called (and returns) with tq_lock held, though
 * the lock is dropped around the actual kmem_alloc().  A cached entry is
 * reused from the freelist when the allocation count is above tq_minalloc;
 * otherwise a fresh entry is allocated, throttled at tq_maxalloc.  Returns
 * NULL only for non-KM_SLEEP requests that cannot be satisfied.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		/* Prealloc'd entries are caller-owned and never cached here. */
		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (!(tqflags & KM_SLEEP))
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller. So, we just delay for one second
			 * to throttle the allocation rate. If we have tasks
			 * complete before one second timeout expires then
			 * taskq_ent_free will signal us and we will
			 * immediately retry the allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
			/*
			 * Timed out: fall through and allocate anyway,
			 * temporarily exceeding tq_maxalloc.
			 */
		}
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL) {
			/* Make sure we start without any flags */
			t->tqent_flags = 0;
			tq->tq_nalloc++;
		}
	}
	return (t);
}
104
105 static void
task_free(taskq_t * tq,taskq_ent_t * t)106 task_free(taskq_t *tq, taskq_ent_t *t)
107 {
108 if (tq->tq_nalloc <= tq->tq_minalloc) {
109 t->tqent_next = tq->tq_freelist;
110 tq->tq_freelist = t;
111 } else {
112 tq->tq_nalloc--;
113 mutex_exit(&tq->tq_lock);
114 kmem_free(t, sizeof (taskq_ent_t));
115 mutex_enter(&tq->tq_lock);
116 }
117
118 if (tq->tq_maxalloc_wait)
119 cv_signal(&tq->tq_maxalloc_cv);
120 }
121
/*
 * Dispatch func(arg) on the taskq.  Returns 0 if a task entry could not
 * be allocated (only possible without KM_SLEEP in tqflags); otherwise
 * returns the constant id 1 -- this emulation does not track per-task ids.
 */
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
	taskq_ent_t *t;

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
	if ((t = task_alloc(tq, tqflags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return (0);
	}
	/* tq_task is the sentinel head of a circular doubly-linked list. */
	if (tqflags & TQ_FRONT) {
		/* Head insert: run before already-queued tasks. */
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		/* Tail insert: normal FIFO order. */
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_flags = 0;
	/* Wake one worker to pick up the new task. */
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
	return (1);
}
149
150 taskqid_t
taskq_dispatch_delay(taskq_t * tq,task_func_t func,void * arg,uint_t tqflags,clock_t expire_time)151 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags,
152 clock_t expire_time)
153 {
154 (void) tq, (void) func, (void) arg, (void) tqflags, (void) expire_time;
155 return (0);
156 }
157
158 int
taskq_empty_ent(taskq_ent_t * t)159 taskq_empty_ent(taskq_ent_t *t)
160 {
161 return (t->tqent_next == NULL);
162 }
163
164 void
taskq_init_ent(taskq_ent_t * t)165 taskq_init_ent(taskq_ent_t *t)
166 {
167 t->tqent_next = NULL;
168 t->tqent_prev = NULL;
169 t->tqent_func = NULL;
170 t->tqent_arg = NULL;
171 t->tqent_flags = 0;
172 }
173
/*
 * Dispatch func(arg) using a caller-provided (prealloc'd) task entry.
 * Unlike taskq_dispatch(), this cannot fail since no allocation is done;
 * the worker thread will not free the entry (see TQENT_FLAG_PREALLOC).
 */
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	ASSERT(func != NULL);

	/*
	 * Mark it as a prealloc'd task. This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;
	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	if (flags & TQ_FRONT) {
		/* Head insert: run before already-queued tasks. */
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		/* Tail insert: normal FIFO order. */
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	/* Wake one worker to service the queue. */
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}
204
205 void
taskq_wait(taskq_t * tq)206 taskq_wait(taskq_t *tq)
207 {
208 mutex_enter(&tq->tq_lock);
209 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
210 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
211 mutex_exit(&tq->tq_lock);
212 }
213
214 void
taskq_wait_id(taskq_t * tq,taskqid_t id)215 taskq_wait_id(taskq_t *tq, taskqid_t id)
216 {
217 (void) id;
218 taskq_wait(tq);
219 }
220
221 void
taskq_wait_outstanding(taskq_t * tq,taskqid_t id)222 taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
223 {
224 (void) id;
225 taskq_wait(tq);
226 }
227
/*
 * pthread_once() callback: create the thread-specific-data key used to
 * map a worker thread back to its owning taskq (taskq_of_curthread()).
 */
static void
taskq_tsd_init(void)
{
	VERIFY0(pthread_key_create(&taskq_tsd, NULL));
}
233
/*
 * Worker thread main loop: pull tasks off the queue and execute them
 * until taskq_destroy() clears TASKQ_ACTIVE.  Never returns (exits via
 * thread_exit()).
 */
static __attribute__((noreturn)) void
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	/* Record which taskq owns this thread for taskq_of_curthread(). */
	pthread_once(&taskq_tsd_once, taskq_tsd_init);
	VERIFY0(pthread_setspecific(taskq_tsd, tq));

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			/*
			 * Queue empty: go idle.  The last thread to go
			 * idle wakes taskq_wait() waiters.
			 */
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		/* Unlink the task from the circular list. */
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		/*
		 * Latch the prealloc flag before dropping the lock: a
		 * prealloc'd entry belongs to the caller and may be reused
		 * once its function has run.
		 */
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		/* Execute the task with tq_threadlock held as reader. */
		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	/* Shutting down: report our exit and wake taskq_destroy(). */
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	thread_exit();
}
273
274 taskq_t *
taskq_create(const char * name,int nthreads,pri_t pri,int minalloc,int maxalloc,uint_t flags)275 taskq_create(const char *name, int nthreads, pri_t pri,
276 int minalloc, int maxalloc, uint_t flags)
277 {
278 (void) pri;
279 taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
280 int t;
281
282 if (flags & TASKQ_THREADS_CPU_PCT) {
283 int pct;
284 ASSERT3S(nthreads, >=, 0);
285 ASSERT3S(nthreads, <=, 100);
286 pct = MIN(nthreads, 100);
287 pct = MAX(pct, 0);
288
289 nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
290 nthreads = MAX(nthreads, 1); /* need at least 1 thread */
291 } else {
292 ASSERT3S(nthreads, >=, 1);
293 }
294
295 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
296 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
297 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
298 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
299 cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
300 (void) strlcpy(tq->tq_name, name, sizeof (tq->tq_name));
301 tq->tq_flags = flags | TASKQ_ACTIVE;
302 tq->tq_active = nthreads;
303 tq->tq_nthreads = nthreads;
304 tq->tq_minalloc = minalloc;
305 tq->tq_maxalloc = maxalloc;
306 tq->tq_task.tqent_next = &tq->tq_task;
307 tq->tq_task.tqent_prev = &tq->tq_task;
308 tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *),
309 KM_SLEEP);
310
311 if (flags & TASKQ_PREPOPULATE) {
312 mutex_enter(&tq->tq_lock);
313 while (minalloc-- > 0)
314 task_free(tq, task_alloc(tq, KM_SLEEP));
315 mutex_exit(&tq->tq_lock);
316 }
317
318 for (t = 0; t < nthreads; t++)
319 VERIFY((tq->tq_threadlist[t] = thread_create_named(tq->tq_name,
320 NULL, 0, taskq_thread, tq, 0, &p0, TS_RUN, pri)) != NULL);
321
322 return (tq);
323 }
324
/*
 * Drain and tear down a taskq: wait for all pending tasks, stop the
 * worker threads, free the cached task entries, then free the taskq.
 */
void
taskq_destroy(taskq_t *tq)
{
	int nthreads = tq->tq_nthreads;

	/* Let all queued work complete first. */
	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	/* Tell workers to exit and wake any that are idle. */
	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	/* Each exiting worker decrements tq_nthreads and broadcasts. */
	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	/*
	 * Drop the cache floor to zero so task_free() really frees the
	 * entries instead of putting them back on the freelist.
	 */
	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		taskq_ent_t *tqent_nexttq = tq->tq_freelist->tqent_next;
		task_free(tq, tq->tq_freelist);
		tq->tq_freelist = tqent_nexttq;
	}

	mutex_exit(&tq->tq_lock);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}
360
361 /*
362 * Create a taskq with a specified number of pool threads. Allocate
363 * and return an array of nthreads kthread_t pointers, one for each
364 * thread in the pool. The array is not ordered and must be freed
365 * by the caller.
366 */
367 taskq_t *
taskq_create_synced(const char * name,int nthreads,pri_t pri,int minalloc,int maxalloc,uint_t flags,kthread_t *** ktpp)368 taskq_create_synced(const char *name, int nthreads, pri_t pri,
369 int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
370 {
371 taskq_t *tq;
372 kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
373 KM_SLEEP);
374
375 (void) pri; (void) minalloc; (void) maxalloc;
376
377 flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);
378
379 tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
380 flags | TASKQ_PREPOPULATE);
381 VERIFY(tq != NULL);
382 VERIFY(tq->tq_nthreads == nthreads);
383
384 for (int i = 0; i < nthreads; i++) {
385 kthreads[i] = tq->tq_threadlist[i];
386 }
387 *ktpp = kthreads;
388 return (tq);
389 }
390
391 int
taskq_member(taskq_t * tq,kthread_t * t)392 taskq_member(taskq_t *tq, kthread_t *t)
393 {
394 int i;
395
396 for (i = 0; i < tq->tq_nthreads; i++)
397 if (tq->tq_threadlist[i] == t)
398 return (1);
399
400 return (0);
401 }
402
/*
 * Map the calling thread back to its owning taskq via TSD; returns NULL
 * for threads that do not belong to a taskq.
 * NOTE(review): assumes taskq_tsd has already been created (some worker
 * has run taskq_tsd_init() via pthread_once) -- confirm callers never
 * reach here before any taskq thread has started.
 */
taskq_t *
taskq_of_curthread(void)
{
	return (pthread_getspecific(taskq_tsd));
}
408
409 int
taskq_cancel_id(taskq_t * tq,taskqid_t id,boolean_t wait)410 taskq_cancel_id(taskq_t *tq, taskqid_t id, boolean_t wait)
411 {
412 (void) tq, (void) id, (void) wait;
413 return (ENOENT);
414 }
415
416 void
system_taskq_init(void)417 system_taskq_init(void)
418 {
419 __system_taskq = taskq_create("system_taskq", 64, maxclsyspri, 4, 512,
420 TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
421 __system_delay_taskq = taskq_create("delay_taskq", 4, maxclsyspri, 4,
422 512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
423 }
424
/*
 * Destroy the global taskqs created by system_taskq_init(), clearing
 * the pointers so stale use after teardown fails fast.
 */
void
system_taskq_fini(void)
{
	taskq_destroy(__system_taskq);
	__system_taskq = NULL; /* defensive */
	taskq_destroy(__system_delay_taskq);
	__system_delay_taskq = NULL;
}
433