xref: /qemu/job.c (revision c302660920acf48425a1317d56122ea8af60fbc4)
1 /*
2  * Background jobs (long-running operations)
3  *
4  * Copyright (c) 2011 IBM Corp.
5  * Copyright (c) 2012, 2018 Red Hat, Inc.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  */
25 
26 #include "qemu/osdep.h"
27 #include "qapi/error.h"
28 #include "qemu/job.h"
29 #include "qemu/id.h"
30 #include "qemu/main-loop.h"
31 #include "block/aio-wait.h"
32 #include "trace/trace-root.h"
33 #include "qapi/qapi-events-job.h"
34 
35 /*
36  * The job API is composed of two categories of functions.
37  *
38  * The first includes functions used by the monitor.  The monitor is
39  * peculiar in that it accesses the job list with job_get, and
40  * therefore needs consistency across job_get and the actual operation
41  * (e.g. job_user_cancel). To achieve this consistency, the caller
42  * calls job_lock/job_unlock itself around the whole operation.
43  *
44  *
45  * The second includes functions used by the job drivers and sometimes
46  * by the core block layer. These delegate the locking to the callee instead.
47  */
48 
49 /*
50  * job_mutex protects the jobs list, but also makes the
51  * struct job fields thread-safe.
52  */
53 QemuMutex job_mutex;
54 
55 /* Protected by job_mutex */
56 static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
57 
/*
 * Job State Transition Table: row = current status, column = requested
 * status; a 1 means the transition is legal.  Enforced by
 * job_state_transition_locked().
 */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
73 
/*
 * Maps each user-issued verb to the set of statuses (columns, same order
 * as JobSTT) in which it is permitted.  Enforced by job_apply_verb_locked().
 */
bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
    [JOB_VERB_CHANGE]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
};
85 
/* Transactional group of jobs; all fields are protected by job_mutex. */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs (linked through Job.txn_list) */
    QLIST_HEAD(, Job) jobs;

    /* Reference count; the txn is freed when it drops to zero */
    int refcnt;
};
98 
/* Acquire the global job mutex; pairs with job_unlock(). */
void job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}
103 
/* Release the global job mutex; pairs with job_lock(). */
void job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}
108 
/* Constructor attribute: initializes job_mutex before main() runs. */
static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}
113 
job_txn_new(void)114 JobTxn *job_txn_new(void)
115 {
116     JobTxn *txn = g_new0(JobTxn, 1);
117     QLIST_INIT(&txn->jobs);
118     txn->refcnt = 1;
119     return txn;
120 }
121 
/* Take an extra reference on @txn.  Called with job_mutex held. */
static void job_txn_ref_locked(JobTxn *txn)
{
    txn->refcnt++;
}
127 
/* Drop one reference on @txn (NULL allowed); frees it at refcount zero. */
void job_txn_unref_locked(JobTxn *txn)
{
    if (!txn) {
        return;
    }

    txn->refcnt--;
    if (txn->refcnt == 0) {
        g_free(txn);
    }
}
134 
/* Unlocked wrapper: takes job_mutex and drops one reference on @txn. */
void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}
140 
141 /**
142  * @txn: The transaction (may be NULL)
143  * @job: Job to add to the transaction
144  *
145  * Add @job to the transaction.  The @job must not already be in a transaction.
146  * The caller must call either job_txn_unref() or job_completed() to release
147  * the reference that is automatically grabbed here.
148  *
149  * If @txn is NULL, the function does nothing.
150  *
151  * Called with job_mutex held.
152  */
job_txn_add_job_locked(JobTxn * txn,Job * job)153 static void job_txn_add_job_locked(JobTxn *txn, Job *job)
154 {
155     if (!txn) {
156         return;
157     }
158 
159     assert(!job->txn);
160     job->txn = txn;
161 
162     QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
163     job_txn_ref_locked(txn);
164 }
165 
166 /* Called with job_mutex held. */
job_txn_del_job_locked(Job * job)167 static void job_txn_del_job_locked(Job *job)
168 {
169     if (job->txn) {
170         QLIST_REMOVE(job, txn_list);
171         job_txn_unref_locked(job->txn);
172         job->txn = NULL;
173     }
174 }
175 
/*
 * Apply @fn to every job in @job's transaction, stopping at (and
 * returning) the first non-zero result; returns 0 if all calls succeed.
 *
 * Called with job_mutex held, but @fn may release it temporarily, so a
 * reference is taken on @job to keep it alive across the loop.
 */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
    Job *other_job, *next;
    JobTxn *txn = job->txn;
    int rc = 0;

    /*
     * NOTE(review): this comment predates the removal of the AioContext
     * lock; today the only lock juggling happens inside @fn itself.
     * Similar to job_completed_txn_abort, we take each job's lock before
     * applying fn, but since we assume that outer_ctx is held by the caller,
     * we need to release it here to avoid holding the lock twice - which would
     * break AIO_WAIT_WHILE from within fn.
     */
    job_ref_locked(job);

    /* _SAFE because @fn may remove the current entry from the list */
    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        rc = fn(other_job);
        if (rc) {
            break;
        }
    }

    job_unref_locked(job);
    return rc;
}
201 
job_is_internal(Job * job)202 bool job_is_internal(Job *job)
203 {
204     return (job->id == NULL);
205 }
206 
/*
 * Move @job to status @s1, asserting that JobSTT permits the transition.
 * Emits a JOB_STATUS_CHANGE QMP event for non-internal jobs when the
 * status actually changes.  Called with job_mutex held.
 */
