// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 * Copyright (C) 2014 Fujitsu. All rights reserved.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include <trace/events/btrfs.h>
#include "async-thread.h"

enum {
	WORK_DONE_BIT,
	WORK_ORDER_DONE_BIT,
};

#define NO_THRESHOLD (-1)
#define DEFAULT_THRESHOLD (32)

struct btrfs_workqueue {
	struct workqueue_struct *normal_wq;

	/* File system this workqueue services */
	struct btrfs_fs_info *fs_info;

	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;

	/* Thresholding related variables */
	atomic_t pending;

	/* Upper limit of concurrent workers */
	int limit_active;

	/* Current number of concurrent workers */
	int current_active;

	/* Threshold to change current_active */
	int thresh;
	unsigned int count;
	spinlock_t thres_lock;
};

struct btrfs_fs_info * __pure btrfs_workqueue_owner(const struct btrfs_workqueue *wq)
{
	return wq->fs_info;
}

struct btrfs_fs_info * __pure btrfs_work_owner(const struct btrfs_work *work)
{
	return work->wq->fs_info;
}

bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
{
	/*
	 * We could compare wq->pending with num_online_cpus() to support
	 * the "thresh == NO_THRESHOLD" case, but it requires moving up the
	 * atomic_inc/dec in thresh_queue/exec_hook. Let's postpone it until
	 * someone needs the support of that case.
	 */
	if (wq->thresh == NO_THRESHOLD)
		return false;

	return atomic_read(&wq->pending) > wq->thresh * 2;
}

static void btrfs_init_workqueue(struct btrfs_workqueue *wq,
				 struct btrfs_fs_info *fs_info)
{
	wq->fs_info = fs_info;
	atomic_set(&wq->pending, 0);
	INIT_LIST_HEAD(&wq->ordered_list);
	spin_lock_init(&wq->list_lock);
	spin_lock_init(&wq->thres_lock);
}

struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
					      const char *name, unsigned int flags,
					      int limit_active, int thresh)
{
	struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

	if (!ret)
		return NULL;

	btrfs_init_workqueue(ret, fs_info);

	ret->limit_active = limit_active;
	if (thresh == 0)
		thresh = DEFAULT_THRESHOLD;
	/* For low threshold, disabling threshold is a better choice */
	if (thresh < DEFAULT_THRESHOLD) {
		ret->current_active = limit_active;
		ret->thresh = NO_THRESHOLD;
	} else {
		/*
		 * For threshold-able wq, let its concurrency grow on demand.
		 * Use minimal max_active at alloc time to reduce resource
		 * usage.
		 */
		ret->current_active = 1;
		ret->thresh = thresh;
	}

	ret->normal_wq = alloc_workqueue("btrfs-%s", flags, ret->current_active,
					 name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	trace_btrfs_workqueue_alloc(ret, name);
	return ret;
}

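/*
 * Allocate a btrfs_workqueue backed by an ordered kernel workqueue
 * (max_active fixed at 1). Items queued here run strictly one at a time,
 * in queueing order, so the thresholding machinery is disabled.
 */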
struct btrfs_workqueue *btrfs_alloc_ordered_workqueue(
				struct btrfs_fs_info *fs_info, const char *name,
				unsigned int flags)
{
	struct btrfs_workqueue *ret;

	ret = kzalloc(sizeof(*ret), GFP_KERNEL);
	if (!ret)
		return NULL;

	btrfs_init_workqueue(ret, fs_info);

	/* Ordered workqueues don't allow @max_active adjustments. */
	ret->limit_active = 1;
	ret->current_active = 1;
	ret->thresh = NO_THRESHOLD;

	ret->normal_wq = alloc_ordered_workqueue("btrfs-%s", flags, name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	trace_btrfs_workqueue_alloc(ret, name);
	return ret;
}

/*
 * Hook for the thresholding mechanism, called from btrfs_queue_work().
 * This hook WILL be called in IRQ handler context, so
 * workqueue_set_max_active() MUST NOT be called from it.
 */
static inline void thresh_queue_hook(struct btrfs_workqueue *wq)
{
	if (wq->thresh == NO_THRESHOLD)
		return;
	atomic_inc(&wq->pending);
}

/*
 * Hook for the thresholding mechanism, called before executing the work.
 * This hook runs in kthread context, so it is safe to call
 * workqueue_set_max_active() from here.
 */
static inline void thresh_exec_hook(struct btrfs_workqueue *wq)
{
	int new_current_active;
	long pending;
	bool need_change = false;

	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);
	spin_lock(&wq->thres_lock);
	/*
	 * Use wq->count to limit the calling frequency of
	 * workqueue_set_max_active().
	 */
	wq->count++;
	wq->count %= (wq->thresh / 4);
	if (!wq->count)
		goto out;
	new_current_active = wq->current_active;

	/*
	 * pending may change later, but that's OK since we don't need it to
	 * be perfectly accurate to calculate new_current_active.
	 */
	pending = atomic_read(&wq->pending);
	if (pending > wq->thresh)
		new_current_active++;
	if (pending < wq->thresh / 2)
		new_current_active--;
	new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
	if (new_current_active != wq->current_active) {
		need_change = true;
		wq->current_active = new_current_active;
	}
out:
	spin_unlock(&wq->thres_lock);

	if (need_change)
		workqueue_set_max_active(wq->normal_wq, wq->current_active);
}

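/*
 * Run the ordered functions of all work items at the head of the ordered
 * list whose normal function has already completed, strictly in queueing
 * order. @self is the work item the calling worker has just executed; its
 * own ordered/free call is deferred until after the loop so that the item
 * cannot be freed and recycled while this worker still appears to be
 * executing it (see the comment on non-reentrancy below).
 */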
static void run_ordered_work(struct btrfs_workqueue *wq,
			     struct btrfs_work *self)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;
	bool free_self = false;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_first_entry(list, struct btrfs_work, ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;
		/*
		 * Orders all subsequent loads after reading WORK_DONE_BIT;
		 * paired with the smp_mb__before_atomic() in
		 * btrfs_work_helper(), this guarantees that the ordered
		 * function will see all updates made by the ordinary work
		 * function.
		 */
		smp_rmb();

		/*
		 * We are going to call the ordered done function, but we
		 * leave the work item on the list as a barrier so that
		 * later work items that are done don't have their
		 * functions called before this one returns.
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		trace_btrfs_ordered_sched(work);
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work, false);

		/* Now take the lock again and drop our item from the list. */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		if (work == self) {
			/*
			 * This is the work item that the worker is currently
			 * executing.
			 *
			 * The kernel workqueue code guarantees non-reentrancy
			 * of work items. I.e., if a work item with the same
			 * address and work function is queued twice, the
			 * second execution is blocked until the first one
			 * finishes. A work item may be freed and recycled
			 * with the same work function; the workqueue code
			 * assumes that the original work item cannot depend
			 * on the recycled work item in that case (see
			 * find_worker_executing_work()).
			 *
			 * Note that different types of Btrfs work can depend
			 * on each other, and one type of work on one Btrfs
			 * filesystem may even depend on the same type of work
			 * on another Btrfs filesystem via, e.g., a loop
			 * device. Therefore, we must not allow the current
			 * work item to be recycled until we are really done,
			 * otherwise we break the above assumption and can
			 * deadlock.
			 */
			free_self = true;
		} else {
			/*
			 * We don't want to call the ordered free functions
			 * with the lock held.
			 */
			work->ordered_func(work, true);
			/* NB: work must not be dereferenced past this point. */
			trace_btrfs_all_work_done(wq->fs_info, work);
		}
	}
	spin_unlock_irqrestore(lock, flags);

	if (free_self) {
		self->ordered_func(self, true);
		/* NB: self must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, self);
	}
}

static void btrfs_work_helper(struct work_struct *normal_work)
{
	struct btrfs_work *work = container_of(normal_work, struct btrfs_work,
					       normal_work);
	struct btrfs_workqueue *wq = work->wq;
	bool need_order = false;

	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func(), if it has no ordered_func(..., true) to free,
	 *    since the struct is freed in work->func();
	 * 2) after setting WORK_DONE_BIT, because the work may be freed by
	 *    other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = true;

	trace_btrfs_work_sched(work);
	thresh_exec_hook(wq);
	work->func(work);
	if (need_order) {
		/*
		 * Ensures all memory accesses done in the work function are
		 * ordered before setting the WORK_DONE_BIT, so that the
		 * thread which is going to execute the ordered work sees
		 * them. Pairs with the smp_rmb() in run_ordered_work().
		 */
		smp_mb__before_atomic();
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq, work);
	} else {
		/* NB: work must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, work);
	}
}

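/*
 * Illustrative sketch of how callers typically use this API. The struct
 * my_job and the callbacks my_work_fn()/my_ordered_fn() are hypothetical
 * names, not part of btrfs:
 *
 *	struct my_job {
 *		struct btrfs_work work;
 *		int payload;
 *	};
 *
 *	static void my_work_fn(struct btrfs_work *work)
 *	{
 *		struct my_job *job = container_of(work, struct my_job, work);
 *
 *		// Heavy lifting; may run concurrently with other items.
 *	}
 *
 *	static void my_ordered_fn(struct btrfs_work *work, bool do_free)
 *	{
 *		// Runs in queueing order after my_work_fn() has finished.
 *		// When do_free is true this is the final call and the job
 *		// may be freed here.
 *	}
 *
 *	wq = btrfs_alloc_workqueue(fs_info, "example", 0, max_active, 0);
 *	btrfs_init_work(&job->work, my_work_fn, my_ordered_fn);
 *	btrfs_queue_work(wq, &job->work);
 *
 * Passing 0 as @thresh selects DEFAULT_THRESHOLD. A NULL @ordered_func
 * skips the ordered phase entirely; in that case the work item may be
 * freed by the work function itself.
 */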
void btrfs_init_work(struct btrfs_work *work, btrfs_func_t func,
		     btrfs_ordered_func_t ordered_func)
{
	work->func = func;
	work->ordered_func = ordered_func;
	INIT_WORK(&work->normal_work, btrfs_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}

void btrfs_queue_work(struct btrfs_workqueue *wq, struct btrfs_work *work)
{
	unsigned long flags;

	work->wq = wq;
	thresh_queue_hook(wq);
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	trace_btrfs_work_queued(work);
	queue_work(wq->normal_wq, &work->normal_work);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
	if (!wq)
		return;
	destroy_workqueue(wq->normal_wq);
	trace_btrfs_workqueue_destroy(wq);
	kfree(wq);
}

void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
{
	if (wq)
		wq->limit_active = limit_active;
}

void btrfs_flush_workqueue(struct btrfs_workqueue *wq)
{
	flush_workqueue(wq->normal_wq);
}