// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "tctx.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE	8
#define IORING_TW_CAP_ENTRIES_VALUE	32

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqpoll_task_locked(sqd) == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
	wake_up(&sqd->wait);
}

void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	struct task_struct *tsk;

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);

	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
}

void io_sq_thread_stop(struct io_sq_data *sqd)
{
	struct task_struct *tsk;

	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	tsk = sqpoll_task_locked(sqd);
	if (tsk) {
		WARN_ON_ONCE(tsk == current);
		wake_up_process(tsk);
	}
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	CLASS(fd, f)(p->wq_fd);

	if (fd_empty(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f)))
		return ERR_PTR(-EINVAL);

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd)
		return ERR_PTR(-EINVAL);
	if (sqd->task_tgid != current->tgid)
		return ERR_PTR(-EPERM);

	refcount_inc(&sqd->refs);
	return sqd;
}

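/*
 * Find the io_sq_data to use for this ring. With IORING_SETUP_ATTACH_WQ
 * we try to share the SQPOLL data of the ring given by p->wq_fd; if that
 * fails with -EPERM, or the flag isn't set, a fresh io_sq_data is
 * allocated and *attached is left false.
 */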
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

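/*
 * Handle a park request or a pending signal. Drops sqd->lock while the
 * thread is parked and waits for park_pending to drain before retaking
 * it. Returns true if the thread should exit, i.e. a signal was taken
 * or IO_SQ_THREAD_SHOULD_STOP is set.
 */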
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}

static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
	struct rusage end;

	getrusage(current, RUSAGE_SELF, &end);
	end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
	end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

	sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}

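/*
 * Main loop of the iou-sqp kernel thread. Submits SQEs and runs iopoll
 * for every ctx attached to this io_sq_data, processes task_work, and
 * once sq_thread_idle has elapsed without finding work it goes to sleep
 * with IORING_SQ_NEED_WAKEUP set so the application knows to wake it.
 */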
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	struct rusage start;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN] = {};
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring) {
		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, NULL);
		put_task_struct(current);
		mutex_unlock(&sqd->lock);
		goto err_out;
	}

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		getrusage(current, RUSAGE_SELF, &start);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
			if (io_napi(ctx))
				io_napi_sqpoll_busy_poll(ctx);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin) {
				io_sq_update_worktime(sqd, &start);
				timeout = jiffies + sqd->sq_thread_idle;
			}
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	rcu_assign_pointer(sqd->thread, NULL);
	put_task_struct(current);
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

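/*
 * Set up SQPOLL offload for @ctx: find or create the shared io_sq_data,
 * validate and record the requested CPU affinity (IORING_SETUP_SQ_AFF),
 * and spawn the iou-sqp kernel thread via create_io_thread() unless an
 * existing one was attached to.
 */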
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		CLASS(fd, f)(p->wq_fd);
		if (fd_empty(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f)))
			return -EINVAL;
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		mutex_lock(&sqd->lock);
		rcu_assign_pointer(sqd->thread, tsk);
		mutex_unlock(&sqd->lock);

		get_task_struct(tsk);
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}
	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		struct task_struct *tsk;

		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		tsk = sqpoll_task_locked(sqd);
		if (tsk)
			ret = io_wq_cpu_affinity(tsk->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}