static void job_state_transition_locked(Job *job, JobStatus s1)
{
    JobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    trace_job_state_transition(job, job->ret,
                               JobSTT[s0][s1] ? "allowed" : "disallowed",
                               JobStatus_str(s0), JobStatus_str(s1));
    assert(JobSTT[s0][s1]);
    job->status = s1;

    if (!job_is_internal(job) && s1 != s0) {
        qapi_event_send_job_status_change(job->id, job->status);
    }
}
222 
job_apply_verb_locked(Job * job,JobVerb verb,Error ** errp)223 int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
224 {
225     JobStatus s0 = job->status;
226     assert(verb >= 0 && verb < JOB_VERB__MAX);
227     trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
228                          JobVerbTable[verb][s0] ? "allowed" : "prohibited");
229     if (JobVerbTable[verb][s0]) {
230         return 0;
231     }
232     error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
233                job->id, JobStatus_str(s0), JobVerb_str(verb));
234     return -EPERM;
235 }
236 
/* Return the driver-declared type of @job (immutable after creation). */
JobType job_type(const Job *job)
{
    return job->driver->job_type;
}
241 
/* Human-readable name of @job's type. */
const char *job_type_str(const Job *job)
{
    return JobType_str(job_type(job));
}
246 
/*
 * Returns whether the job has been *force*-cancelled.  Note this is
 * deliberately force_cancel, not cancelled: a soft cancel request alone
 * (see job_cancel_requested_locked()) does not count here.
 */
bool job_is_cancelled_locked(Job *job)
{
    /* force_cancel may be true only if cancelled is true, too */
    assert(job->cancelled || !job->force_cancel);
    return job->force_cancel;
}
253 
/* Unlocked wrapper: query whether @job is currently paused. */
bool job_is_paused(Job *job)
{
    JOB_LOCK_GUARD();
    return job->paused;
}
259 
/* Unlocked wrapper for job_is_cancelled_locked(). */
bool job_is_cancelled(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_cancelled_locked(job);
}
265 
266 /* Called with job_mutex held. */
job_cancel_requested_locked(Job * job)267 static bool job_cancel_requested_locked(Job *job)
268 {
269     return job->cancelled;
270 }
271 
/* Unlocked wrapper for job_cancel_requested_locked(). */
bool job_cancel_requested(Job *job)
{
    JOB_LOCK_GUARD();
    return job_cancel_requested_locked(job);
}
277 
job_is_ready_locked(Job * job)278 bool job_is_ready_locked(Job *job)
279 {
280     switch (job->status) {
281     case JOB_STATUS_UNDEFINED:
282     case JOB_STATUS_CREATED:
283     case JOB_STATUS_RUNNING:
284     case JOB_STATUS_PAUSED:
285     case JOB_STATUS_WAITING:
286     case JOB_STATUS_PENDING:
287     case JOB_STATUS_ABORTING:
288     case JOB_STATUS_CONCLUDED:
289     case JOB_STATUS_NULL:
290         return false;
291     case JOB_STATUS_READY:
292     case JOB_STATUS_STANDBY:
293         return true;
294     default:
295         g_assert_not_reached();
296     }
297     return false;
298 }
299 
/* Unlocked wrapper for job_is_ready_locked(). */
bool job_is_ready(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_ready_locked(job);
}
305 
job_is_completed_locked(Job * job)306 bool job_is_completed_locked(Job *job)
307 {
308     switch (job->status) {
309     case JOB_STATUS_UNDEFINED:
310     case JOB_STATUS_CREATED:
311     case JOB_STATUS_RUNNING:
312     case JOB_STATUS_PAUSED:
313     case JOB_STATUS_READY:
314     case JOB_STATUS_STANDBY:
315         return false;
316     case JOB_STATUS_WAITING:
317     case JOB_STATUS_PENDING:
318     case JOB_STATUS_ABORTING:
319     case JOB_STATUS_CONCLUDED:
320     case JOB_STATUS_NULL:
321         return true;
322     default:
323         g_assert_not_reached();
324     }
325     return false;
326 }
327 
/* Unlocked wrapper for job_is_completed_locked(). */
static bool job_is_completed(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_completed_locked(job);
}
333 
/*
 * Whether job_start() already ran: the coroutine only exists after that.
 * Called with job_mutex held.
 */
static bool job_started_locked(Job *job)
{
    return job->co;
}
338 
/* True while any pause request is outstanding.  Called with job_mutex held. */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}
344 
/*
 * Iterate the global job list: NULL yields the first job, otherwise the
 * successor of @job (NULL at the end).  Called with job_mutex held.
 */
Job *job_next_locked(Job *job)
{
    return job ? QLIST_NEXT(job, job_list) : QLIST_FIRST(&jobs);
}
352 
/* Unlocked wrapper for job_next_locked(). */
Job *job_next(Job *job)
{
    JOB_LOCK_GUARD();
    return job_next_locked(job);
}
358 
job_get_locked(const char * id)359 Job *job_get_locked(const char *id)
360 {
361     Job *job;
362 
363     QLIST_FOREACH(job, &jobs, job_list) {
364         if (job->id && !strcmp(id, job->id)) {
365             return job;
366         }
367     }
368 
369     return NULL;
370 }
371 
/*
 * Change the AioContext the job runs in.  Only legal from the main loop
 * and while the job cannot be running (paused or completed), so no
 * coroutine can observe a torn update.
 */
void job_set_aio_context(Job *job, AioContext *ctx)
{
    /* protect against read in job_finish_sync_locked and job_start */
    GLOBAL_STATE_CODE();
    /* protect against read in job_do_yield_locked */
    JOB_LOCK_GUARD();
    /* ensure the job is quiescent while the AioContext is changed */
    assert(job->paused || job_is_completed_locked(job));
    job->aio_context = ctx;
}
382 
/*
 * Expiry callback of job->sleep_timer: wake the coroutine that
 * job_sleep_ns() put to sleep.  Called with job_mutex *not* held.
 */
static void job_sleep_timer_cb(void *opaque)
{
    Job *job = opaque;

    job_enter(job);
}
390 
/*
 * Allocate and register a new job.
 *
 * @job_id: user-visible ID; required unless JOB_INTERNAL is in @flags,
 *          in which case it must be NULL.
 * @driver: defines the job's type, callbacks and instance size.
 * @txn:    transaction to join, or NULL for a private single-job txn.
 * @flags:  JOB_INTERNAL / JOB_MANUAL_FINALIZE / JOB_MANUAL_DISMISS bits.
 * @cb/@opaque: completion callback invoked with the job's return code.
 *
 * Returns the new job (cast by the caller to its driver-specific type),
 * or NULL with @errp set.  The job starts paused (pause_count == 1);
 * job_start() is expected to kick it off later.
 */
void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp)
{
    Job *job;

    JOB_LOCK_GUARD();

    /* Validate the ID/flags combination before allocating anything */
    if (job_id) {
        if (flags & JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal job");
            return NULL;
        }
        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }
        if (job_get_locked(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    } else if (!(flags & JOB_INTERNAL)) {
        error_setg(errp, "An explicit job ID is required");
        return NULL;
    }

    /* instance_size covers the driver's container struct embedding Job */
    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->refcnt        = 1;
    job->aio_context   = ctx;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    job->cb            = cb;
    job->opaque        = opaque;

    progress_init(&job->progress);

    notifier_list_init(&job->on_finalize_cancelled);
    notifier_list_init(&job->on_finalize_completed);
    notifier_list_init(&job->on_pending);
    notifier_list_init(&job->on_ready);
    notifier_list_init(&job->on_idle);

    job_state_transition_locked(job, JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   job_sleep_timer_cb, job);

    QLIST_INSERT_HEAD(&jobs, job, job_list);

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = job_txn_new();
        job_txn_add_job_locked(txn, job);
        /* drop job_txn_new()'s reference; the job's own ref keeps txn alive */
        job_txn_unref_locked(txn);
    } else {
        job_txn_add_job_locked(txn, job);
    }

    return job;
}
457 
/* Take an extra reference on @job.  Called with job_mutex held. */
void job_ref_locked(Job *job)
{
    ++job->refcnt;
}
462 
/*
 * Drop a reference on @job; the last reference destroys it.  The job
 * must already have been dismissed (status NULL, detached from its txn).
 * Called with job_mutex held, but releases it temporarily around the
 * driver's free() callback.
 */
void job_unref_locked(Job *job)
{
    GLOBAL_STATE_CODE();

    if (--job->refcnt == 0) {
        assert(job->status == JOB_STATUS_NULL);
        assert(!timer_pending(&job->sleep_timer));
        assert(!job->txn);

        if (job->driver->free) {
            /* driver callbacks must run without job_mutex */
            job_unlock();
            job->driver->free(job);
            job_lock();
        }

        QLIST_REMOVE(job, job_list);

        progress_destroy(&job->progress);
        error_free(job->err);
        g_free(job->id);
        g_free(job);
    }
}
486 
/* Record @done units of additional completed work. */
void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}

/* Set the absolute amount of work still remaining. */
void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}

/* Grow the remaining-work estimate by @delta units. */
void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}
501 
502 /**
503  * To be called when a cancelled job is finalised.
504  * Called with job_mutex held.
505  */
job_event_cancelled_locked(Job * job)506 static void job_event_cancelled_locked(Job *job)
507 {
508     notifier_list_notify(&job->on_finalize_cancelled, job);
509 }
510 
511 /**
512  * To be called when a successfully completed job is finalised.
513  * Called with job_mutex held.
514  */
job_event_completed_locked(Job * job)515 static void job_event_completed_locked(Job *job)
516 {
517     notifier_list_notify(&job->on_finalize_completed, job);
518 }
519 
520 /* Called with job_mutex held. */
job_event_pending_locked(Job * job)521 static void job_event_pending_locked(Job *job)
522 {
523     notifier_list_notify(&job->on_pending, job);
524 }
525 
526 /* Called with job_mutex held. */
job_event_ready_locked(Job * job)527 static void job_event_ready_locked(Job *job)
528 {
529     notifier_list_notify(&job->on_ready, job);
530 }
531 
532 /* Called with job_mutex held. */
job_event_idle_locked(Job * job)533 static void job_event_idle_locked(Job *job)
534 {
535     notifier_list_notify(&job->on_idle, job);
536 }
537 
/*
 * Resume the job coroutine if it is startable: started, not deferred to
 * the main loop, not already busy, and (optionally) @fn approves.
 * Called with job_mutex held, but releases it around aio_co_wake().
 */
void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
    if (!job_started_locked(job) ||
        job->deferred_to_main_loop ||
        job->busy) {
        return;
    }

    if (fn && !fn(job)) {
        return;
    }

    assert(!job->deferred_to_main_loop);
    /* A pending sleep timer must not fire once we wake the coroutine */
    timer_del(&job->sleep_timer);
    /* busy is cleared again by job_do_yield_locked() */
    job->busy = true;
    job_unlock();
    aio_co_wake(job->co);
    job_lock();
}
562 
/* Unconditionally (no predicate) wake the job coroutine. */
void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}
568 
/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with job_enter() before the timer has expired
 * is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
 * called explicitly.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
    AioContext *next_aio_context;

    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    /* Mark idle before yielding so job_enter_cond_locked() can wake us */
    job->busy = false;
    job_event_idle_locked(job);
    job_unlock();
    qemu_coroutine_yield();
    job_lock();

    next_aio_context = job->aio_context;
    /*
     * Coroutine has resumed, but in the meanwhile the job AioContext
     * might have changed via bdrv_try_change_aio_context(), so we need to move
     * the coroutine too in the new aiocontext.
     */
    while (qemu_get_current_aio_context() != next_aio_context) {
        job_unlock();
        aio_co_reschedule_self(next_aio_context);
        job_lock();
        /* it may have changed again while we were rescheduling */
        next_aio_context = job->aio_context;
    }

    /* Set by job_enter_cond_locked() before re-entering the coroutine.  */
    assert(job->busy);
}
607 
/*
 * Cooperative pause point: if a pause was requested (and the job is not
 * cancelled), transition to PAUSED/STANDBY, yield until resumed, then
 * restore the previous status.  Invokes the driver's pause/resume hooks
 * without the lock.  Called with job_mutex held, but releases it
 * temporarily.
 */
static void coroutine_fn job_pause_point_locked(Job *job)
{
    assert(job && job_started_locked(job));

    if (!job_should_pause_locked(job)) {
        return;
    }
    /* Cancelled jobs must keep running so they can terminate */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (job->driver->pause) {
        job_unlock();
        job->driver->pause(job);
        job_lock();
    }

    /* Re-check: the request may have gone away while we were unlocked */
    if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
        JobStatus status = job->status;
        /* READY jobs pause into STANDBY, running jobs into PAUSED */
        job_state_transition_locked(job, status == JOB_STATUS_READY
                                    ? JOB_STATUS_STANDBY
                                    : JOB_STATUS_PAUSED);
        job->paused = true;
        job_do_yield_locked(job, -1);
        job->paused = false;
        job_state_transition_locked(job, status);
    }

    if (job->driver->resume) {
        job_unlock();
        job->driver->resume(job);
        job_lock();
    }
}
643 
/* Unlocked wrapper for job_pause_point_locked(). */
void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}
649 
/*
 * Yield the job coroutine until job_enter() is called, honouring pause
 * and cancel requests.  Must be called from the job coroutine itself.
 */
void coroutine_fn job_yield(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    /* If a pause is pending, skip straight to the pause point instead */
    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, -1);
    }

    job_pause_point_locked(job);
}
666 
/*
 * Sleep the job coroutine for @ns nanoseconds (realtime clock), waking
 * early on job_enter(), pause or cancel.  Must be called from the job
 * coroutine itself.
 */
void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    /* If a pause is pending, skip the timed sleep and pause instead */
    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    job_pause_point_locked(job);
}
683 
/*
 * Predicate for job_enter_cond_locked(): only wake the job if its sleep
 * timer is not armed.  Assumes the job_mutex is held.
 */
static bool job_timer_not_pending_locked(Job *job)
{
    return !timer_pending(&job->sleep_timer);
}
689 
/*
 * Request a pause; the job honours it at its next pause point.
 * Called with job_mutex held.
 */
void job_pause_locked(Job *job)
{
    job->pause_count++;
    if (job->paused) {
        /* Already quiescent; nothing to kick. */
        return;
    }
    job_enter_cond_locked(job, NULL);
}
697 
/* Unlocked wrapper for job_pause_locked(). */
void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}
703 
/*
 * Withdraw one pause request; once the count reaches zero the job is
 * woken (unless it is merely sleeping on its timer).
 * Called with job_mutex held.
 */
void job_resume_locked(Job *job)
{
    assert(job->pause_count > 0);
    if (--job->pause_count > 0) {
        return;
    }

    /* kick only if no timer is pending */
    job_enter_cond_locked(job, job_timer_not_pending_locked);
}
715 
/* Unlocked wrapper for job_resume_locked(). */
void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}
721 
/*
 * User-initiated pause (QMP job-pause): checks the verb table, rejects
 * a second pause, then piggybacks on the internal pause machinery.
 * Called with job_mutex held.
 */
void job_user_pause_locked(Job *job, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    job_pause_locked(job);
}
734 
/* Whether the user (as opposed to internal code) paused the job. */
bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}
739 
/*
 * User-initiated resume (QMP job-resume): only valid after a user pause.
 * Runs the driver's user_resume hook without the lock, then drops the
 * pause request.  Called with job_mutex held, released temporarily.
 */
void job_user_resume_locked(Job *job, Error **errp)
{
    assert(job);
    GLOBAL_STATE_CODE();
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
        return;
    }
    if (job->driver->user_resume) {
        /* driver callbacks must run without job_mutex */
        job_unlock();
        job->driver->user_resume(job);
        job_lock();
    }
    job->user_paused = false;
    job_resume_locked(job);
}
759 
/*
 * Final teardown: detach from the transaction, move to NULL status and
 * drop the creation reference (which usually destroys the job).
 * Called with job_mutex held, but releases it temporarily (via
 * job_unref_locked -> driver->free).
 */
static void job_do_dismiss_locked(Job *job)
{
    assert(job);
    job->busy = false;
    job->paused = false;
    /* keep job_enter_cond_locked() from ever waking the coroutine again */
    job->deferred_to_main_loop = true;

    job_txn_del_job_locked(job);

    job_state_transition_locked(job, JOB_STATUS_NULL);
    job_unref_locked(job);
}
773 
/*
 * QMP job-dismiss: remove a CONCLUDED job.  Clears *@jobptr on success
 * because the job may be freed.  Called with job_mutex held.
 */
void job_dismiss_locked(Job **jobptr, Error **errp)
{
    Job *job = *jobptr;
    /* similarly to _complete, this is QMP-interface only. */
    assert(job->id);
    if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    job_do_dismiss_locked(job);
    *jobptr = NULL;
}
786 
/* Destroy a job that failed before it was ever started (still CREATED). */
void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}
793 
/*
 * Move the job to CONCLUDED; auto-dismiss it unless the user asked to
 * keep it around (JOB_MANUAL_DISMISS) and it actually ran.
 * Called with job_mutex held.
 */
static void job_conclude_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !job_started_locked(job)) {
        job_do_dismiss_locked(job);
    }
}
802 
/*
 * Fold cancellation into job->ret (-ECANCELED) and, on any error,
 * synthesize job->err from the errno and enter ABORTING.
 * Called with job_mutex held.
 */
static void job_update_rc_locked(Job *job)
{
    if (!job->ret && job_is_cancelled_locked(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        if (!job->err) {
            /* fall back to a generic strerror() message */
            error_setg(&job->err, "%s", strerror(-job->ret));
        }
        job_state_transition_locked(job, JOB_STATUS_ABORTING);
    }
}
816 
/* Invoke the driver's commit hook; only valid after success (ret == 0). */
static void job_commit(Job *job)
{
    assert(!job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

/* Invoke the driver's abort hook; only valid after failure (ret != 0). */
static void job_abort(Job *job)
{
    assert(job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

/* Invoke the driver's clean hook; runs after either commit or abort. */
static void job_clean(Job *job)
{
    GLOBAL_STATE_CODE();
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}
842 
843 /*
844  * Called with job_mutex held, but releases it temporarily.
845  */
job_finalize_single_locked(Job * job)846 static int job_finalize_single_locked(Job *job)
847 {
848     int job_ret;
849 
850     assert(job_is_completed_locked(job));
851 
852     /* Ensure abort is called for late-transactional failures */
853     job_update_rc_locked(job);
854 
855     job_ret = job->ret;
856     job_unlock();
857 
858     if (!job_ret) {
859         job_commit(job);
860     } else {
861         job_abort(job);
862     }
863     job_clean(job);
864 
865     if (job->cb) {
866         job->cb(job->opaque, job_ret);
867     }
868 
869     job_lock();
870 
871     /* Emit events only if we actually started */
872     if (job_started_locked(job)) {
873         if (job_is_cancelled_locked(job)) {
874             job_event_cancelled_locked(job);
875         } else {
876             job_event_completed_locked(job);
877         }
878     }
879 
880     job_txn_del_job_locked(job);
881     job_conclude_locked(job);
882     return 0;
883 }
884 
885 /*
886  * Called with job_mutex held, but releases it temporarily.
887  */
job_cancel_async_locked(Job * job,bool force)888 static void job_cancel_async_locked(Job *job, bool force)
889 {
890     GLOBAL_STATE_CODE();
891     if (job->driver->cancel) {
892         job_unlock();
893         force = job->driver->cancel(job, force);
894         job_lock();
895     } else {
896         /* No .cancel() means the job will behave as if force-cancelled */
897         force = true;
898     }
899 
900     if (job->user_paused) {
901         /* Do not call job_enter here, the caller will handle it.  */
902         if (job->driver->user_resume) {
903             job_unlock();
904             job->driver->user_resume(job);
905             job_lock();
906         }
907         job->user_paused = false;
908         assert(job->pause_count > 0);
909         job->pause_count--;
910     }
911 
912     /*
913      * Ignore soft cancel requests after the job is already done
914      * (We will still invoke job->driver->cancel() above, but if the
915      * job driver supports soft cancelling and the job is done, that
916      * should be a no-op, too.  We still call it so it can override
917      * @force.)
918      */
919     if (force || !job->deferred_to_main_loop) {
920         job->cancelled = true;
921         /* To prevent 'force == false' overriding a previous 'force == true' */
922         job->force_cancel |= force;
923     }
924 }
925 
926 /*
927  * Called with job_mutex held, but releases it temporarily.
928  */
job_completed_txn_abort_locked(Job * job)929 static void job_completed_txn_abort_locked(Job *job)
930 {
931     JobTxn *txn = job->txn;
932     Job *other_job;
933 
934     if (txn->aborting) {
935         /*
936          * We are cancelled by another job, which will handle everything.
937          */
938         return;
939     }
940     txn->aborting = true;
941     job_txn_ref_locked(txn);
942 
943     job_ref_locked(job);
944 
945     /* Other jobs are effectively cancelled by us, set the status for
946      * them; this job, however, may or may not be cancelled, depending
947      * on the caller, so leave it. */
948     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
949         if (other_job != job) {
950             /*
951              * This is a transaction: If one job failed, no result will matter.
952              * Therefore, pass force=true to terminate all other jobs as quickly
953              * as possible.
954              */
955             job_cancel_async_locked(other_job, true);
956         }
957     }
958     while (!QLIST_EMPTY(&txn->jobs)) {
959         other_job = QLIST_FIRST(&txn->jobs);
960         if (!job_is_completed_locked(other_job)) {
961             assert(job_cancel_requested_locked(other_job));
962             job_finish_sync_locked(other_job, NULL, NULL);
963         }
964         job_finalize_single_locked(other_job);
965     }
966 
967     job_unref_locked(job);
968     job_txn_unref_locked(txn);
969 }
970 
/*
 * Run the driver's prepare hook (lock-free) for a job that succeeded so
 * far, folding its result into job->ret.  Returns the (possibly
 * updated) job->ret.  Called with job_mutex held, releases temporarily.
 */
static int job_prepare_locked(Job *job)
{
    int ret;

    GLOBAL_STATE_CODE();

    /* skip prepare for jobs that have already failed */
    if (job->ret == 0 && job->driver->prepare) {
        job_unlock();
        ret = job->driver->prepare(job);
        job_lock();
        job->ret = ret;
        job_update_rc_locked(job);
    }

    return job->ret;
}
988 
989 /* Called with job_mutex held */
job_needs_finalize_locked(Job * job)990 static int job_needs_finalize_locked(Job *job)
991 {
992     return !job->auto_finalize;
993 }
994 
/*
 * Finalize the whole transaction: prepare every job first; if any
 * prepare fails, abort the transaction, otherwise finalize each job.
 * Called with job_mutex held (released temporarily by the callees).
 */
static void job_do_finalize_locked(Job *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = job_txn_apply_locked(job, job_prepare_locked);
    if (rc) {
        job_completed_txn_abort_locked(job);
    } else {
        job_txn_apply_locked(job, job_finalize_single_locked);
    }
}
1009 
/*
 * QMP job-finalize entry point: verb-check, then finalize the
 * transaction.  Only reachable for user-visible jobs (hence the id
 * assertion).  Called with job_mutex held.
 */
void job_finalize_locked(Job *job, Error **errp)
{
    assert(job && job->id);
    if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    job_do_finalize_locked(job);
}
1018 
1019 /* Called with job_mutex held. */
job_transition_to_pending_locked(Job * job)1020 static int job_transition_to_pending_locked(Job *job)
1021 {
1022     job_state_transition_locked(job, JOB_STATUS_PENDING);
1023     if (!job->auto_finalize) {
1024         job_event_pending_locked(job);
1025     }
1026     return 0;
1027 }
1028 
void job_transition_to_ready(Job *job)
{
    /* Take job_mutex for the state change and the READY notification */
    WITH_JOB_LOCK_GUARD() {
        job_state_transition_locked(job, JOB_STATUS_READY);
        job_event_ready_locked(job);
    }
}
1035 
/* Called with job_mutex held. */
static void job_completed_txn_success_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    job_state_transition_locked(job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!job_is_completed_locked(other_job)) {
            /* At least one job still running: wait for it */
            return;
        }
        /* Completed jobs in a surviving txn must all have succeeded */
        assert(other_job->ret == 0);
    }

    /* Every job in the transaction finished: move them all to PENDING */
    job_txn_apply_locked(job, job_transition_to_pending_locked);

    /* If no jobs need manual finalization, automatically do so */
    if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
        job_do_finalize_locked(job);
    }
}
1062 
1063 /* Called with job_mutex held. */
job_completed_locked(Job * job)1064 static void job_completed_locked(Job *job)
1065 {
1066     assert(job && job->txn && !job_is_completed_locked(job));
1067 
1068     job_update_rc_locked(job);
1069     trace_job_completed(job, job->ret);
1070     if (job->ret) {
1071         job_completed_txn_abort_locked(job);
1072     } else {
1073         job_completed_txn_success_locked(job);
1074     }
1075 }
1076 
1077 /**
1078  * Useful only as a type shim for aio_bh_schedule_oneshot.
1079  * Called with job_mutex *not* held.
1080  */
job_exit(void * opaque)1081 static void job_exit(void *opaque)
1082 {
1083     Job *job = (Job *)opaque;
1084     JOB_LOCK_GUARD();
1085     job_ref_locked(job);
1086 
1087     /* This is a lie, we're not quiescent, but still doing the completion
1088      * callbacks. However, completion callbacks tend to involve operations that
1089      * drain block nodes, and if .drained_poll still returned true, we would
1090      * deadlock. */
1091     job->busy = false;
1092     job_event_idle_locked(job);
1093 
1094     job_completed_locked(job);
1095     job_unref_locked(job);
1096 }
1097 
1098 /**
1099  * All jobs must allow a pause point before entering their job proper. This
1100  * ensures that jobs can be paused prior to being started, then resumed later.
1101  */
job_co_entry(void * opaque)1102 static void coroutine_fn job_co_entry(void *opaque)
1103 {
1104     Job *job = opaque;
1105     int ret;
1106 
1107     assert(job && job->driver && job->driver->run);
1108     WITH_JOB_LOCK_GUARD() {
1109         assert(job->aio_context == qemu_get_current_aio_context());
1110         job_pause_point_locked(job);
1111     }
1112     ret = job->driver->run(job, &job->err);
1113     WITH_JOB_LOCK_GUARD() {
1114         job->ret = ret;
1115         job->deferred_to_main_loop = true;
1116         job->busy = true;
1117     }
1118     aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
1119 }
1120 
void job_start(Job *job)
{
    assert(qemu_in_main_thread());

    WITH_JOB_LOCK_GUARD() {
        /* The job must exist, not have run yet, and start out paused */
        assert(job && !job_started_locked(job) && job->paused &&
            job->driver && job->driver->run);
        job->co = qemu_coroutine_create(job_co_entry, job);
        /* Undo the initial pause (job->paused asserted above) so it can run */
        job->pause_count--;
        job->busy = true;
        job->paused = false;
        job_state_transition_locked(job, JOB_STATUS_RUNNING);
    }
    /* Enter the coroutine outside the lock; it takes job_mutex itself */
    aio_co_enter(job->aio_context, job->co);
}
1136 
void job_cancel_locked(Job *job, bool force)
{
    /* A concluded job only needs to be dismissed */
    if (job->status == JOB_STATUS_CONCLUDED) {
        job_do_dismiss_locked(job);
        return;
    }
    job_cancel_async_locked(job, force);
    if (!job_started_locked(job)) {
        /* The coroutine never ran; complete the job directly */
        job_completed_locked(job);
    } else if (job->deferred_to_main_loop) {
        /*
         * job_cancel_async() ignores soft-cancel requests for jobs
         * that are already done (i.e. deferred to the main loop).  We
         * have to check again whether the job is really cancelled.
         * (job_cancel_requested() and job_is_cancelled() are equivalent
         * here, because job_cancel_async() will make soft-cancel
         * requests no-ops when deferred_to_main_loop is true.  We
         * choose to call job_is_cancelled() to show that we invoke
         * job_completed_txn_abort() only for force-cancelled jobs.)
         */
        if (job_is_cancelled_locked(job)) {
            job_completed_txn_abort_locked(job);
        }
    } else {
        /* Kick the running coroutine so it notices the cancel request */
        job_enter_cond_locked(job, NULL);
    }
}
1164 
job_user_cancel_locked(Job * job,bool force,Error ** errp)1165 void job_user_cancel_locked(Job *job, bool force, Error **errp)
1166 {
1167     if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
1168         return;
1169     }
1170     job_cancel_locked(job, force);
1171 }
1172 
1173 /* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
1174  * be used with job_finish_sync_locked() without the need for (rather nasty)
1175  * function pointer casts there.
1176  *
1177  * Called with job_mutex held.
1178  */
job_cancel_err_locked(Job * job,Error ** errp)1179 static void job_cancel_err_locked(Job *job, Error **errp)
1180 {
1181     job_cancel_locked(job, false);
1182 }
1183 
1184 /**
1185  * Same as job_cancel_err(), but force-cancel.
1186  * Called with job_mutex held.
1187  */
job_force_cancel_err_locked(Job * job,Error ** errp)1188 static void job_force_cancel_err_locked(Job *job, Error **errp)
1189 {
1190     job_cancel_locked(job, true);
1191 }
1192 
int job_cancel_sync_locked(Job *job, bool force)
{
    /* Pick the matching wrapper and wait synchronously for the cancel */
    void (*cancel_fn)(Job *, Error **) =
        force ? job_force_cancel_err_locked : job_cancel_err_locked;

    return job_finish_sync_locked(job, cancel_fn, NULL);
}
1201 
int job_cancel_sync(Job *job, bool force)
{
    int ret;

    /* Locked wrapper around job_cancel_sync_locked() */
    JOB_LOCK_GUARD();
    ret = job_cancel_sync_locked(job, force);
    return ret;
}
1207 
job_cancel_sync_all(void)1208 void job_cancel_sync_all(void)
1209 {
1210     Job *job;
1211     JOB_LOCK_GUARD();
1212 
1213     while ((job = job_next_locked(NULL))) {
1214         job_cancel_sync_locked(job, true);
1215     }
1216 }
1217 
job_complete_sync_locked(Job * job,Error ** errp)1218 int job_complete_sync_locked(Job *job, Error **errp)
1219 {
1220     return job_finish_sync_locked(job, job_complete_locked, errp);
1221 }
1222 
void job_complete_locked(Job *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    GLOBAL_STATE_CODE();
    if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    /* A job with a pending cancel request, or one whose driver has no
     * .complete callback, cannot be manually completed */
    if (job_cancel_requested_locked(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    /* Drop the lock around the driver callback, which may block */
    job_unlock();
    job->driver->complete(job, errp);
    job_lock();
}
1241 
/**
 * Run @finish on @job (if given), then wait synchronously until the job
 * completes.
 *
 * Returns -EBUSY if @finish reported an error, -ECANCELED if the job was
 * cancelled while its own return code is 0, otherwise the job's final
 * job->ret.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
int job_finish_sync_locked(Job *job,
                           void (*finish)(Job *, Error **errp),
                           Error **errp)
{
    Error *local_err = NULL;
    int ret;
    GLOBAL_STATE_CODE();

    /* Keep the job alive while we wait on it */
    job_ref_locked(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref_locked(job);
        return -EBUSY;
    }

    /* Drop the lock while polling: the job needs it to make progress */
    job_unlock();
    AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
                            (job_enter(job), !job_is_completed(job)));
    job_lock();

    /* Report cancellation as -ECANCELED unless the job set its own error */
    ret = (job_is_cancelled_locked(job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref_locked(job);
    return ret;
}
1271